Task implementation

Each task you want to run in Selinon has to be of a type SelinonTask.

The only thing you need to define is the run() method which accepts node_args parameter based on which this task computes its results. The return value of your Task is after that checked against JSON schema (if configured so) and stored in a database or a storage if a storage was assigned to the task in the YAML configuration.

from selinon import SelinonTask

class MyTask(SelinonTask):
    def run(self, node_args):
        # compute a + b
        return {'c': node_args['a'] + node_args['b']}

Now you need to point to the task implementation from YAML configuration files (nodes.yaml):

tasks:
  # Transcripts to:
  #   from myapp.tasks import MyTask
  - name: 'MyTask'
    import: 'myapp.tasks'

See YAML configuration section for all possible configuration options.

In order to retrieve data from parent tasks or flows you can use prepared SelinonTask methods. You can also access configured storage and so.

Task failures

First, make sure you are familiar with retry options that can be passed in the YAML configuration.

If your task should not be rescheduled due to a fatal error, raise FatalTaskError. This will cause fatal task error and task will not be rescheduled. Keep in mind that max_retry from YAML configuration file will be ignored! If you want to retry, just raise any appropriate exception that you want to track in trace logs.

In case you want to reschedule your task without affecting max_retry, just call self.retry(). Optional argument countdown specifies countdown in seconds for rescheduling. Note that this method is not fully compatible with Celery’s retry mechanism.

Check SelinonTask code documentation.

Some implementation details

Here are some implementation details that are not necessary helpful for you:

  • SelinonTask is not Celery task
  • the constructor of the task is transparently called by SelinonTaskEnvelope, which handles flow details propagation and also Selinon tracepoints
  • SelinonTaskEnvelope is of type Celery task