Tutorial Creating a Task - dm03514/func-y-task-engine GitHub Wiki

Creating a task

In this tutorial, we will create, develop and execute a new task.

The task we create will be used as a functional test for func-y task engine itself. Creating a task consists of:

  • Installing Func-y
  • Creating a yaml file to house our task
  • Starting the services the task depends on
  • Defining metadata associated with the task
  • Defining the states of our task
  • Incrementally developing the task
  • Debugging the task
  • Running tasks as a unit test

Installing Func-y

This tutorial walks through creating a test using the func-y docker image, but the same instructions apply when func-y is installed from source.

Using docker

  • docker pull dm03514/funcytaskengine
  • This tutorial uses docker host networking as a shortcut to access the local machine's resources

From source

Func-y is written in Python using gevent. Unfortunately, there aren't enough abstractions to insulate you from it yet. To get started, install all the Python requirements using:

  • pip install -r requirements.txt
  • python setup.py install

Creating a Yaml file

For this tutorial, we are creating a func-y task to test func-y task engine itself. Func-y tasks can live anywhere in your project but we'll be putting ours in:

./tests/funcy/

We are going to be cross-cutting a number of features:

  • UUID preprocessing
  • nsq plugin
  • postgres plugin

Because of this we'll name our test uuid-nsq-postgres.yml

Start Services

Our task will be interacting with nsq and postgres through their public interfaces. docker-compose is used to manage test dependencies. To bring up local services to test against, navigate to the func-y-task-engine root directory and run:

$ make start-functional-stack

docker-compose down
Stopping nsqadmin ... done
Stopping nsqlookupd ... done
Stopping nsqd ... done
Stopping postgres ... done
Removing nsqadmin ... done
Removing nsqlookupd ... done
Removing nsqd ... done
Removing postgres ... done
Removing network funcytaskengine_default
docker-compose up -d
Creating network "funcytaskengine_default" with the default driver
Creating nsqlookupd
Creating nsqadmin
Creating postgres
Creating nsqd
./bin/wait-for-it.sh localhost:5432
wait-for-it.sh: waiting 15 seconds for localhost:5432
wait-for-it.sh: localhost:5432 is available after 0 seconds

This will start nsqd and postgres, and apply postgres table fixtures.
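
The last step of the make target waits until postgres accepts TCP connections on localhost:5432. The same readiness check can be sketched in Python 3 with only the standard library. Note that `wait_for_port` is a hypothetical helper written for this tutorial; it is not part of func-y or of wait-for-it.sh.

```python
import socket
import time


def wait_for_port(host, port, timeout=15.0, interval=0.25):
    """Return True once host:port accepts a TCP connection, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # create_connection raises OSError while nothing is listening.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, `wait_for_port("localhost", 5432)` mirrors the 15 second wait shown in the output above.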

Define the task metadata

Metadata is stored as top-level attributes in the task definition. For our task we'll specify a max_timeout, a version (currently unused), and a task name:

---
max_timeout: 10
name: UUID_nsq_postgres_assertions
version: "1"
events:

When max_timeout is reached the task engine will stop any running states and return a failure to the user.

Defining task states/ Incremental Development

Next we'll have to define each state of our task. Since we are testing nsq and postgres we'll:

  • Create a nsq test topic if it doesn't exist

Each item in the events list of the yaml file represents a single state in a linear state progression. The first state always runs to completion before the second state starts, the second before the third, and so on.

---
max_timeout: 10
name: UUID_nsq_postgres_assertions
version: "1"
events:
  - name: create_nsq_topic
    initiator:
      method: post
      type: http.HTTPInitiator
      url: "http://localhost:4151/topic/create?topic=funcy_task"
    transition_conditions:
      - type: assertions.Equal
        value_property: status_code
        to_equal: 200
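
To make the linear progression concrete, here is a minimal Python 3 sketch of a runner that executes states strictly in order and gives up on the first failed assertion or once max_timeout has elapsed. It is an illustration under simplifying assumptions, not func-y's implementation: `run_states` and the `(name, action)` pairs are hypothetical, and the deadline is only checked between states, whereas the real engine also stops a state that is still running.

```python
import time


def run_states(states, max_timeout):
    """Run (name, action) pairs in order; stop at the first failure or timeout.

    Each action receives the results of all earlier states and returns its own.
    Returns (status, name_of_failed_state_or_None, results).
    """
    deadline = time.monotonic() + max_timeout
    results = {}
    for name, action in states:
        # Simplification: the deadline is only checked between states.
        if time.monotonic() > deadline:
            return "FAILURE", name, results
        try:
            results[name] = action(results)
        except AssertionError:
            # A failed transition condition ends the whole task.
            return "FAILURE", name, results
    return "SUCCESS", None, results
```

A task with a single state, like the one above, would be `run_states([("create_nsq_topic", some_callable)], max_timeout=10)`, where `some_callable` performs the POST and asserts on the status code.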

Incrementally validating each state during testing may help keep your full head of hair intact. func-y task engine provides a command line utility for executing a single test or a collection of tests and reporting on the test execution.

$ docker run --network=host -v `pwd`/tests/funcy:/tmp/funcy -it funcytaskengine funcy-task-engine.py run -t /tmp/funcy/uuid-nsq-postgres.yml
{'max_timeout': 10, 'version': '1', 'name': 'UUID_nsq_postgres_assertions', 'events': [{'initiator': {'url': 'http://localhost:4151/topic/create?topic=funcy_task', 'type': 'http.HTTPInitiator', 'method': 'post'}, 'name': 'create_nsq_topic', 'transition_conditions': [{'to_equal': 200, 'value_property': 'status_code', 'type': 'assertions.Equal'}]}]}
2017-11-26 23:24:21,143 - funcytaskengine.engine - DEBUG - {'message': 'sending_first_state', 'first_state': 'create_nsq_topic'}
2017-11-26 23:24:21,143 - funcytaskengine.engine - DEBUG - {'message': 'state_change_requested'}
2017-11-26 23:24:21,143 - transitions.core - DEBUG - Initiating transition from state pending to state create_nsq_topic...
2017-11-26 23:24:21,143 - transitions.core - DEBUG - Exiting state pending. Processing callbacks...
2017-11-26 23:24:21,144 - transitions.core - INFO - Exited state pending
2017-11-26 23:24:21,144 - transitions.core - DEBUG - Entering state create_nsq_topic. Processing callbacks...
2017-11-26 23:24:21,144 - transitions.core - INFO - Entered state create_nsq_topic
2017-11-26 23:24:21,149 - funcytaskengine.event_fulfillment.noop - DEBUG - {'initiator': <funcytaskengine.initiators.http.HTTPInitiator object at 0x7fc0e5ab9990>, 'conditions': <funcytaskengine.transition_conditions.TransitionConditions object at 0x7fc0e46c0590>}
2017-11-26 23:24:21,151 - requests.packages.urllib3.connectionpool - DEBUG - Starting new HTTP connection (1): localhost
2017-11-26 23:24:21,162 - requests.packages.urllib3.connectionpool - DEBUG - http://localhost:4151 "POST /topic/create?topic=funcy_task HTTP/1.1" 200 0
2017-11-26 23:24:21,163 - funcytaskengine.engine - DEBUG - {'message': 'state_change_requested'}
2017-11-26 23:24:21,164 - transitions.core - DEBUG - Initiating transition from state create_nsq_topic to state finished...
2017-11-26 23:24:21,164 - transitions.core - DEBUG - Exiting state create_nsq_topic. Processing callbacks...
2017-11-26 23:24:21,164 - transitions.core - INFO - Exited state create_nsq_topic
2017-11-26 23:24:21,164 - transitions.core - DEBUG - Entering state finished. Processing callbacks...
2017-11-26 23:24:21,164 - transitions.core - INFO - Entered state finished
2017-11-26 23:24:21,164 - funcytaskengine.engine - DEBUG - {'status': 'SUCCESS', 'message': 'task_execution_finished'}

The output above is from our very first test execution! It lists every state transition the task went through. In the middle, we can see that a POST request was made to our local nsqd over its HTTP port and that it returned a 200.

Updating the expected status code to 444 shows what a failure looks like.

    transition_conditions:
      - type: assertions.Equal
        value_property: status_code
        to_equal: 444

$ docker run --network=host -v `pwd`/tests/funcy:/tmp/funcy -it funcytaskengine funcy-task-engine.py run -t /tmp/funcy/uuid-nsq-postgres.yml
{'max_timeout': 10, 'version': '1', 'name': 'UUID_nsq_postgres_assertions', 'events': [{'initiator': {'url': 'http://localhost:4151/topic/create?topic=funcy_task', 'type': 'http.HTTPInitiator', 'method': 'post'}, 'name': 'create_nsq_topic', 'transition_conditions': [{'to_equal': 444, 'value_property': 'status_code', 'type': 'assertions.Equal'}]}]}
2017-11-26 23:26:36,362 - funcytaskengine.engine - DEBUG - {'message': 'sending_first_state', 'first_state': 'create_nsq_topic'}
2017-11-26 23:26:36,362 - funcytaskengine.engine - DEBUG - {'message': 'state_change_requested'}
2017-11-26 23:26:36,362 - transitions.core - DEBUG - Initiating transition from state pending to state create_nsq_topic...
2017-11-26 23:26:36,362 - transitions.core - DEBUG - Exiting state pending. Processing callbacks...
2017-11-26 23:26:36,362 - transitions.core - INFO - Exited state pending
2017-11-26 23:26:36,363 - transitions.core - DEBUG - Entering state create_nsq_topic. Processing callbacks...
2017-11-26 23:26:36,363 - transitions.core - INFO - Entered state create_nsq_topic
2017-11-26 23:26:36,363 - funcytaskengine.event_fulfillment.noop - DEBUG - {'initiator': <funcytaskengine.initiators.http.HTTPInitiator object at 0x7f03ee913990>, 'conditions': <funcytaskengine.transition_conditions.TransitionConditions object at 0x7f03ed51a590>}
2017-11-26 23:26:36,366 - requests.packages.urllib3.connectionpool - DEBUG - Starting new HTTP connection (1): localhost
2017-11-26 23:26:36,379 - requests.packages.urllib3.connectionpool - DEBUG - http://localhost:4151 "POST /topic/create?topic=funcy_task HTTP/1.1" 200 0
2017-11-26 23:26:36,384 - funcytaskengine.machine - ERROR - {'event_name': 'create_nsq_topic', 'message': 'event_execution_error', 'exception': AssertionError('444 != 200',), 'stack': 'Traceback (most recent call last):\n  File "build/bdist.linux-x86_64/egg/funcytaskengine/machine.py", line 92, in run\n    result = event.execute(event_results=self.event_results)\n  File "build/bdist.linux-x86_64/egg/funcytaskengine/machine.py", line 52, in execute\n    result = self.fulfillment.run(self.initiator, self.conditions, **kwargs)\n  File "build/bdist.linux-x86_64/egg/funcytaskengine/transition_conditions/__init__.py", line 19, in decorated\n    conditions.apply()\n  File "build/bdist.linux-x86_64/egg/funcytaskengine/transition_conditions/__init__.py", line 58, in apply\n    self.vs = con.apply(self.vs)\n  File "build/bdist.linux-x86_64/egg/funcytaskengine/transition_conditions/assertions.py", line 86, in apply\n    assert self.to_equal == to_assert, \'{} != {}\'.format(self.to_equal, to_assert)\nAssertionError: 444 != 200\n'}
2017-11-26 23:26:36,384 - funcytaskengine.engine - DEBUG - {'message': 'task_failure'}
Traceback (most recent call last):
  File "/usr/local/bin/funcy-task-engine.py", line 4, in <module>
    __import__('pkg_resources').run_script('funcy-task-engine==0.0.1.dev0', 'funcy-task-engine.py')
  File "/usr/local/lib/python2.7/site-packages/pkg_resources/__init__.py", line 748, in run_script
    self.require(requires)[0].run_script(script_name, ns)
  File "/usr/local/lib/python2.7/site-packages/pkg_resources/__init__.py", line 1524, in run_script
    exec(script_code, namespace, namespace)
  File "/usr/local/lib/python2.7/site-packages/funcy_task_engine-0.0.1.dev0-py2.7.egg/EGG-INFO/scripts/funcy-task-engine.py", line 49, in <module>

  File "/usr/local/lib/python2.7/site-packages/funcy_task_engine-0.0.1.dev0-py2.7.egg/EGG-INFO/scripts/funcy-task-engine.py", line 13, in run

  File "build/bdist.linux-x86_64/egg/funcytaskengine/unittesttaskexecutor.py", line 114, in run
  File "build/bdist.linux-x86_64/egg/funcytaskengine/unittesttaskexecutor.py", line 49, in test_individual
AssertionError
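
The logged stack trace above shows the assertion that produced `444 != 200`. A minimal Python 3 sketch of that kind of check follows, assuming a condition object built from the value_property and to_equal keys of the yaml. The class is an illustrative stand-in rather than func-y's source, and `response` is a fake object standing in for the HTTP response.

```python
from types import SimpleNamespace


class Equal:
    """Illustrative stand-in for an equality transition condition."""

    def __init__(self, value_property, to_equal):
        self.value_property = value_property
        self.to_equal = to_equal

    def apply(self, value):
        # Read the configured attribute (e.g. status_code) and compare it.
        to_assert = getattr(value, self.value_property)
        assert self.to_equal == to_assert, "{} != {}".format(self.to_equal, to_assert)
        return value


# Fake response object; a real run would use the result of the HTTP request.
response = SimpleNamespace(status_code=200)
Equal("status_code", 200).apply(response)  # passes silently
```

Calling `Equal("status_code", 444).apply(response)` raises an AssertionError with the message `444 != 200`, matching the failure in the log.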

After changing the status_code assertion back to 200, we can confirm that the test topic was created by querying the local nsqd:

$ curl http://localhost:4151/stats
nsqd v1.0.0-compat (built w/go1.8)
start_time 2017-04-24T18:13:07Z
uptime 23m37.736277826s

Health: OK

   [funcy_task     ] depth: 0     be-depth: 0     msgs: 0        e2e%:

  • Send a message to the test topic

  - name: publish_message
    initiator:
      type: nsq.NSQPublisherInitiator
      message: >
          {
                "key1": "really_cool_message",
                "key2": "really_cool_message_2"
          }
      nsqd_address: localhost
      topic: "funcy_task"

The above publishes the message on the funcy_task topic. Running the task again shows us the message has been published:

2017-04-24 19:02:15,650 - funcytaskengine.initiators.nsq - DEBUG - {'message': 'publishing_message_nsq'}
2017-04-24 19:02:15,650 - urllib3.connectionpool - DEBUG - Starting new HTTP connection (1): localhost
2017-04-24 19:02:15,652 - urllib3.connectionpool - DEBUG - http://localhost:4151 "POST /put?topic=funcy_task HTTP/1.1" 200 2
2017-04-24 19:02:15,671 - funcytaskengine.engine - DEBUG - {'message': 'state_change_requested'}
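
As the log shows, publishing is a plain HTTP POST to nsqd with the topic in the query string. Below is a minimal Python 3 sketch that builds such a request with the standard library. It is an assumption-laden illustration: `build_publish_request` is a hypothetical helper, and it targets the `/pub` endpoint from nsqd's HTTP API, while the log above shows the engine posting to `/put`; adjust the path to whatever your nsqd version accepts.

```python
import json
import urllib.parse
import urllib.request


def build_publish_request(nsqd_http_address, topic, payload):
    """Build (but do not send) a POST that publishes payload to an nsqd topic."""
    query = urllib.parse.urlencode({"topic": topic})
    # Assumption: /pub is the publish endpoint of the target nsqd.
    url = "http://{}/pub?{}".format(nsqd_http_address, query)
    body = json.dumps(payload).encode("utf-8")
    return urllib.request.Request(url, data=body, method="POST")


request = build_publish_request(
    "localhost:4151",
    "funcy_task",
    {"key1": "really_cool_message", "key2": "really_cool_message_2"},
)
```

Passing `request` to `urllib.request.urlopen` would send it to a running nsqd.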

The nsqd stats confirm that one message is now queued on the topic:

$ curl http://localhost:4151/stats
nsqd v0.3.8 (built w/go1.6.2)
start_time 2017-04-24T19:01:42Z
uptime 48m21.07915312s

Health: OK

   [funcy_task     ] depth: 1     be-depth: 0     msgs: 1        e2e%:

  • Subscribe to the test topic and wait until we receive a message from it

  - name: pull_single_message
    event_fulfillment_strategy:
      type: nsq.NSQStreamingFulfillment
      topic: funcy_task
      channel: test
      address: "localhost:4150"
    transition_conditions:
        - type: nsq.NSQOnMessage

The above state subscribes to the funcy_task topic for the test channel and transitions as soon as a single message is received. Since the message is queued by the 2nd state, a single message will be drained from the channel, which can be seen in the logs below.

2017-04-24 19:57:53,756 - transitions.core - INFO - Entered state pull_single_message
2017-04-24 19:57:53,757 - gnsq.reader.funcy_task.test - DEBUG - starting gnsq.reader.funcy_task.test...
2017-04-24 19:57:53,757 - gnsq.reader.funcy_task.test - DEBUG - querying nsqd...
2017-04-24 19:57:53,757 - gnsq.reader.funcy_task.test - DEBUG - [localhost:4150] connecting...
2017-04-24 19:57:53,761 - gnsq.reader.funcy_task.test - DEBUG - [localhost:4150] response: {"max_rdy_count":2500,"version":"0.3.8","max_msg_timeout":900000,"msg_timeout":60000,"tls_v1":false,"deflate":false,"deflate_level":0,"max_deflate_level":6,"snappy":false,"sample_rate":0,"auth_required":false,"output_buffer_size":16384,"output_buffer_timeout":250}
2017-04-24 19:57:53,761 - gnsq.reader.funcy_task.test - DEBUG - [localhost:4150] sending RDY 1
2017-04-24 19:57:53,761 - gnsq.reader.funcy_task.test - INFO - [localhost:4150] connection successful
2017-04-24 19:57:53,763 - gnsq.reader.funcy_task.test - DEBUG - [localhost:4150] response: OK
2017-04-24 19:57:53,764 - gnsq.reader.funcy_task.test - DEBUG - [localhost:4150] got message: 07dc936557726000
2017-04-24 19:57:53,764 - gnsq.reader.funcy_task.test - DEBUG - [localhost:4150] sending RDY 1
2017-04-24 19:57:53,766 - gnsq.reader.funcy_task.test - DEBUG - [localhost:4150] finished message: 07dc936557726000
2017-04-24 19:57:53,767 - gnsq.reader.funcy_task.test - DEBUG - [localhost:4150] sending RDY 1
2017-04-24 19:57:53,767 - gnsq.reader.funcy_task.test - DEBUG - closing 1 worker(s)
2017-04-24 19:57:53,767 - gnsq.reader.funcy_task.test - DEBUG - closing 1 connection(s)

  • Insert a record into postgres

  - name: insert_article_into_postgres
    initiator:
        type: postgres.QueryInitiator
        query: >
          INSERT INTO article (article_name, article_desc)
          VALUES ('test', '$UUID_STRING_1')
        connection_string: "dbname=postgres host=localhost user=postgres"

The above executes the query and transitions immediately to the next state. Additionally, it uses a template preprocessor $UUID_STRING_1 to create unique data. This value will be replaced with a uuid4 hex string. Every instance of this variable will be replaced with the same uuid4 string.
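
The substitution rule can be sketched with the standard library. This is a minimal illustration of the behaviour described above (one uuid4 hex string shared by every occurrence), not func-y's preprocessor; `preprocess` is a hypothetical name.

```python
import uuid
from string import Template


def preprocess(task_text):
    """Replace every $UUID_STRING_1 in task_text with one shared uuid4 hex string."""
    value = uuid.uuid4().hex
    # safe_substitute leaves any other $placeholders in the text untouched.
    return Template(task_text).safe_substitute(UUID_STRING_1=value), value


rendered, value = preprocess(
    "INSERT INTO article (article_name, article_desc) VALUES ('test', '$UUID_STRING_1'); "
    "SELECT * FROM article WHERE article_desc='$UUID_STRING_1'"
)
```

Because both occurrences receive the same value, a later state can look up exactly the row that an earlier state inserted.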

There's not much logging other than the state machine logs:

2017-04-24 20:17:18,496 - transitions.core - DEBUG - Entering state insert_article_into_postgres. Processing callbacks...
2017-04-24 20:17:18,496 - transitions.core - INFO - Entered state insert_article_into_postgres
2017-04-24 20:17:18,516 - funcytaskengine.engine - DEBUG - {'message': 'state_change_requested'}
2017-04-24 20:17:18,516 - transitions.core - DEBUG - Initiating transition from state insert_article_into_postgres to state finished...
2017-04-24 20:17:18,516 - transitions.core - DEBUG - Exiting state insert_article_into_postgres. Processing callbacks...
2017-04-24 20:17:18,516 - transitions.core - INFO - Exited state insert_article_into_postgres

But if you check your local postgres you'll see a new record added (before and after included):

$ psql -h localhost -U postgres -d postgres -c "select * from article"
 article_id | article_name | article_desc | date_added
------------+--------------+--------------+------------
(0 rows)

$ psql -h localhost -U postgres -d postgres -c "select * from article"
 article_id | article_name |           article_desc           | date_added
------------+--------------+----------------------------------+------------
          3 | test         | 5d729f1146ae4806aaedde143bba8d44 |
(1 row)

  • Retrieve a record from postgres

Now that we have unique data, let's make assertions on it. The final state in our task is:

  - name: assert_message_in_postgres
    initiator:
        type: postgres.SelectInitiator
        query: >
          SELECT * FROM article WHERE article_desc='$UUID_STRING_1'
        connection_string: "dbname=postgres host=localhost user=postgres"
    transition_conditions:
        - type: assertions.LengthEqual
          length: 1

By now this should start looking familiar. We query postgres for the record we inserted in the previous state. The test passes only if exactly one row matches.

The above introduces a lot of task engine concepts that will be described in more depth in other pages of the wiki:

  • template preprocessors
  • initiator
  • transition_conditions
  • event_fulfillment_strategies
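
The insert-then-assert pattern of the last two states can be tried without any running services. The sketch below uses an in-memory sqlite3 database purely as a stand-in for postgres, with a simplified two-column article table; both are assumptions made for illustration.

```python
import sqlite3
import uuid

# In-memory sqlite3 stands in for the postgres service (assumption).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE article (article_name TEXT, article_desc TEXT)")

# Plays the role of $UUID_STRING_1: one unique value shared by both states.
marker = uuid.uuid4().hex

# Insert state: write a row tagged with the unique marker.
conn.execute(
    "INSERT INTO article (article_name, article_desc) VALUES (?, ?)",
    ("test", marker),
)

# Assert state: select by the marker and require exactly one matching row.
rows = conn.execute(
    "SELECT * FROM article WHERE article_desc = ?", (marker,)
).fetchall()
assert len(rows) == 1, "expected 1 row, got {}".format(len(rows))
```

Because the marker is unique per run, rows left behind by earlier runs cannot make the length check pass or fail by accident.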

Debugging

Debugging with the stdout runner (python bin/funcy-task-engine.py run -t tests/funcy/uuid-nsq-postgres.yml) is relatively straightforward: drop into a Python debugger at any point in the code to figure out what is going wrong.

Running tasks as tests

func-y task engine can provide a lot of value when it is integrated as a build step in your CI process. To do this, it needs to report on task results. The built-in bin script provides a command that executes a single test, or a configuration file containing multiple tests, as a test suite and reports the results as JUnit XML.

python bin/funcy-task-engine.py xmltest -t tests/funcy/uuid-nsq-postgres.yml

----------------------------------------------------------------------
Ran 1 test in 0.297s

OK

Generating XML reports...

Right now, reports are saved to test-reports.
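
One way to picture how a task becomes a unit test is to wrap a task runner in a unittest.TestCase and hand it to a test runner. The sketch below is a hedged illustration, not func-y's executor: `make_task_test` is a hypothetical helper, and the lambda stands in for actually running the task.

```python
import unittest


def make_task_test(name, run_task):
    """Wrap a task runner (a callable returning 'SUCCESS' or 'FAILURE') in a TestCase."""

    class TaskTest(unittest.TestCase):
        def runTest(self):
            self.assertEqual(run_task(), "SUCCESS")

    # Give the generated class a readable name for the test report.
    TaskTest.__name__ = name
    return TaskTest()


suite = unittest.TestSuite([
    # Placeholder runner; a real one would execute the yaml task.
    make_task_test("uuid_nsq_postgres", lambda: "SUCCESS"),
])
result = unittest.TextTestRunner(verbosity=0).run(suite)
```

Swapping TextTestRunner for a runner that writes JUnit XML, such as the one in the third-party unittest-xml-reporting package, would produce report files that a CI server can read.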

The full task can be found at https://github.com/dm03514/func-y-task-engine/blob/master/tests/funcy/uuid-nsq-postgres.yml and is part of development regression verification :)
