Selinon - A Dynamic Distributed Task Flow Management¶
What is Selinon and why should I use it?¶
A quick start guide¶
This section aims to provide a very high-level overview of the Selinon project and its configuration. If you want to dig deeper, follow the next sections.
Selinon concept overview¶
A system consists of flows, storages (or databases) and tasks. Each flow defines a directed graph (which can even be cyclic, so there are no DAG limitations) of well defined dependencies between nodes that compute results. A node can be either a task or another flow, so flows can be nested as desired. You can decide when to run which nodes based on conditions that are made of predicates.
YAML configuration overview¶
Selinon is configured by easy to learn, easy to read and easy to maintain declarative configuration files written in YAML.
In order to use Selinon, you have to implement tasks and define your flows in a YAML configuration file (or split it across multiple YAML configuration files).
Setting up Selinon¶
First, let’s install Selinon from PyPI:
$ pip3 install selinon
Selinon comes with extras (also known as bundles in other terminology) to reduce your dependencies. You can select the desired bundles from the list below:
- celery - needed if you use Celery
- mongodb - needed for MongoDB storage adapter
- postgresql - needed for PostgreSQL storage adapter
- redis - needed for Redis storage adapter
- s3 - needed for S3 storage adapter
- sentry - needed for Sentry support
To install Selinon with all extras issue the following command:
$ pip3 install selinon[celery,mongodb,postgresql,redis,s3,sentry]
Note
Some terminals (such as zsh) might require quoting: pip3 install "selinon[celery,mongodb,postgresql,redis,s3,sentry]"
Feel free to select only extras you need in your deployment.
In order to configure Selinon you need to create a Celery app instance and pass all Celery-related configuration to it. After that you are ready to configure Selinon:
from selinon import Config
from celery import Celery
from myapp.celery_settings import CelerySettings
app = Celery('tasks')
app.config_from_object(CelerySettings)
Config.set_celery_app(app)
Config.set_config_yaml('path/to/nodes.yaml', ['path/to/flow1.yaml', 'path/to/flow2.yaml'])
Please refer to the Celery configuration documentation for Celery-related pieces. You can also find an example in the Selinon demo configuration.
Naming convention¶
Imagine you defined two flows (flow1 and flow2) that consist of five tasks named Task1, Task2, Task3, Task4 and Task5. These flows are illustrated in the images below.

In the flow flow2 (shown above) we start node Task4 on condition that is always true (we start if Selinon was requested to start flow2). After Task4 finishes, we start (always) node Task5 which ends the flow flow2. Results of tasks are stored in the database named Storage2.

The second flow is slightly more complex. We (always) start with Task1. Task1 will transparently store its results in Storage1. After Task1 finishes, Selinon (to be more precise the dispatcher task) checks the results of Task1 in Storage1 and if the condition result['foo'] == 'bar' evaluates to True, the dispatcher starts nodes Task2 and flow2. After both Task2 and flow2 finish, the dispatcher starts Task3. If the condition result['foo'] == 'bar' (now on the result of Task3) is met, Task1 is started and the whole process is iteratively run again. Results of all tasks are stored in the database named Storage1 except for results computed in the sub-flow flow2, where Storage2 is used (see the previous flow graph above).
Note that Task2 and the whole flow2 could be executed in parallel as there are no data or time dependencies between these two nodes. Selinon runs as many nodes as possible in parallel. This makes it really easy to scale your system - the only bottleneck you can hit is the number of computational nodes in your cluster or limitations on the storage/database side.
Flow definitions¶
Conditions¶
Conditions are made of predicates that can be nested as desired using the logical operators and, or and not. There are predefined predicates available in selinon.predicates. However, you can define your own predicates based on your requirements.
These conditions are evaluated by the dispatcher and if a condition is met, the desired node or nodes are scheduled. If the condition evaluates to false, the destination nodes of the given edge are not run. Note that conditions are evaluated only once, and only after all source nodes have successfully finished.
If you do not state a condition in an edge definition, the edge condition always evaluates to true.
Since multiple nodes of the same type (name) can run due to cyclic dependencies, an edge condition is evaluated for each possible combination (and only once for a given combination). If you want to avoid such behaviour, check the Useful flow patterns section for possible solutions.
Starting nodes¶
You can have a single or multiple starting nodes in your flow. If you define a single starting node, the result of the starting node can be propagated to other nodes as arguments if node_args_from_first is set. If you define more than one starting node, the result cannot be propagated (due to time-dependent evaluation), however you can still explicitly define arguments that are passed to the flow (or make part of your flow a sub-flow).
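As a minimal sketch (the flow and task names here are hypothetical), multiple starting nodes are simply expressed as multiple starting edges:

flow-definitions:
  - name: 'multi_start_flow'
    edges:
      # two starting edges - StartA and StartB are both starting nodes
      - from:
        to: 'StartA'
      - from:
        to: 'StartB'
      # JoinTask runs once both starting nodes successfully finish
      - from:
          - 'StartA'
          - 'StartB'
        to: 'JoinTask'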
Flows¶
Flows can be nested as desired. The only limitation is that you currently cannot inspect results of a sub-flow using edge conditions in the parent flow. There is a plan to remove this limitation in future Selinon releases. Nevertheless, you can still reorganize your flow (in most cases) so you are not limited by this restriction.
Running a flow¶
Once you set up Selinon and Selinon does not report any errors in your configuration files, you can run a flow simply by calling the run_flow function (see the documentation of run_flow()):
from selinon import run_flow
dispatcher_id = run_flow('flow1', {'foo': 'bar'})
If you wish to do selective task runs, please refer to Selective task run documentation.
Node failures¶
You can define fallback tasks and fallback flows that are run if a node fails. These fallback tasks and flows (fallback nodes) are not prone to time-dependent evaluation (to be more precise, there is no such thing in the whole Selinon design, so you can be sure that it does not occur at the Selinon level). These fallback nodes are scheduled on task or flow failures and their aim is to recover from a failure.
Failures are propagated from sub-flows to parent flows. You can find an analogy to exceptions as known in many programming languages (such as Python). If a node fails and there is no fallback node that would handle the failure, the whole flow is marked as failed. You can then capture this failure in the parent flow, but it will be marked as a failure of the whole flow. Note that even in this case there is no time-dependent evaluation - so if a node in your flow fails, the dispatcher can still continue scheduling nodes that are not affected by the failure, and once there is nothing more to do, the dispatcher marks the flow as failed.
Now let’s assume that you defined two fallbacks. One waits for a failure of Task1 (Fallback1) and another one waits for a failure of Task1 as well as a failure of Task2 (Fallback2).

Let’s say that Task1 failed. In that case the decision which fallback is run depends on Task2’s failure (not on time-dependent evaluation). Fallback evaluation is greedy, so if Task2 also fails, Fallback2 is run. If Task2 succeeds, Fallback1 is run.
Results of tasks¶
Results of tasks are stored in databases transparently based on your definition in YAML configuration files. The only thing you need to provide is a database adapter that handles database connection and data storing/retrieval. See storage section for more info.
YAML configuration example¶
In this section you can find the YAML configuration files that were used for generating the images in the previous sections. You can separate flows into multiple files; just provide the flow-definitions key in each file so all flows defined in it can be found.
---
flow-definitions:
  - name: 'flow1'
    edges:
      - from:
        to:
          - 'Task1'
      - from:
          - 'Task1'
        to:
          - 'Task2'
          - 'flow2'
        condition:
          name: 'fieldEqual'
          node: 'Task1'
          args:
            key: 'foo'
            value: 'bar'
      - from:
          - 'Task2'
          - 'flow2'
        to:
          - 'Task3'
      - from:
          - 'Task3'
        to:
          - 'Task1'
        condition:
          name: 'fieldEqual'
          node: 'Task3'
          args:
            key: 'foo'
            value: 'bar'

---
flow-definitions:
  - name: 'flow2'
    edges:
      - from:
        to:
          - 'Task4'
      - from:
          - 'Task4'
        to:
          - 'Task5'
The configuration for failures and failure handling fallbacks that were introduced in the Node failures section can be found below (no storages in this example).
---
flow-definitions:
  - name: 'exampleFallback'
    edges:
      - from:
        to: 'Task1'
      - from:
        to: 'Task2'
    failures:
      - nodes:
          - 'Task1'
          - 'Task2'
        fallback:
          - 'Fallback2'
      - nodes:
          - 'Task1'
        fallback:
          - 'Fallback1'
Entities in the system¶
This configuration could be placed in nodes.yaml:
---
tasks:
  - name: 'Task1'
    output_schema: 'path/to/schema1.json'
    # `classname` is omitted, it defaults to `name`
    # from worker.task1 import Task1
    import: 'worker.task1'
    storage: 'Storage1'
    # queue name to which messages will be sent
    queue: 'queue_Task1_v0'

  - name: 'Task2'
    import: 'worker.task2'
    storage: 'Storage1'
    output_schema: 'path/to/schema2.json'
    # task names are not bound to class names (you can create aliases)
    # from worker.task2 import MyTask2 as Task2
    classname: 'MyTask2'
    queue: 'queue_Task2_v1'

  - name: 'Task3'
    import: 'worker.task3'
    storage: 'Storage1'
    output_schema: 'path/to/schema3.json'
    classname: 'Task1'
    max_retry: 1
    # If queue is omitted, Celery's default queue (celery) will be used
    #queue: 'celery'

  - name: 'Task4'
    import: 'worker.task4'
    storage: 'Storage2'
    output_schema: 'path/to/schema4.json'
    classname: 'Task4'
    max_retry: 1

  - name: 'Task5'
    import: 'worker.task1'
    storage: 'Storage2'
    output_schema: 'path/to/schema1.json'
    classname: 'Task4'
    # in case of failure retry once after 10 seconds before marking the node as failed
    max_retry: 1
    retry_countdown: 10

flows:
  # state all flows you have in your system, otherwise Selinon will complain
  - 'flow1'
  - 'flow2'

storages:
  - name: 'Storage1'
    # from storage.storage1 import MyStorage as Storage1
    # This way you can have multiple storages of the same type with different
    # configuration (different reference name)
    classname: 'MyStorage'
    import: 'storage.storage1'
    configuration: 'put your configuration for Storage1 here'

  - name: 'Storage2'
    # classname is omitted, it defaults to `name`
    # from storage.storage2 import Storage2
    import: 'storage.storage2'
    configuration: 'put your configuration for Storage2 here'
See YAML configuration section for more details.
Task implementation¶
Each task you want to run in Selinon has to be of type SelinonTask. The only thing you need to define is the run() method, which accepts the node_args parameter based on which the task computes its results. The return value of your task is then checked against a JSON schema (if configured) and stored in a database or storage if one was assigned to the task in the YAML configuration.
from selinon import SelinonTask

class MyTask(SelinonTask):
    def run(self, node_args):
        # compute a + b
        return {'c': node_args['a'] + node_args['b']}
Now you need to point to the task implementation from the YAML configuration files (nodes.yaml):
tasks:
  # Translates to:
  # from myapp.tasks import MyTask
  - name: 'MyTask'
    import: 'myapp.tasks'
See YAML configuration section for all possible configuration options.
In order to retrieve data from parent tasks or flows you can use the prepared SelinonTask methods. You can also access the configured storage and so on.
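As a minimal sketch (the parent task name 'ParentTask' is hypothetical), a task can query the stored result of its direct parent like this, assuming the parent has a storage configured:

from selinon import SelinonTask

class AggregateTask(SelinonTask):
    def run(self, node_args):
        # transparently fetch the stored result of the (hypothetical) direct
        # parent task 'ParentTask' from its configured storage
        parent_result = self.parent_task_result('ParentTask')
        return {'aggregated': parent_result}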
Task failures¶
First, make sure you are familiar with retry options that can be passed in the YAML configuration.
If your task should not be rescheduled due to a fatal error, raise FatalTaskError. This causes a fatal task error and the task will not be rescheduled. Keep in mind that max_retry from the YAML configuration file will be ignored! If you want to retry, just raise any appropriate exception that you want to track in trace logs.
In case you want to reschedule your task without affecting max_retry, just call self.retry(). The optional argument countdown specifies a countdown in seconds for rescheduling. Note that this method is not fully compatible with Celery’s retry mechanism.
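Here is a minimal sketch putting both together (the fetch() helper and its error handling are hypothetical, purely for illustration):

from selinon import SelinonTask, FatalTaskError

class FetchTask(SelinonTask):
    def run(self, node_args):
        if 'url' not in node_args:
            # unrecoverable error - fail immediately, max_retry is ignored
            raise FatalTaskError('no URL supplied')
        try:
            return fetch(node_args['url'])  # hypothetical helper
        except ConnectionError:
            # reschedule in 60 seconds without affecting max_retry
            self.retry(countdown=60)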
Check the SelinonTask code documentation.
Some implementation details¶
Here are some implementation details that are not necessarily important for you:

- SelinonTask is not a Celery task
- the constructor of the task is transparently called by SelinonTaskEnvelope, which handles flow details propagation and also Selinon tracepoints
- SelinonTaskEnvelope is of type Celery task
Storage adapter implementation¶
Currently there are several prepared storage adapters available in the selinon.storages module. In order to use these storages, you have to manually install the database adapter dependencies using extras as they are not explicitly included in the requirements.
SqlStorage - SQLAlchemy adapter for SQL databases¶
pip3 install selinon[postgresql]
A configuration example:
storages:
  - name: 'MySqlStorage'
    classname: 'PostgreSQL'
    import: 'selinon.storages.postgresql'
    configuration:
      connection_string: 'postgres://postgres:postgres@postgres:5432/postgres'
      encoding: 'utf-8'
      echo: false
The connection string can be parametrized using environment variables. The implementation is available in selinon.storages.postgresql.
Redis - Redis database adapter¶
pip3 install selinon[redis]
A configuration example:
storages:
  - name: 'MyRedisStorage'
    classname: 'Redis'
    import: 'selinon.storages.redis'
    configuration:
      host: 'redishost'
      port: 6379
      db: 0
      charset: 'utf-8'
Configuration entries host, port, password and db can be parametrized using environment variables. The implementation is available in selinon.storages.redis.
MongoDB - MongoDB database adapter¶
pip3 install selinon[mongodb]
A configuration example:
storages:
  - name: 'MyMongoStorage'
    classname: 'MongoDB'
    import: 'selinon.storages.mongodb'
    configuration:
      db_name: 'database_name'
      collection_name: 'collection_name'
      host: 'mongohost'
      port: 27017
Configuration entries db_name, collection_name, host and port can be parametrized using environment variables. The implementation is available in selinon.storages.mongodb.
S3 - AWS S3 database adapter¶
pip3 install selinon[s3]
A configuration example:
storages:
  - name: 'MyS3Storage'
    classname: 'S3Storage'
    import: 'selinon.storages.s3'
    configuration:
      bucket: 'my-bucket-name'
      aws_access_key_id: 'AAAAAAAAAAAAAAAAAAAA'
      aws_secret_access_key: 'BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB'
      region_name: 'us-east-1'
Configuration entries bucket, aws_access_key_id, aws_secret_access_key, region_name, location, use_ssl and endpoint_url can be parametrized using environment variables. The implementation is available in selinon.storages.s3.
Note
You can use awesome projects such as Ceph Nano, Ceph or Minio to run your application without AWS. You need to adjust endpoint_url configuration entry of this adapter to point to your alternative. You can check Selinon’s demo deployment for more info.
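As a sketch (the storage name and endpoint URL below are hypothetical), pointing the adapter at an S3-compatible alternative could look like this:

storages:
  - name: 'MyCephStorage'
    classname: 'S3Storage'
    import: 'selinon.storages.s3'
    configuration:
      bucket: 'my-bucket-name'
      # hypothetical in-cluster endpoint of an S3-compatible service
      endpoint_url: 'http://ceph-nano:8000'
      # credentials picked from environment variables
      aws_access_key_id: '{AWS_ACCESS_KEY_ID}'
      aws_secret_access_key: '{AWS_SECRET_ACCESS_KEY}'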
In memory storage¶
A configuration example:
storages:
  - name: 'Memory'
    classname: 'InMemoryStorage'
    import: 'selinon.storages.memory'
    configuration:
      echo: false
No additional requirements need to be installed. This storage adapter stores results in memory. It is suitable for use with the Selinon CLI and executor where you just want to run a flow and check results. As results are stored in the memory of a single node, it is in many cases not possible to scale the number of workers.
The implementation is available in selinon.storages.memory.
Few notes on using adapters¶
If you want to use multiple adapters, you can specify multiple adapters in extras when installing:
pip3 install selinon[mongodb,postgresql,s3]
Note that spaces are not allowed in extras (also escape brackets when using zsh).
Using a custom storage adapter¶
You can define your own storage by inheriting from the DataStorage abstract class:
from selinon import DataStorage

class MyStorage(DataStorage):
    def __init__(self, host, port):
        # arguments from the YAML file are passed to the constructor as key-value arguments
        super().__init__()
        self.host = host
        self.port = port

    def is_connected(self):
        # predicate used to check connection
        return False

    def connect(self):
        # define how to connect based on your configuration
        pass

    def disconnect(self):
        # define how to disconnect from the storage
        pass

    def retrieve(self, flow_name, task_name, task_id):
        # define how to retrieve results
        pass

    def store(self, flow_name, task_name, task_id, result):
        # define how to store results
        pass

    def store_error(self, node_args, flow_name, task_name, task_id, exc_info):
        # optionally define how to track errors/task failures if you need to
        pass

    def delete(self, flow_name, task_name, task_id):
        # define how to delete results
        pass
And pass this storage to Selinon in your YAML configuration:
storages:
  # from myapp.storages import MyStorage
  - name: 'MyStorage'
    import: 'myapp.storages'
    configuration:
      host: 'localhost'
      port: '5432'
If you create an adapter for some well known storage and you feel that your adapter is generic enough, feel free to share it with community by opening a pull request!
Database connection pool¶
Each worker tries to be efficient when it comes to the number of connections to a database. Only one instance of a DataStorage class is held per worker. Selinon transparently takes care of concurrency safety when calling methods of DataStorage if you plan to run your worker with a concurrency level higher than one.
Note
You can also simply share a connection across multiple DataStorage classes in an inheritance hierarchy and reuse already defined connections. You can also do storage aliasing as described in Best practices.
If you would like to use a storage from your configuration, you can request the storage adapter from Selinon's StoragePool:
from selinon import StoragePool
# Name of storage was set to MyMongoStorage in nodes.yaml configuration file (section storages).
mongo = StoragePool.get_connected_storage('MyMongoStorage')
Selinon will transparently take care of instantiation, connection and sharing the connection pool across the whole process. Check out other useful methods of StoragePool.
Note
If there is anything wrong with a storage or storage adapter causing the dispatcher to fail to determine the next steps in the flow, the dispatcher is retried respecting the flow's retry_countdown configuration option. This way you will not lose messages that cannot be consumed due to storage errors. However, if a task cannot write to or read from a storage, it is marked as failed.
Trace flow actions¶
Selinon offers you a mechanism for tracing flow actions. By default Selinon does not output any logs, but you can configure tracing in your application. To trace using Python's logging, set logging to true in the trace configuration in the global section of your configuration:
global:
  trace:
    - logging: true
Selinon will transparently print log messages of important events in your flow to stdout (informative tracepoints using logging.info()) and stderr (warnings about failures using logging.warning()). These logs have by default the following structure:
[2017-04-31 08:45:02,231: INFO/MainProcess] SELINON 8176ab3c865e - TASK_SCHEDULE : {"details": {"condition_str": "True", "countdown": null, "dispatcher_id": "f26214e6-fc2a-4e6f-97ed-6c2f6f183140", "flow_name": "myFlow", "foreach_str": null, "node_args": {"foo": "bar"}, "parent": {}, "queue": "selinon_v1", "selective": false, "selective_edge": false, "task_id": "54ec5acb-7a8f-459a-acf3-806ffe53af14", "task_name": "MyTestTask"}, "event": "TASK_SCHEDULE", "time": "2017-04-31 08:45:02.230896"}
The full list of all events with the corresponding details that are captured is available in the selinon.trace module.
Note
All tasks have an instantiated logger instance under self.log (see SelinonTask). Feel free to use this logger to do logging.
If you would like to create your own trace logger, you can do so by registering your custom tracing function in the YAML configuration in the global section:
global:
  trace:
    # from myapp.trace import my_trace_func
    - function:
        name: 'my_trace_func'
        import: 'myapp.trace'
The tracing function has two arguments - event and msg_dict. The argument event semantically corresponds to the tracepoint that is being captured and msg_dict captures all the details related to the tracepoint (see selinon.trace):
from selinon import Trace

def my_trace_func(event, msg_dict):
    if event == Trace.FLOW_FAILURE:
        print("My flow %s failed" % msg_dict['flow_name'])
Danger
Note that if any error occurs in a tracing function, the worker or dispatcher will report this error to standard Python logging and will continue with tracing to other trace functions if configured so.
Note
If you are using ELK (Elastic Search, Logstash, Kibana) stack for aggregating logs, check python-logstash.
Selinon also lets you put trace events into a storage. For this purpose you can define the following configuration entry:
global:
  trace:
    - storage:
        name: 'MyStorage'
        method: 'trace'
By providing the configuration option stated above, Selinon will call the MyStorage.trace() method on each event. Note that the storage needs to be defined in the storages section in nodes.yaml; Selinon will automatically instantiate the storage adapter and connect to the storage/database once needed.
As you can see, the trace section consists of a list of tracing mechanisms being used. You can define as many tracing entries as you want.
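For example, here is a sketch combining Python logging with a storage-based trace (the storage name is hypothetical and has to exist in your storages section):

global:
  trace:
    # log informative tracepoints to stdout/stderr
    - logging: true
    # additionally persist each event via the storage adapter's trace() method
    - storage:
        name: 'MyStorage'
        method: 'trace'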
Sentry integration¶
If you would like to use Sentry for monitoring, you can use already existing support. Selinon reports all exceptions to the Sentry instance if you provide the following configuration:
global:
  trace:
    - sentry:
        # You can use environment variables (DSN picked from SENTRY_DSN in this case) using the following notation:
        # dsn: '{SENTRY_DSN}'
        dsn: 'http://5305e373726b40ca894d8cfd121dea34:78c848fac46040d1a3218cc0bf8ef6a7@sentry:9000/2'
You need to adjust the Sentry DSN configuration so it points to a correctly set up Sentry instance.
Also don’t forget to install extras providing Sentry integration:
$ pip3 install selinon[sentry]
Selective task run¶
Selinon offers you a way to create a declarative configuration written in YAML files that enables you to define flows. You can run these flows as one unit - all tasks in the flow get executed based on conditions and their time and data dependencies (the edges you stated).
Now suppose that you run the same flow multiple times with the same flow arguments. In some cases you would like to run only some subset of tasks without stating all possible subsets of flows that could occur. Selinon offers you a way to cherry-pick tasks that should be run in your flow. Selinon will then compute all edges that need to be fired in order to get to your desired task with respect to task dependencies.
Now let’s assume that you defined the following flow:

Now let’s assume that you want to run only Task2. The only thing you need to do is run the flow selective_flow1 with the following code snippet:
from selinon import run_flow_selective
run_flow_selective('selective_flow1', ['Task2'], node_args={'foo': 'bar'})
Refer to run_flow_selective() for more information about this function.
Selinon will now by default run only Task1 and Task2 as that is the computed path to Task2 (avoiding a Task3 run). Running task Task1 is also optional - results could be reused from previous runs as well - see the selective run function that is discussed in the following sections.
Note
You can find selective task runs useful for various scenarios:
- for tasks whose results vary in time, so you want to recompute them (maybe periodically) and reuse results of tasks that you have already computed
- a bug fix in some task does not require you to run the whole flow again - you can use run_subsequent to also run affected tasks that are direct or indirect dependencies of the requested task
- anything specific to your application
Reuse results in selective flows¶
If you would like to avoid running some task in selective runs (let’s say Task1 from the previous example) and reuse results from previous runs, you can simply register a selective run function in your YAML configuration file in the task definition:
tasks:
  - name: 'Task1'
    import: 'myproject.tasks'
    selective_run_function:
      # from myproject.selective import selective_run_function
      function: 'selective_run_function'
      import: 'myproject.selective'
This function gets called by the dispatcher in order to decide whether the desired task should be run, based on the result this function computes.
Note
Refer to selinon.routines.always_run() for the default selective run function definition.
The selective run function signature is:
selective_run_function(flow_name, node_name, node_args, task_names, storage_pool)
Arguments passed to this function:

- flow_name - name of the flow in which this task was requested to run
- node_name - name of the node in the flow for which the selective_run_function was called
- node_args - arguments supplied to the flow
- task_names - tasks that were requested to start in the selective flow run (same as passed to run_flow_selective)
- storage_pool - instantiated StoragePool with the appropriate mapping (so you can use this storage pool for querying parent task results)
The result of the selective_run_function should be None if the node should be run (to be more precise, scheduled), or the id of the task whose result should be reused.
Danger
The selective run function gets executed by dispatcher. This means that it cannot raise any exceptions. If an exception is raised, the behaviour in such cases is undefined.
And, of course, the id that is returned from this function has to exist.
The selective run function is called only for tasks on the path; it is never run for tasks that are not on the direct path to the desired task. It is also not called on tasks that you requested to run in the selective flow as they are always run.
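A minimal sketch of such a function follows; the 'reuse_id' hint in the flow arguments is a hypothetical application-specific convention, not part of Selinon:

def my_selective_run_function(flow_name, node_name, node_args, task_names, storage_pool):
    # Hypothetical logic: the caller passes a mapping of node names to task
    # ids whose stored results should be reused, e.g. {'Task1': '<task1_id>'}.
    reuse_map = (node_args or {}).get('reuse_id', {})
    # Returning None schedules the node; returning a task id reuses that result.
    return reuse_map.get(node_name)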
Note
To fire a selective task run, the following conditions need to be met:
- the desired task has to be in the flow (or in any of the sub-flows if follow_subflows was set)
- there has to be a direct path to the desired task, not via failure edges
- if there are multiple paths to the desired task/tasks, all of them are run
- if there is a direct or indirect cyclic edge to a task, this cyclic edge is always included (you can pass explicit flow arguments to avoid this behaviour)
- naturally, it is possible that the selective flow run does not reach the desired tasks (tasks that were requested to start) due to condition evaluation on the path
Sub-flows and subsequent tasks in selective task runs¶
Selinon by default computes paths only for one flow - the flow that you stated in the selective run. If you wish to run the desired task also in sub-flows, configure follow_subflows as true. In this case Selinon will check all sub-flows for the desired task's occurrence and also run sub-flows, if necessary. Note that the desired task in this case needs to be present in any of the sub-flows (not necessarily in the top-level one).
If you wish to run all subsequent tasks that depend on tasks stated in your selective task run, pass run_subsequent as true in your selective configuration. In this case the selective run function will not get called; instead, all subsequent tasks get scheduled based on conditions as in a basic flow run.
Using selective task runs from YAML configuration¶
Now let’s assume that you defined a flow in your YAML configuration file and you want to reuse this definition in order to run this flow from another flow. Moreover, you want to run only certain tasks. Selinon easily offers you a solution to this:
---
flow-definitions:
  - name: 'selective_flow2'
    edges:
      - from:
        to: 'Task4'
      - from: 'Task4'
        to: 'selective_flow1'
        selective:
          tasks:
            - 'Task2'
          follow_subflows: false
          run_subsequent: false
The configuration stated above defines a selective sub-flow that basically runs only Task2 from our previous flow selective_flow1. The semantics of the keys in the YAML configuration conform to the arguments that are passed to the selective run function.
For better understanding, here is your selective_flow2 visualization:

Note that in this particular scenario you can also do:
from selinon import run_flow_selective
# requesting to run Task2 (stated in selective_flow1), but selective_flow1 is a sub-flow of selective_flow2
# note follow_subflows!
run_flow_selective('selective_flow2', ['Task2'], node_args={'foo': 'bar'}, follow_subflows=True)
without the selective part in your selective_flow2 configuration. Whether to use selective in your YAML configuration is highly dependent on your use case (and the selective run function implementation).
YAML configuration used in examples¶
---
tasks:
  - name: 'Task1'
    import: 'myproject.tasks'
  - name: 'Task2'
    import: 'myproject.tasks'
  - name: 'Task3'
    import: 'myproject.tasks'
  - name: 'Task4'
    import: 'myproject.tasks'

flows:
  - 'selective_flow1'
  - 'selective_flow2'

flow-definitions:
  - name: 'selective_flow1'
    edges:
      - from:
        to: 'Task1'
      - from: 'Task1'
        to:
          - 'Task2'
          - 'Task3'

  - name: 'selective_flow2'
    edges:
      - from:
        to: 'Task4'
      - from: 'Task4'
        to: 'selective_flow1'
        selective:
          tasks:
            - 'Task2'
          follow_subflows: false
          run_subsequent: false
YAML configuration specification¶
Now let’s take a look at the YAML configuration file structure. At the top level, the following keys are listed:
---
tasks:
  # a list of tasks available within the system
flows:
  # a list of flows available within the system
storages:
  # a list of storages available within the system
global:
  # global Selinon configuration
flow-definitions:
  # a list of flow definitions
Note
If you have a lot of flows or you want to combine flows in different ways, you can place the configuration of entities (tasks, storages and flows) into one file (typically called nodes.yaml) and spread the flow definitions across multiple files.
Tasks¶
Configuration of a task in the YAML configuration can look like the following example (all possible configuration keys stated):
tasks:
  - name: 'MyTask1'
    classname: 'MyTask1Class'
    import: 'myapp.tasks'
    max_retry: 5
    retry_countdown: 120
    output_schema: 'myapp/schemas/my-task1.json'
    storage: 'Storage1'
    storage_read_only: false
    storage_task_name: 'MyTaskOne'
    selective_run_function:
      function: 'my_selective_function'
      import: 'myapp.selective'
    queue: 'my_task1_queue'
    throttling:
      seconds: 10
A task definition has to be placed into the tasks section, which consists of a list of task definitions.
name¶
A name of a task. This name is used to refer to the task in your flows; it is not necessarily the task's class name (see the classname option).
- Possible values:
- string - a task name
- Required: true
import¶
Module that should be used to import task.
- Possible values:
- string - a module to be used in import statement
- Required: true
classname¶
Name of a class that should be imported. If omitted, it defaults to name.
- Possible values:
- string - task’s class name
- Required: false
- Default: task's name configuration option
max_retry¶
Maximum number of retries of the task (on failures - when an exception is raised) before the task is marked as failed.
- Possible values:
- positive integer - maximum number of retries to be performed
- Required: false
- Default: 0 - no retries on task failures are done
retry_countdown¶
Number of seconds before a task should be retried (retry delay).
- Possible values:
- positive integer - number of seconds for retry delay
- Required: false
- Default: 0 - no delay is performed
output_schema¶
JSON schema that should be used to validate results before they are stored in a storage/database. If the task's result does not correspond to the JSON schema, the task fails and is marked as failed or retried based on the max_retry configuration option.
- Possible values:
- string - a path to JSON schema
- Required: false
- Default: None - no JSON schema validation is done on task results
storage¶
Storage name that should be used for task results.
- Possible values:
- string - a name of storage
- Required: false
- Default: None - task results are discarded
queue¶
Broker queue that should be used for message publishing for the given task.
The queue name can use environment variables that get expanded (e.g. queue: '{DEPLOYMENT_PREFIX}_queue_v0' will get expanded to testing_queue_v0 if the DEPLOYMENT_PREFIX environment variable is testing). This allows you to parametrize resources used in deployment.
- Possible values:
- string - a name of queue
- Required: false
- Default: celery (Celery's default queue)
storage_read_only¶
Mark storage as read-only. Task results will not be stored to the configured storage, but the configured storage will be available via self.storage (see the sketch after this list).
- Possible values:
- boolean - true if results shouldn’t be stored in the configured storage
- Required: false
- Default: false - results are saved to a storage if a storage was configured
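A sketch of how a read-only storage can still be used inside a task; the flow/task names and the id carried in node_args are hypothetical:

from selinon import SelinonTask

class ReadOnlyTask(SelinonTask):
    def run(self, node_args):
        # with storage_read_only: true the result of this task is not stored,
        # but the configured adapter is still reachable via self.storage
        record = self.storage.retrieve('flow1', 'Task1', node_args['task1_id'])
        return {'seen': record is not None}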
storage_task_name¶
Rename the task name for storage operations. Selinon will perform the translation of the task name before storage operations get called.
- Possible values:
- string - a name/alias of task when storing task results
- Required: false
- Default: task's name configuration option
selective_run_function¶
Selective run function that should be applied on selective task runs.
- Possible values:
  - the following keys for pointing to the selective run function:
    - function - a name of the function to be imported
    - import - a module name to be used for the function import
- Required: false
- Default: selinon.routines.always_run() - the task will always be forced to run on selective task runs
throttling¶
Task execution throttling configuration. See Optimization section for more detailed explanation.
- Possible values:
- following keys for time delay configuration, each configurable using a positive integer, if omitted defaults to 0:
- days
- seconds
- microseconds
- milliseconds
- minutes
- hours
- weeks
- Required: false
- Default: all time delay configuration keys set to zero - no throttling is performed
Storages¶
Here is an example of a storage configuration with all the configuration options:
storages:
- name: 'Storage1'
import: 'myapp.storages'
classname: 'SqlStorage'
cache:
name: 'Cache1'
import: 'myapp.caches'
configuration:
size: 10
configuration:
connection_string: 'postgresql://postgres:postgres@localhost:5432/mydatabase'
echo: false
A storage definition has to be placed into the storages section, which is a list of storage definitions.
name¶
A name of a storage. This name is used to refer to the storage in tasks.
- Possible values:
- string - a name of the storage
- Required: true
import¶
Module that holds storage class definition.
- Possible values:
- string - a module to be used to import storage
- Required: true
classname¶
A name of the database storage adapter class to be imported.
- Possible values:
- string - a name of the class to import
- Required: false
- Default: the storage name configuration option
configuration¶
Configuration that will be passed to storage adapter instance. This option depends on database adapter implementation, see storage adapter implementation section.
cache¶
Cache to be used for result caching, see cache section and the optimization objective.
Flow definition¶
A flow definition is placed into a list of flow definitions in the YAML configuration file.
flow-definitions:
  - name: 'flow1'
    propagate_parent:
      - 'flow2'
    node_args_from_first: true
    #propagate_compound_finished:
    max_retry: 2
    retry_countdown: 10
    propagate_finished:
      - 'flow2'
    propagate_node_args:
      - 'flow2'
    nowait:
      - 'Task1'
    eager_failures:
      - 'Task2'
    cache:
      name: 'Cache1'
      import: 'myapp.caches'
      configuration:
        size: 10
    sampling:
      name: 'constant'
      args:
        # check for flow state each 10 seconds
        retry: 10
    throttling:
      seconds: 10
    edges:
      - from:
          - 'Task1'
        to:
          - 'Task2'
          - 'flow2'
        condition:
          name: 'fieldEqual'
          args:
            key:
              - 'foo'
              - 'bar'
            value: 'baz'
      - from: 'flow2'
        to: 'Task3'
      - from: 'Task3'
        to: 'flow3'
        condition:
          name: 'fieldEqual'
          args:
            key:
              - 'foo'
            value: 'bar'
        foreach:
          import: 'myapp.foreach'
          function: 'iter_foreach'
          # result of the function would be used as sub-flow arguments to flow3
          propagate_result: false
    failures:
      - nodes:
          - 'Task1'
        fallback:
          - 'Fallback1'
name¶
A name of the flow. This name is used to reference the flow.
- Possible values:
- string - a name of the flow
- Required: true
propagate_parent¶
Propagate parent nodes to a sub-flow or sub-flows. Parent nodes will be available in the self.parent property of the SelinonTask class and it will be possible to transparently query results using SelinonTask methods.
- Possible values:
- string - a name of the flow to which parent nodes should be propagated
- list of strings - a list of flow names to which parent nodes should be propagated
- boolean - enable or disable parent nodes propagation to all sub-flows
- Required: false
- Default: false - do not propagate parent to any sub-flow
propagate_finished¶
Propagate finished nodes from sub-flows. Finished nodes from sub-flows will be available in the self.parent property of the SelinonTask class as a dictionary and it will be possible to transparently query results using SelinonTask methods. All tasks will be recursively received from all sub-flows of the inspected flow.
- Possible values:
- string - a name of the flow from which finished nodes should be propagated
- list of strings - a list of flow names from which finished nodes should be propagated
- boolean - enable or disable finished nodes propagation from all sub-flows
- Required: false
- Default: false - do not propagate finished nodes at all from any sub-flow
propagate_compound_finished¶
Propagate finished nodes from sub-flows in a compound (flattened) form - see Useful flow patterns for more info. Finished nodes from sub-flows will be available in the self.parent property of the SelinonTask class as a dictionary and it will be possible to transparently query results using SelinonTask methods. All tasks will be recursively received from all sub-flows of the inspected flow.
- Possible values:
- string - a name of the flow from which finished nodes should be propagated
- list of strings - a list of flow names from which finished nodes should be propagated
- boolean - enable or disable finished nodes propagation from all sub-flows
- Required: false
- Default: false - do not propagate finished nodes at all from any sub-flow
propagate_node_args¶
Propagate node arguments to sub-flows.
- Possible values:
- string - a name of flow to which node arguments should be propagated
- list of strings - a list of flow names to which node arguments should be propagated
- boolean - enable or disable node arguments propagation to all sub-flows
- Required: false
- Default: false - do not propagate flow arguments to any sub-flow
node_args_from_first¶
Use the result of the very first task as flow arguments. There has to be exactly one starting task if this configuration option is set (see the sketch after this list).
- Possible values:
- boolean - enable or disable result propagation as a flow arguments
- Required: false
- Default: false - do not propagate result of the first task as flow arguments
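A minimal sketch (flow and task names are hypothetical): the single starting task's result becomes the flow arguments for the rest of the flow:

flow-definitions:
  - name: 'args_from_first_flow'
    node_args_from_first: true
    edges:
      # exactly one starting task is required with node_args_from_first
      - from:
        to: 'InitTask'
      # InitTask's result is passed as node_args to ConsumerTask
      - from: 'InitTask'
        to: 'ConsumerTask'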
nowait¶
Do not wait for a node (a task or a sub-flow) to finish. Such a node cannot be stated as a dependency in the YAML configuration file. Note that the node's failure will not be tracked if it is marked as nowait.

This option is an optimization - once all tasks that are not stated in nowait finish, the dispatcher schedules the nowait nodes, marks the flow finished/failed (based on task/fallback success) and will not retry.
- Possible values:
- string - a node that should be started with nowait flag
- list of strings - a list of nodes that should be started with nowait flag
- Required: false
- Default: an empty list - wait for all nodes to complete in order to end flow
eager_failures¶
If a node stated in eager_failures fails, the dispatcher immediately stops scheduling new nodes and marks the flow as failed without checking results of other nodes inside the flow (see the sketch after this list).

If the max_retry configuration option is set, the flow will be restarted respecting it.
- Possible values:
- string - a node whose failure causes an eager failure of the whole flow
- list of strings - a list of nodes that can cause an eager flow failure (if any node from the list fails)
- bool - if set to true, any node failure inside the flow will cause an eager flow failure
- Required: false
- Default: an empty list (or false) - do not stop scheduling eagerly on any failure
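As a sketch (flow and task names are hypothetical), failing fast on a critical task looks like this:

flow-definitions:
  - name: 'eager_failure_flow'
    # any CriticalTask failure immediately fails the whole flow
    eager_failures:
      - 'CriticalTask'
    edges:
      - from:
        to:
          - 'CriticalTask'
          - 'OtherTask'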
max_retry¶
Maximum number of retries of the flow in case of flow failures. A flow can fail when one of its nodes is marked as failed (a task or any sub-flow). In case of retries, all tasks are scheduled from the beginning as in the first run.
- Possible values:
- positive integer - maximum number of retries to be performed
- Required: false
- Default: 0 - no retries on flow failures are done
retry_countdown¶
Number of seconds before a flow should be retried (retry delay).
- Possible values:
- positive integer - number of seconds for retry delay
- Required: false
- Default: 0 - no delay is performed
sampling¶
Define a custom module in which the dispatcher sampling strategy function is defined (see Optimization for more info).

- Possible values:
  - name - a name of the sampling strategy function to be used (default: biexponential_increase)
  - import - a module name from which the sampling strategy function should be imported (default: selinon.strategies)
  - args - additional sampling strategy configuration options passed as keyword arguments to the sampling strategy (default: start_retry: 2, max_retry: 120)
- Required: false
- Default: as listed for each configuration key

Refer to selinon.strategies for additional info.
throttling¶
Flow execution throttling configuration. See Optimization section for more detailed explanation.
- Possible values:
- following keys for time delay configuration, each configurable using a positive integer, if omitted defaults to 0:
- days
- seconds
- microseconds
- milliseconds
- minutes
- hours
- weeks
- Required: false
- Default: all time delay configuration keys set to zero - no throttling is performed
cache¶
Cache to be used for node state caching, see cache section and the optimization objective.
edges¶
A list of edges describing dependency on nodes. See Flow edge definition.
Flow edge definition¶
A flow consists of time or data dependencies between nodes that are used in the flow. These dependencies are modeled using edges, which are conditional and can have multiple source and multiple destination nodes (tasks or flows).
from¶
A source node or nodes of the edge. If no source node is provided, the edge is considered a starting edge (the from keyword nevertheless needs to be explicitly stated). There can be multiple starting edges in a flow.
- Possible values:
- string - name of the source node - either a task name or a flow name
- list of strings - a list of names of source nodes
- None - no source nodes, edge is a starting edge
- Required: true
to¶
- Possible values:
- string - name of the destination node - either a task name or a flow name
- list of strings - a list of names of destination nodes
- Required: true
condition¶
A condition made of predicates that determines whether the edge should be fired (destination nodes should be scheduled). The boolean operators and, or and not can be used as desired to create more sophisticated conditions.
- Possible values:
  - and - N-ary predicate that is true if all predicates listed in the list are true
  - or - N-ary predicate that is true if any predicate listed in the list is true
  - not - unary predicate that is true if the listed predicate is false
  - name - a reference to a leaf predicate to be used; this predicate is imported from the predicates module defined in the global section
- Required: false
- Default: the alwaysTrue() predicate defined in selinon.predicates.alwaysTrue, which always evaluates to true

If name is used, the following configuration options are possible:

- node - name of the node to which the given condition applies, can be omitted if there is only one source node
- args - arguments that should be passed to the predicate implementation as keyword arguments
An example of a condition definition:
condition:
  #or:
  and:
    - name: 'fieldEqual'
      node: 'Task1'
      args:
        key: 'foo'
        value: 'bar'
    - not:
        name: 'fieldExist'
        node: 'Task2'
        args:
          key: 'baz'
          value: 42
Please refer to the predicates module available in selinon.predicates. This module states default predicates that can be used immediately. You can also provide your own predicates by configuring the module used in the global configuration section.
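A minimal sketch of a custom leaf predicate, assuming the same calling convention as the predicates shipped in selinon.predicates (the node's result is passed first, followed by the keyword arguments from the args section):

def fieldGreaterThan(message, key, value):
    """Return True if the given field of the node's result exceeds value."""
    try:
        return message[key] > value
    except (KeyError, TypeError):
        # a predicate should never raise - evaluate to False on any problem
        return False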
foreach¶
Spawn multiple nodes (let’s say N, where N is a variable determined at run time). The foreach function will be called iff the condition is evaluated as true. See Useful flow patterns for more info.
- Possible values:
  - foreach function definition:
    - function - a name of the function that should be used
    - import - a module from which the foreach function should be imported
    - propagate_result - if true (defaults to false), the result of the foreach function will be propagated to sub-flows (cannot be propagated to tasks); this option is disjoint with propagate_node_args
- Required: false
- Default: None
Flow failures¶
A list of failures that can occur in the system and their fallback nodes.
- Possible values:
  - a list of failures, each item defining:
    - nodes - a node name or a list of node names that can trigger fallback scheduling in case of failure
    - fallback - a node name or a list of node names (task or flow names) that should be scheduled in case of failure
    - condition - a condition to be evaluated; if true, the fallback is triggered - see the condition definition on flow edges for more info and examples
- Required: false
- Default: an empty list of failures - all failures will be propagated to parent flows
An example of a failure definition:
failures:
  - nodes:
      - 'Task1'
      - 'Task2'
    fallback: 'Fallback1'
  - nodes: 'Task1'
    fallback:
      - 'Fallback1'
      - 'Fallback2'
Failures are greedy: if multiple fallbacks can be run, the failure that covers as much of the failed nodes as possible is used.
Note
- fallbacks are run once there are no active nodes in the flow - this is where the dispatcher tries to recover from failures
- only one fallback is scheduled at a time - this prevents time dependencies in failures
- the failure is always chosen based on how many nodes are expected to fail - the dispatcher is greedy with fallbacks, meaning it always chooses the failure that depends on the highest number of nodes; if multiple failures can be chosen, lexical order of node names comes into play
- a flow fails if there is still a node that failed and no failure was specified to recover from it
- a fallback on a fallback is fully supported (and can be nested as desired)
global¶
Global configuration section for Selinon.
predicates_module¶
Define a custom predicate module. Predicates will be imported from this module (using the predicate name).
- Possible values:
- string - a predicate module from which predicates module should be imported
- Required: false
- Default: selinon.predicates
default_task_queue¶
Default queue for tasks. This queue will be used for all tasks (overrides the default Celery queue), unless you specify queue in the task definition, which has the highest priority.
The queue name can be parametrized using environment variables - see queue configuration for more info.
- Possible values:
- string - a queue name for tasks
- Required: false
- Default: celery - Celery's default queue
default_dispatcher_queue¶
Queue for the dispatcher task. This queue will be used for all dispatcher tasks (overrides the default Celery queue), unless you specify queue in the flow definition, which has the highest priority.
The queue name can be parametrized using environment variables - see queue configuration for more info.
- Possible values:
- string - a queue for dispatcher to schedule flows
- Required: false
- Default: celery - Celery's default queue
trace¶
Keep track of actions that are done in the flow. See Trace flow actions for more info with configuration examples.
- Possible values:
  - an array where each entry configures a tracing mechanism used:
    - function - register a callback function that is called on each event; configuration options:
      - import - import to be used to import the tracing function
      - name - name of the function to be imported
    - logging - use Python's logging facilities; configuration options:
      - true (boolean) - turn on Python's logging
    - sentry - use Sentry for monitoring task failures (only events of type TASK_FAILURE); configuration options:
      - dsn - Sentry's DSN describing the target service and Sentry project to log to; can be parametrized using environment variables similarly to queues - see the queue configuration for more info
    - storage - use a storage adapter to store traced events; configuration options:
      - name - name of the storage to be used
      - method - name of the method to call on the storage adapter instance
    - json - trace directly to JSON; not parameterizable, accepts only a boolean (e.g. json: true to turn JSON tracing on); all tracepoints are one-liners so they are consumable by an ELK (Elastic Search+Logstash+Kibana) or EFK (Elastic Search+Fluentd+Kibana) stack for later log inspection
- Required: false
- Default: do not trace flow actions
migration_dir¶
A path to a directory containing generated migrations. See Migrations - Redeployment with changes for more info.

The migration directory path can be parametrized using environment variables - see the queue configuration for more info on how to reference environment variables.
- Possible values:
- string - a path to migration directory
- Required: false
- Default: no migration directory - no migrations will be performed
cache¶
Define a cache for result caching or for task state caching - see distributed caches in the Optimization section. Each cache has to be of type Cache.
name¶
Name of the cache class to be imported.
- Possible values:
- string - a name of the cache class
- Required: false
- Default: None - no cache is used
import¶
Name of the module from which the cache should be imported.
- Possible values:
- string - a name of the module
- Required: false
- Default: None - no cache is used
configuration¶
Additional configuration options that are passed to the cache constructor as keyword arguments. These configuration options depend on particular cache implementation.
Best practices¶
Here are some notes and tips on how to write YAML config files and how to work with Selinon.
Do not introduce a new flow unless it is necessary¶
Each flow adds some overhead. It is reasonable to introduce a new flow when:
- you want to reuse certain implementation.
- you want to encapsulate flow so you can run fallback nodes for the whole group of tasks
- you want to throttle or optimize certain tasks (you want to let specific workers do specific flows or tasks)
- you want to recursively run the whole flow or you have dependencies on flows so they are not resolvable except copying YAML configuration logic
- you want to refer to flow as a group of tasks - a separate indivisible logic
- you need to run a flow with different arguments - each flow is defined by a group of tasks, their time and data dependencies and arguments that are propagated in the whole flow to each task (possibly except the very first one)
- re-usability in general
- based on semantics of your flows
- any other reasonable argument…
Do not add flags unless you really need them¶
The Selinon implementation was designed to be lazy. That means it was designed not to add overhead unless you really need it. If you don't need flags such as propagate_parent or propagate_finished, you don't need to state them in the YAML configuration file. If you state them, they will add additional computational overhead that you don't necessarily need.
Make flow edges as minimal as possible¶
Even though Selinon supports multiple source and destination nodes in the edge configuration, it is generally a good idea to make these edges as minimal as possible. This way you can mark unused edges with an alwaysFalse condition without the need to purge queues when you redeploy with a different configuration.
Aliases for flows, tasks and storages¶
You probably saw (based on the examples) that you can easily define a task alias - tasks that share a Python class/implementation but have different names:
tasks:
  # from myapp.tasks import Task1
  - name: 'Task1'
    import: 'myapp.tasks'

  # from myapp.tasks import Task1 as Task2
  - name: 'Task2'
    classname: 'Task1'
    import: 'myapp.tasks'
This is useful when you want to run the same code multiple times in a flow (since nodes are referenced by names). Also check the storage_task_name configuration option, which enables you to store results under a different task name than the one stated in the nodes.yaml file.
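A sketch of such storage aliasing (the versioned task name here is hypothetical):

tasks:
  # the flow references 'Task1_v2', but storage operations see 'Task1'
  - name: 'Task1_v2'
    classname: 'Task1'
    import: 'myapp.tasks'
    storage: 'Storage1'
    storage_task_name: 'Task1'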
You can also define a storage alias - useful when you want to use the same database/storage adapter but with a different configuration:
storages:
  # from myapp.storages import PostgreSQL as UserPostgres
  - name: 'UserPostgres'
    classname: 'PostgreSQL'
    import: 'myapp.storages'
    configuration:
      host: postgres-user
      port: 5432

  # from myapp.storages import PostgreSQL as AdminPostgres
  - name: 'AdminPostgres'
    classname: 'PostgreSQL'
    import: 'myapp.storages'
    configuration:
      host: postgres-admin
      port: 5432
You can also create flow aliases. This is especially helpful if you want to re-use a flow configuration such as edges, but you want to separate one flow onto a different queue (for example due to SLA requirements, so more workers can serve messages on the SLA-specific queue). You can do so easily since YAML supports references:
flow-definitions:
  - &flow1_def
    name: 'flow1'
    queue: 'flow1_v1'
    propagate_node_args: true
    edges:
      - from:
        to: 'Task4'
      - from: 'Task4'
        to: 'Task5'

  - <<: *flow1_def
    name: 'flow1_sla'
    queue: 'flow1_sla_v1'
    # propagate_node_args and edges configuration will be taken from flow1
Make your queue names configurable via environment variables¶
You can easily make queue names (for tasks and for flows/dispatcher) dependent on environment variables:
tasks:
  - name: 'Task1'
    import: 'myapp.tasks'
    queue: '{DEPLOYMENT_PREFIX}_task1_v1'
Selinon will expand the queue name for you based on DEPLOYMENT_PREFIX. Let’s say you set DEPLOYMENT_PREFIX to testing. The expanded queue name will be testing_task1_v1.
Danger
Note that the DEPLOYMENT_PREFIX variable needs to be set in the environment, otherwise queue name expansion will fail.
Now you can deploy two instances of your system on the same cluster without affecting each other. This might be helpful for testing purposes when you have a testing cluster where you want to run integration tests that do not affect each other. Similarly you can introduce such option for storages.
Useful flow patterns¶
This section gives an overview of some flow patterns, how to model them and how they behave in different situations.
Variable number of child tasks or sub-flows¶
If you would like to schedule multiple tasks (or sub-flows) of the same type, this configuration will work perfectly for you (assuming you want to schedule 3 Task2 tasks):
flow-definitions:
  - name: 'flow1'
    edges:
      - from:
        to: 'Task1'
      - from: 'Task1'
        to:
          - 'Task2'
          - 'Task2'
          - 'Task2'
OK, now let’s assume that you don’t know how many tasks should be executed and you want to determine (or compute) the exact task count at runtime. Selinon lets you do that by plugging in your foreach function:
flow-definitions:
  - name: 'flow1'
    edges:
      - from:
        to: 'Task1'
      - from: 'Task1'
        to: 'Task2'
        foreach:
          # from myapp.foreach import my_foreach_function
          function: 'my_foreach_function'
          import: 'myapp.foreach'
Here is the corresponding visualization:

An example of such foreach function could be:
def my_foreach_function(storage_pool, node_args):
    return range(node_args.get('task2_schedule_count', 0))
Danger
As the foreach function is executed by the dispatcher, it cannot raise any exceptions. Raising an exception in a foreach function leads to undefined behaviour.
Note
You can have multiple destination nodes in the edge definition even with a foreach function in use.
The return value of the foreach function should be an iterable (see below why) whose length determines the number of tasks that should be scheduled.
Arguments of the foreach function are:

- storage_pool - instantiated StoragePool that can be used to query parent task results
- node_args - arguments of the current flow
Note
You can combine the foreach configuration with condition. The provided condition gets evaluated before the foreach function, as one would expect.
If you would like to schedule sub-flows dynamically, you can do so by providing a sub-flow in the destination of edge definition:
flow-definitions:
  - name: 'flow1'
    edges:
      - from:
        to: 'Task1'
      - from: 'Task1'
        to: 'flow2'
        foreach:
          # from myapp.foreach import my_foreach_function
          function: 'my_foreach_function'
          import: 'myapp.foreach'
          # uncomment if results of the foreach function should be used as sub-flow arguments
          #propagate_result: true
In case you also set propagate_result to true, each entry in the resulting iterable will be propagated to sub-flow flow2 as sub-flow arguments.
Permutations of all parent tasks a.k.a Cyclic diamond with combinations¶
Consider the following flow definition:
flow-definitions:
- name: 'flow1'
edges:
- from:
to: 'Task1'
- from: 'Task1'
to:
- 'Task2'
- 'Task3'
- from:
- 'Task2'
- 'Task3'
to: 'Task4'
- from: 'Task4'
to: 'Task1'
A visualization of such a flow using the Selinon CLI would be:

If the visualization is not straightforward to you at first: basically, there is a "diamond" that is invoked in a cycle (without conditions):

As one would expect, Task4 is called after Task2 and Task3 have finished. Now let's simulate how Selinon evaluates such cyclic dependencies.
We start with task Task1. After task Task1 finishes, Selinon schedules Task2 and Task3. Let’s assign identifiers (id) to these tasks:
- Task2 with id <task2_1>
- Task3 with id <task3_1>
So now Selinon can proceed to task Task4. This task will have parent tasks with identifiers <task2_1> and <task3_1> (for Task2 and Task3 respectively).
Once task Task4 finishes, Selinon will schedule Task1 again. After the second Task1 finishes, Task2 and Task3 are started again, now with the following identifiers:
- Task2 with id <task2_2>
- Task3 with id <task3_2>
And we can proceed to task Task4. Now the question is: what will be the parent tasks of Task4 in the second iteration?
The answer is that multiple Task4 tasks will be scheduled, with the following parent task identifiers:
- Task4 with parent identifiers:
- <task2_1> for task Task2 from the first iteration
- <task3_2> for task Task3 from the second iteration
- Task4 with parent identifiers:
- <task2_2> for task Task2 from the second iteration
- <task3_1> for task Task3 from the first iteration
- Task4 with parent identifiers:
- <task2_2> for task Task2 from the second iteration
- <task3_2> for task Task3 from the second iteration
If we stopped the flow after the second iteration (before task Task1 is run for the third time), we would see that four Task4 tasks were executed in total, to respect all possible combinations.
Cyclic Diamond without combinations¶
If you would like to avoid running Task4 in the previous example for each combination and rather run it once per iteration, you need to create a sub-flow:
flow-definitions:
- name: 'flow1'
edges:
- from:
to: 'Task1'
- from: 'Task1'
to: 'flow2'
- from: 'flow2'
to: 'Task4'
- from: 'Task4'
to: 'Task1'
- name: 'flow2'
edges:
- from:
to: 'Task2'
- from:
to: 'Task3'
The visualization of flow flow1 would be:

And the visualization of flow flow2 would be:

Now flow flow2 is executed once per iteration of flow flow1, so there will be exactly one Task4 invocation per flow1 iteration.
Note
You can pass propagate_finished (or propagate_compound_finished) and propagate_parent to pass information about parent nodes and finished nodes. See YAML configuration for more info.
Aggregating tasks from sub-flows¶
Results of flows are serialized and stored as JSONs in the result backend (configured by the Celery configuration). Each dispatcher reports only tasks and flows that were started inside the flow handled by that dispatcher task (except nowait nodes). Tasks from sub-flows are captured by the dispatchers that handle the started sub-flows.
If you want finished nodes from sub-flows to be propagated to tasks as parents, you need to explicitly set propagate_finished to true in your YAML configuration. The information is then gathered and you can transparently query parent task results (or failures).
By default you need to respect how sub-flows are organized - so if your flow flow1 starts sub-flow flow2 and that starts flow3, you need to respect this organization if you would like to ask for results of task Task1 started in flow flow3 from flow flow1:
from selinon import SelinonTask

# this task is run in flow1
class MyTask(SelinonTask):
    def run(self, node_args):
        print('Task1' in self.parent['flow2']['flow3'].keys())  # should be True iff Task1 was successful in flow3
        task1_result = self.parent_flow_result(['flow2', 'flow3'], 'Task1')  # optionally pass index if there were multiple instances of Task1
        print('Result of Task1 run in parent sub-flow flow3 is {}'.format(task1_result))
If you would like to compound (or flatten) these parent sub-flow organization details, just set propagate_compound_finished instead of propagate_finished. In that case you can directly use:
from selinon import SelinonTask

# this task is run in flow1
class MyTask(SelinonTask):
    def run(self, node_args):
        print('Task1' in self.parent['flow2'].keys())  # should be True iff Task1 was successful in flow3; organization is flattened
        task1_result = self.parent_flow_result('flow2', 'Task1')  # optionally pass index if there were multiple instances of Task1
        print('Result of Task1 run in parent sub-flow flow3 (ignoring flow3 organization) is {}'.format(task1_result))
Note
By setting propagate_compound_finished you lose information about which tasks ran in which sub-flow. If you run tasks of the same name in different sub-flows, these tasks will be merged into one single list.
Frequently Asked Questions (FAQ)¶
I see only one contributor, should I trust you?¶
There is currently one contributor, but every project started somehow. Selinon was designed for the fabric8-analytics project at Red Hat that gathers project information for openshift.io (introduced at the Red Hat 2017 summit keynote), where it is still used and it has already served millions of flows and even more tasks. If you find Selinon interesting for your use-case, feel free to use it (and buy me some beer, or at least let me know that you like it or share any experiences you have).
If you find a bug, a place for enhancement or anything where I can be helpful, feel free to let me know. And not to forget - even you can become a Selinon developer.
Dispatcher does not work properly or hangs in an infinite loop.¶
Check your result backend configuration for Celery. Currently, only Redis and PostgreSQL are supported and well tested (feel free to extend this list based on your experiences). Serious issues were found when rpc was used.
Can I see Selinon in action?¶
See the Selinon demo or the fabric8-analytics project, especially its fabric8-analytics-worker.
Can I simulate Selinon run without deploying huge infrastructure?¶
Yes, you can. Just use the shipped executor:
selinon-cli execute --nodes-definition nodes.yml --flow-definitions flow1.yml flow2.yml --help
This way you can also use Selinon to run your flows from a CLI. You can also explore the prepared containerized demo.
I’m getting Python related errors!¶
Check your Python version. Selinon currently supports only Python 3, as the Celery project is about to drop Python 2 support.
Should I replace Celery with Selinon?¶
Well, it's hard to say. Celery is a great project and offers a lot of features. Selinon should be suitable for you when you have time and data dependencies between tasks and can group these tasks into flows that are more sophisticated than Celery's primitives such as chain or chord. If this is true for you, just give Selinon a try. If you are already using Celery, check the prepared guide on how to migrate from raw Celery to Selinon.
How should I name tasks and flows?¶
You should use names that can become part of a function name (a valid Python 3 identifier). Keep in mind that there is no strict difference between tasks, flows and sub-flows, so they share a namespace.
How can I access nested keys in a dict in default predicates?¶
Assuming you are using predicates from selinon.predicates, what you want is (in Python 3):
message['foo']['bar'] == "baz"
Predicates were designed to deal with this - just provide a list of keys, where the position in the list describes the key position:
condition:
name: 'fieldEqual'
args:
key:
- 'foo'
- 'bar'
value: 'baz'
I need a custom predicate, how to write it?¶
If the selinon.predicates predicates are not suitable for you or you miss a specific predicate, you can define your own predicate module in the global configuration. See the YAML configuration section for details.
What exceptions can predicates raise?¶
Predicates were designed to always return true or false. If a condition cannot be satisfied, false is returned. So it is safe, for example, to access possibly non-existing keys - predicates will return false. This principle has to be kept even in your own predicates, as predicates are executed by dispatcher. If you raise an exception inside a predicate, the behaviour is undefined.
Danger
Predicates were designed to always return true or false. No exceptions can be raised!
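Putting the last two sections together, a custom predicate module could look like the following sketch (the module and predicate names are illustrative; the message argument mirrors how the shipped predicates receive the referenced task result):
# myapp/predicates.py - wire this module in via the global configuration section.
def fieldLongerThan(message, key, length):
    # Predicates must always return true or false and must never raise,
    # so a missing key or a value without a length yields false.
    try:
        return len(message[key]) > length
    except (KeyError, TypeError, IndexError):
        return False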
Do I need result backend?¶
Or more precisely: Do I need a result backend even when I am using my custom database/storage for task results?
Yes, you do. The result backend is used by Celery to store information about tasks (their status, errors). Without a result backend, Selinon is not capable of getting information about tasks, as it uses Celery. Do not use the rpc backend, as issues were noted with it.
Why does Selinon use generated code?¶
Since YAML config files cover some logic (such as conditions), this logic needs to be evaluated somehow. We could simply interpret the YAML file each time, but it was easier to generate Python code directly from the YAML configuration files and let the Python interpreter interpret it for us. Other parts of the YAML file could be used directly, but mostly for consistency and debugging purposes the whole YAML file is used for code generation.
You can easily check how YAML files are transformed to Python code simply by running:
selinon-cli inspect --nodes-definition nodes.yml --flow-definitions flow1.yml flow2.yml --dump outputfile.py
How to write conditions for sub-flows?¶
This is currently a limitation of Selinon. You can try to reorganize your flows so you don't need to inspect parent sub-flows; for most use cases this will work. Adding support for this is planned for future releases.
Is it possible to do changes in the configuration and do continuous redeployment?¶
Yes, you can do so. BUT make sure you do migrations - see the migration section to get insights on how to do it properly.
What happens if I forgot to do migrations?¶
If you make changes in the YAML configuration files and do not perform migrations, unpredictable things may happen if your queues still hold old messages. It's always a good idea to check whether migration files need to be generated. See Migrations - Redeployment with changes for more details.
Is my YAML config file correct? How to improve or correct it?¶
See Best practices section for tips.
Can I rely on checks of YAML files?¶
You can rely on them a bit, but think before you write your configuration. Some errors are captured, but the checks are not bullet-proof. If you make logical mistakes or your flow is simply wrong, Selinon is not an AI that can check your configuration. There are no checks on transitive dependencies, whether given conditions can ever evaluate to true, and so on.
Is there a way how to limit task execution time?¶
Currently there is no such mechanism. Celery has a time limit configuration option, but note that Selinon tasks are not Celery tasks.
Why there is no support for older Celery versions?¶
One of the requirements of Selinon is that it defines tasks (Dispatcher and SelinonTaskEnvelope) before Celery's application gets instantiated. Older versions of Celery required tasks to be registered after the Celery application was created, which makes it a chicken-and-egg problem.
What broker type do I need?¶
Selinon uses Celery for queue handling and task execution, so you have to use a broker implementation that is supported by Celery - such as SQS or RabbitMQ.
Selinon requires that your messages are delivered - it's okay if messages are delivered more than once (see for example SQS details regarding at-least-once delivery). You will just end up with multiple tasks executed at the same time. You can tackle that in your application logic.
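A minimal sketch of tackling at-least-once delivery in your application logic could look as follows (the bookkeeping helpers are hypothetical, not part of Selinon):
from selinon import SelinonTask

# Hypothetical application helpers - not part of Selinon.
from myapp.db import already_processed, mark_processed
from myapp.work import do_work

class MyTask(SelinonTask):
    def run(self, node_args):
        # The same logical work item can arrive more than once, so the work
        # is made idempotent via application-side bookkeeping.
        if already_processed(node_args['item_id']):
            return None
        result = do_work(node_args['item_id'])
        mark_processed(node_args['item_id'])
        return result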
Why does a flow finish too early when using AWS SQS?¶
Most likely you are using AWS SQS standard queues, which can deliver a single message multiple times. If your application logic processes one message but a task fails when the second message is processed (e.g. integrity errors if task ids are unique in PostgreSQL), Celery overwrites the task state stored in the result backend. As a consequence, even if a task succeeds (first run), its state can be tracked as failed.
A solution to this problem is to patch Celery's result backend so a task state is written only once, something like (in case of PostgreSQL as a result backend):
diff --git a/celery/backends/database/__init__.py b/celery/backends/database/__init__.py
index 506a4cc69..57d29a6ca 100644
--- a/celery/backends/database/__init__.py
+++ b/celery/backends/database/__init__.py
@@ -110,6 +110,9 @@ class DatabaseBackend(BaseBackend):
task = Task(task_id)
session.add(task)
session.flush()
+ elif task.status in states.READY_STATES:
+ # Do not overwrite on multiple message delivery (e.g. SQS).
+ return task.result
task.result = result
task.status = state
task.traceback = traceback
Or simply switch to AWS SQS FIFO queues that guarantee exactly-once delivery of a message.
What does Selinon mean?¶
Selinon means celery in Greek. The main reason for using the Greek language was the fact that there are already successful projects out there that do distributed systems and have Greek names (see Kubernetes as an example). But Greek is cool anyway :-).
Deep diving into Selinon internals¶
To deeply understand Selinon, let’s assume that we have the following flow flow1:

If you are interested, here is YAML configuration used to generate this example:
---
tasks:
- name: 'Task1'
import: 'myproject.tasks'
- name: 'Task2'
import: 'myproject.tasks'
- name: 'Task3'
import: 'myproject.tasks'
- name: 'Task4'
import: 'myproject.tasks'
flows:
- 'flow1'
flow-definitions:
- name: 'flow1'
queue: 'flow1_queue'
edges:
- from:
to: 'Task1'
- from:
to: 'Task2'
- from: 'Task1'
to: 'Task3'
- from:
- 'Task2'
- 'Task3'
to: 'Task4'
Now let's run this flow with arguments {"foo": "bar"}:
from selinon import run_flow
run_flow('flow1', node_args={"foo": "bar"})
The very first thing Selinon does for you is schedule the Dispatcher task - an appropriate message is sent to queue flow1_queue (see the queue configuration option for flows). Once one of your workers picks up the message, the Dispatcher task is run.
The aim of the dispatcher task, as written earlier, is to handle flow actions. Based on the configuration you stated, flow1 should start with two tasks - Task1 and Task2. As these tasks do not have time nor data dependencies on each other (or their transitive predecessors), they can run in parallel. Before the execution is scheduled, dispatcher checks conditions. In this case both conditions are always true, so dispatcher publishes two messages to the default queue (no queue configuration was provided for Task1 and Task2) to execute tasks Task1 and Task2. After that, dispatcher is done with its work for this run and retries - it schedules the dispatcher task again, now with different arguments that keep track of scheduled tasks. If you are familiar with Celery, dispatcher issues self.retry() with different arguments (so you can be sure that the task id for dispatcher does not change).
Note
Conditions are evaluated only once and they are evaluated for every combination of tasks that can fire the edge with the given condition.
Now let's say that a worker picked Task2 first and it was successfully computed.
Let's assume that dispatcher was scheduled for this flow again. Dispatcher checks the status of tasks that were scheduled. Dispatcher finds out that Task1 hasn't finished yet (it can be running on a different cluster node or still be queued); nevertheless, dispatcher finds out that task Task2 was completed. The next edge to be processed is the edge leading to Task4, but to fire this edge we need to be sure that Task3 was successful. This is not true as Task3 was not run, so dispatcher cannot schedule any other task now. Dispatcher retries (schedules itself to the future) with different arguments that also carry information about the fact that task Task2 was successful.
Note
Dispatcher is scheduled in the queue based on the queue flow configuration in your YAML configuration file. This also applies to retries - dispatcher is rescheduled to the same queue.
Meanwhile another worker picked the message to compute Task1 and this task was successfully computed.
In the next dispatcher run, dispatcher finds out that Task1 was successful so it can proceed to scheduling Task3. Task3 is scheduled (a message is sent to the default queue) and that's all dispatcher can do for this run. It retries, marking Task3 as scheduled and tasks Task1, Task2 as successful (in the arguments for the next dispatcher run).
Dispatcher cannot take any other action until Task3 is run, so let's say that Task3 was successfully computed.
In the next run we can take into account the fact that both Task2 and Task3 finished, so dispatcher can schedule Task4 and retry. Dispatcher, again, states active tasks (or active sub-flows if we have any) in the arguments for the next dispatcher run and retries after some time.
Once task Task4 is successfully computed, the current flow can be marked as successful as there are no remaining tasks that could be scheduled. Dispatcher thus ends (does not issue a retry) and stores information about finished (or failed) nodes in the result backend (this is done for cases where there are parent flows that need this information, or for you to keep track of the flow status).
It is very important to note that there is one dispatcher per flow. So if you schedule two flows in parallel, two dispatcher tasks will be scheduled, each handling one flow.
Sub-flows¶
As dispatcher is a task like any other, sub-flows are handled by scheduling a dispatcher task that handles the given sub-flow. So if you have two flows flow1 and flow2, and you run flow2 as a sub-flow in flow flow1, dispatcher in flow1 will schedule the dispatcher task handling flow2 as a node in the task dependency graph you provided in the YAML configuration file.
Each dispatcher reports information about finished and failed nodes as a JSON that is stored in the result backend. Only nodes that are run in the given flow are tracked. Any sub-flow related information needs to be computed when requested (such as propagate_finished, see YAML configuration).
Failure handling¶
Failure of a task means that the task raised an exception. Failure handling is currently quite straightforward. If a task fails, dispatcher will keep the failure in the dispatcher arguments for the next run. Once there is nothing to proceed with and there are no active nodes, dispatcher will try to recover from the flow failure by scheduling fallback tasks or fallback flows (fallback nodes). If the fallback runs successfully, meaning that dispatcher recovered from the flow failure, dispatcher continues scheduling new tasks or sub-flows as if there had been no failure.
If dispatcher cannot recover from failures in the flow, the flow failure is propagated to parent flows (if any) and the current flow is marked as failed.
Note
Note that dispatcher can keep scheduling new tasks even if some tasks failed. This prevents time-dependent flow evaluation.
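For reference, fallbacks are declared in a flow's failures section; the snippet below is a minimal, illustrative sketch (task names invented; see the YAML configuration section for the exact syntax):
flow-definitions:
  - name: 'flow1'
    edges:
      - from:
        to: 'Task1'
    failures:
      # If Task1 fails, schedule FallbackTask1 to recover from the failure.
      - nodes:
          - 'Task1'
        fallback:
          - 'FallbackTask1'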
Command line interface¶
Selinon is shipped with a CLI (Command Line Interface) that lets you interact with Selinon and perform various actions such as plotting flow graphs, executing a flow via a command line executor or inspecting the provided configuration via sub-commands.
Most of the sub-commands accept the configuration files you use with Selinon, so to use the Selinon CLI you need to pass the nodes.yaml file as well as all your flow configuration files (if you split your configuration into multiple YAML files).
Usage: selinon-cli [OPTIONS] COMMAND [ARGS]...
Selinon command line interface.
Options:
-v, --verbose Be verbose about what's going on (can be supplied multiple
times).
--version Print Selinon version and exit.
--no-color Suppress colorized logging output.
--help Show this message and exit.
Commands:
execute Execute flows based on YAML configuration in a CLI.
inspect Inspect Selinon configuration.
migrate Perform migrations on old and new YAML configuration files in flow changes.
plot Plot graphs of flows based on YAML configuration.
version Get version information.
Note
If you do queue name expansion based on environment variables, you need to explicitly make sure the environment variable is present. Otherwise parsing configuration files will fail.
Inspecting configuration¶
The Selinon CLI offers you the inspect sub-command. This sub-command parses all the config files and outputs requested results or just checks your configuration consistency.
You can use the inspect sub-command, for example, to list all queues stated in your YAML files, or to do other queries against the parsed and checked configuration files.
One of the interesting options worth mentioning is --dump. As Selinon uses YAML config files that carry some logic, some parts need interpretation. Thus Python code is generated from the YAML configuration files and afterwards supplied to Selinon itself to orchestrate flows. You can request the generated Python code with the --dump option if you are interested in it or you just want to explore Selinon internals.
Plotting flow graphs¶
As flows and dependencies between tasks can get pretty complex, Selinon offers you a way to visualize flows. For this purpose the plot sub-command is available in the Selinon CLI. It can plot flow graphs for you in any format supported by graphviz.
All the flow graphs available in this documentation were plotted using the plot sub-command.
You can also adjust the style of the resulting image by supplying a YAML-based configuration that states style information. As Selinon uses graphviz under the hood to plot flow graphs, all options supported by graphviz are applicable to nodes, edges, storages and arrows. In the example below, you can see how to adjust the style configuration of the different components that occur in the resulting flow plot (this is the default configuration).
# A task node.
task:
style: filled
color: black
fillcolor: '#66cfa7'
shape: ellipse
# A flow node (a sub-flow in a flow).
flow:
style: filled
color: black
fillcolor: '#197a9f'
shape: box3d
# A condition on a simple edge.
condition:
style: filled
color: gray
fillcolor: '#e8e3c8'
shape: octagon
# A condition on foreach edge.
condition_foreach:
style: filled
color: gray
fillcolor: '#e8e3c8'
shape: doubleoctagon
# A storage node.
storage:
style: filled
color: black
fillcolor: '#894830'
shape: cylinder
# A simple edge.
edge:
arrowtype: open
color: black
# An edge from a task to storage that was assigned to the task.
store_edge:
arrowtype: open
color: '#894830'
style: dashed
# An edge that leads to a fallback node.
fallback_edge:
arrowtype: open
color: '#cc1010'
# A special mark signalizing to always recover from a failure (fallback set to true).
fallback_true:
style: filled
color: black
fillcolor: '#5af47b'
shape: plain
You can find more configuration options in the graphviz library documentation.
Simulating flow execution¶
To debug, explore, play with or otherwise interact with task flow execution, the Selinon CLI offers you a built-in executor. This executor tries to simulate message queueing and message consuming, so no broker (and no Celery result backend) is involved.
Note
Note that the execution can vary from real broker interaction, as other parameters are involved as well (e.g. prefetch multiplier configuration, concurrent broker message publishing, etc.).
The executor currently supports only a single-process, single-threaded mode - one worker serving tasks. The worker accepts messages in a round-robin fashion based on message availability in queues.
In order to see what is happening during an executor run, you can run the executor in verbose mode. The executor then prints all the execution actions. It can help you when you want to experiment with your flow configuration or you would like to debug strange flow behaviour.
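For example, a verbose simulated run of flow1 could look like the following (--flow-name and --node-args appear in the environment variable examples below; --node-args-json, implied by the SELINON_EXECUTE_NODE_ARGS_JSON variable shown later, tells the executor to parse node arguments as JSON):
$ selinon-cli -vv execute --nodes-definition nodes.yml --flow-definitions flow1.yml flow2.yml --flow-name flow1 --node-args '{"foo": "bar"}' --node-args-json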
Generating migrations of configuration files¶
As Selinon offers you a mechanism to make changes in your configuration files and re-deploy workers, there needs to be a mechanism that ensures changes done in your configuration files are reflected in messages already present on queues. This led to the design of migrations.
You can generate migration files using the migrate sub-command. Please take a look at the section that explains migrations in more detail.
Using environment variables to supply options¶
If you run the Selinon CLI in various scripts, or you would like to interact with the Selinon CLI in different environments, you can state your options in environment variables:
export SELINON_NODES_DEFINITION=/path/to/nodes.yaml
export SELINON_FLOW_DEFINITIONS=/path/to/flows/
export SELINON_EXECUTE_NODE_ARGS_JSON=1
# No need to explicitly state YAML configuration files and option to do JSON parsing.
$ selinon-cli execute --flow-name flow1 --node-args '{"foo": "bar"}'
export SELINON_NODES_DEFINITION=/path/to/nodes.yaml
export SELINON_FLOW_DEFINITIONS=/path/to/flows/
$ selinon-cli inspect --list-task-queues # No need to supply --nodes-definition and --flow-definitions explicitly
The schema for constructing environment variables is SELINON_<SUBCOMMAND>_<OPTION> where <SUBCOMMAND> is Selinon's CLI sub-command in uppercase and <OPTION> is the requested option (converted to uppercase, dashes converted to underscores). The only exceptions are --nodes-definition and --flow-definitions, where <SUBCOMMAND> is omitted.
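For instance, applying this schema to the --list-task-queues option of the inspect sub-command used above yields:
export SELINON_INSPECT_LIST_TASK_QUEUES=1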
Migrations - Redeployment with changes¶
In this section you can find all the details on how to make changes in the configuration files and redeploy workers with new configuration files in your cluster without needing to purge queues.
Why are migrations needed¶
If you inspect the message payload that is sent to the message broker by the dispatcher task (see the Deep diving into Selinon internals section for more info), you can see that it captures the current flow status (e.g. active edges in the flow graph, finished nodes, active nodes, etc.). This needs to be captured somewhere, as dispatcher needs to know which tasks should and which shouldn't be scheduled during the dispatcher run (at least during flow execution). The way dispatcher is designed (sampling by retrying) makes the dispatcher message an ideal place for storing such information, as this message is adjusted each time dispatcher is rescheduled (according to the current flow state). After the flow is finished, this information is no longer needed and thus it's safe to drop it (the message is acked and automatically removed from the queue).
Imagine you have a flow. You configured workers and deployed to production, but after some time you realized that some tasks should be removed from or added to your flow, or you would like to change how the flow is structured. As you already run in production, you don't want to purge queues and drop all the messages that were already published on the message broker. As messages available on the message broker carry information (state) of the flow, they refer to the configuration that was used when the flow started. If you would like to change the flow behaviour, there are two main options for how to do that.
Using versioned flows¶
One of the possibilities is to introduce versioned flows - you just distinguish flows based on their name - e.g. flow1_v0 would be your old flow and flow1_v1 would be the updated flow. By keeping the old configuration in the worker you can easily let the old flow1_v0 finish (all its messages on the broker get consumed) and after that drop it completely. Meanwhile you will not schedule flow1_v0 after redeployment, but rather the new flow version named flow1_v1.
This approach is applicable if you are fine with letting the already published messages be processed the old way, based on the flow1_v0 configuration.
Note
If you also distinguish flows by queue names, you can monitor the number of messages that are still to be processed using the old flow1_v0 configuration.
Migration of already published messages¶
Another approach is to change the flow behaviour at runtime - that means you have messages queued, but you would like to apply changes in the configuration file after redeployment also to the queued messages.
Danger
Do not perform flow changes in the YAML configuration file without generating migrations. It will mostly result in fatal and unrecoverable failures in the dispatcher task, causing it to fail or reschedule infinitely. In general, changes in the flow structure without performing migrations lead to undefined behaviour.
For this reason, migration of messages comes in handy. The Selinon CLI offers you a way to generate a migration that affects flow execution and lets you decide what to do in case your change affects flow execution based on the current state - tainted flows.
Note
Selinon does not fire edges added to the configuration file that would precede the current flow state - in other words, dispatcher does not schedule nodes by traversing the flow graph backwards. However, it works as expected in case of cyclic flows.
Tainted flows¶
If your change in the configuration file affects flow behaviour, dispatcher marks the affected flow as tainted. A tainted flow is a flow that would behave differently if it were executed with the new configuration, but it has already started and its current state resolves in a different flow execution compared to the new configuration file.
Whether a flow is tainted highly depends on the state of the flow when the migration is done and on the migration itself.
There are two types of events that lead to tainted flows:
- tainted edges - flows tainted based on edges that were fired but do not exist in the new configuration
- tainting nodes - flows tainted based on nodes that weren't executed because the state of the flow outran the added edges
Tainted edges¶
Tainted edges are edges that were present in the old configuration but are not present in the new one, or whose destination nodes were modified. If the state of a flow in the deployed system already fired such an edge (destination nodes were run as the edge condition evaluated as true), the flow is marked as tainted and the requested tainted flow strategy is triggered based on the migration configuration (see below).

Let's say you have a flow as shown on the figure above. In your configuration file you remove the edge that leads from Task1 to Task3, so the flow structure will change as visualized on the figure below. You generate a migration and do the redeployment, but you already have some flows in progress. Let's say that a worker picks the dispatcher task, and the flow already ran Task1 and fired both edges leading to Task2 and Task3. In this case the flow basically waits for these tasks to finish based on the old configuration. However, with the migration you did there is no edge to Task3 - as this edge was already fired, it tainted the flow. In this case dispatcher will report a tainted flow and behave according to your tainted flow strategy (see Tainted flow strategy below).

In another case, a worker could pick a message respecting the old configuration in which Task1 was scheduled, but dispatcher in the previous run did not find that Task1 finished (edges to Task2 and Task3 were not fired). In this case the flow is not marked as tainted and dispatcher continues checking the state of nodes respecting the new configuration with the migration.
Tainting nodes¶
Tainting nodes are nodes that were already run based on the current state of the flow, for which the migration added an edge that would have been evaluated (as the source/tainting nodes already ran) once all of the tainting nodes finished.

Let's illustrate this scenario as well. In this case you have a flow that runs two tasks - Task1, and Task2 after Task1 finishes. You deployed this version of the configuration, but after a while you decided to add a new task called Task3 that is scheduled after Task1 successfully finishes. This scenario is visualized on the figure below. You create a migration file and do the redeployment with the new configuration file and the generated migration. Some flows were already in progress - let's say that a worker picked the dispatcher task and the state of the flow indicates that Task1 was already run and the edge to Task2 was already triggered (dispatcher waits for Task2 to finish), but with the new configuration and migration, the edge to Task3 would be evaluated as well (this was not done with the old configuration). Dispatcher does not schedule Task3, as this edge would have been fired after Task1 finished (the event noting that Task1 finished was already tracked), but rather marks the flow as tainted. Once the flow is marked as tainted, the tainted flow strategy (see Tainted flow strategy below) is evaluated.

Note
Conditions are not evaluated in case of tainting nodes. A tainted flow indicates that the edge would be evaluated and possibly fired based on the condition result.
Tainted flow strategy¶
When a flow is marked as tainted, you can configure how dispatcher should behave in such situations. There are three types of tainted flow strategies:
- IGNORE (default) - migrations marked as IGNORE do not perform any actions on tainted flows; a tainted flow is reported using the tracing mechanism, but the execution continues
- RETRY - this strategy causes the whole flow to be retried - all results computed until the migration will not be reused in the retried flow
- FAIL - the flow will be immediately marked as failed - previously scheduled tasks will still be run (as they are already scheduled), ignoring state
See the Trace flow actions section for information about captured details. If migration chaining is performed (see below), the state is distinguished based on the tainted flow strategy with the highest impact:
- if at least one flow migration has strategy FAIL, the tainted flow will fail
- if at least one flow migration has strategy RETRY but no FAIL, the tainted flow will be marked as RETRY
- if all flow migrations have strategy IGNORE and neither RETRY nor FAIL is present, tainted flows will be ignored but reported in the tracing mechanism
Migration chaining¶
Migrations can be chained - they are incrementally numbered starting with 1. A special migration version 0 means that no migrations are present - Selinon was not set up to use migrations. If you would like to use migrations, add migration_dir to the global configuration section in the nodes.yaml file at any time (even when you deployed workers with no migrations present; see the YAML configuration section for more details). These migrations will be run once a worker detects that migrations are present.
global:
# The migration dir can be an absolute or relative path to the process working directory.
# Preferably state absolute path.
migration_dir: 'path/to/dir/containing/migrations'
Note that the migration version is not per-flow specific, but rather deployment specific. This means that you may see the migration version incrementally changing even in messages for flows that do not have any affecting migrations.
If you would like to perform more changes that should trigger different migration strategies, it's perfectly okay to generate multiple migrations and apply them with different tainted flow strategies based on the flow state in your deployment. Migrations get applied based on migration versions (incrementally), as you would expect, respecting the tainted flow strategy.
Danger
Make sure you do not generate the same migration multiple times. That would result in undefined behaviour.
Generating migrations¶
Migrations can be generated using the Selinon CLI. It operates either on explicitly supplied old and new configuration files from which the migration is computed, or you can use the built-in support for the Git VCS. In that case Selinon checks the Git history for older versions of the supplied configuration files.
$ # Add --verbose parameters for noisy output, use git to retrieve old configuration files. Path to migration_dir will be taken from nodes.yaml.
$ selinon-cli -vvv migrate --flow-definitions myapp/flows/*.yaml --nodes-definition myapp/nodes.yaml --git
You can also explicitly adjust the migration directory. Implicitly, Selinon takes into account the migration_dir stated in the global section of the nodes.yaml file. Also make sure you are pointing to the right directory - migration files are numbered incrementally, so Selinon needs to know the previous migration version, which is determined based on the migration files present in the migration_dir directory.
$ # Explicitly point to old configuration files and the migration directory, set tainted flow strategy to FAIL.
$ selinon-cli -vvv migrate --flow-definitions myapp/flows/*.yaml --nodes-definition myapp/nodes.yaml --old-flow-definitions myapp/old_1/flows/*.yaml --old-nodes-definition myapp/old_1/nodes.yaml --migration-dir myapp/migration_dir/ --tainted-flow-strategy FAIL
Generated migrations are JSON files with computed tainting nodes, tainted edges and edge translation (renumbering and edge dropping if needed). These JSON files also carry some metadata, such as where, when and by whom the given migration was created. You can suppress capturing metadata by generating migrations with the --no-meta option.
Note
Conditions do not change the graph structure - changes in conditions do not result in new migration files. Conditions affect only the graph execution path.
Migrations and fallbacks¶
Migrations do not apply to fallback nodes and their dependencies. Fallback edges represent an exceptional path that is triggered if there was a failure. You can safely redeploy without generating migrations for changes in fallbacks. Fallback evaluation will respect the fallback configuration available in the current deployment.
Note
Fallback edges are evaluated at the end of the flow in case of failures. The aim of fallback edges is to recover from flow failures; they thus represent exceptional paths - they can, however, lead to continuing flow execution in the main (non-exceptional) path based on tasks finished in fallbacks.
Using and tracing migrations¶
Once you create migrations, make sure they are present in the deployed worker so Selinon can transparently apply them. Without migration files present, Selinon is unable to perform migrations - that could lead to undefined behaviour when old messages are processed with a newer, modified configuration.
Take a look at the Trace flow actions section, which will point you to tracing events that are emitted on migrations. They also carry some information that can help you with debugging and understanding what's going on in your system.
Migration skew¶
A special error is reported in case of an inconsistency between migration files and the migration version carried within a message for the dispatcher task - MigrationSkew. Dispatcher issues a retry (without affecting the flow retry configuration option) that leads to republishing the message. This error indicates that the worker was unable to process the given message, as the migration version in the received message is newer than the migrations available on the worker. In other words, the worker is unable to correctly process the message.
This error can indicate that you have workers with different migration versions available. If you do rolling updates where the replica count of old workers decrements while new workers are spawned, the MigrationSkew error can be seen. New workers start to publish messages that are not suitable for old workers, but some of the old workers that are still alive may try to process the newly published messages. As you would expect, by republishing such messages, old workers give up processing new message types and leave the processing to a new worker with the new migration present.
Flow configuration for examples¶
The tainted flow figures were created using the following configuration.
tasks:
- name: 'Task1'
import: 'myapp.tasks'
- name: 'Task2'
import: 'myapp.tasks'
- name: 'Task3'
import: 'myapp.tasks'
flows:
- 'flow1'
- 'flow2'
flow-definitions:
- name: 'flow1'
edges:
- from:
to: 'Task1'
- from: 'Task1'
to: 'Task2'
- from: 'Task1'
to: 'Task3'
- name: 'flow2'
edges:
- from:
to: 'Task1'
- from: 'Task1'
to: 'Task2'
Optimization¶
Selinon offers you a highly scalable solution. By design, there are two optimization techniques you may find interesting. Both are discussed in the following sections.
Flow optimization & Dispatcher scheduling¶
In order to optimize your flow execution, you need to deeply understand the core concept behind Selinon. As you already read, the key idea behind Selinon is dispatcher (see Dispatcher). Dispatcher is periodically scheduled, checks the state of tasks in the flow and schedules new ones if necessary.
As there is non-zero overhead for each dispatcher run - queue a message, receive a message, check task statuses (querying the result backend) and queue a message for dispatcher again - it is generally a good idea to optimize dispatcher scheduling.
Selinon lets you optimize dispatcher scheduling by configuring sampling strategies.
A sampling strategy basically states when dispatcher should be scheduled (or, to be more precise, retried) in order to check the current flow status.
By default, dispatcher is rescheduled every 2 seconds. Note that this means "give dispatcher at least 2 seconds to retry". If you have full queues (and busy workers), your dispatcher can be rescheduled even after days.
Selinon offers you a couple of predefined sampling strategies (refer to selinon.strategies for more info):
flow-definitions:
- name: 'flow1'
# define sampling strategy
sampling:
name: 'constant'
args:
# check for flow state at least each 10 seconds
retry: 10
edges:
# a list of edges follows
You can also provide your own sampling strategy. Basically, a sampling strategy is given by a number that tells Selinon when to reschedule dispatcher in the flow. This number is computed in the sampling strategy function, which accepts one positional argument status and keyword arguments that are passed to the sampling strategy function based on the YAML configuration you provided.
Now let’s assume that we want to optimize scheduling dispatcher. We have the following flow definition:
flow-definitions:
- name: 'flow1'
edges:
- from:
to: 'Task1'
- from: 'Task1'
to: 'Task2'
Here is corresponding flow visualization:

Based on the statistics we have, we know that the execution time of task Task1 is 10 seconds on average and the execution time of task Task2 is 15 seconds on average. You can easily write your own sampling strategy function:
import random

def my_sampling_strategy(status, randomness):
    if not status['active_nodes']:
        # No active nodes left - end the flow immediately.
        return None
    if 'Task1' in status['new_started_nodes']:
        # Check again around Task1's average execution time of 10 seconds.
        return 10 + random.uniform(-randomness, randomness)
    if 'Task2' in status['new_started_nodes']:
        # Check again around Task2's average execution time of 15 seconds.
        return 15 + random.uniform(-randomness, randomness)
    return max(status['previous_retry'] / 2, 2)
Note
This example is oversimplified. You would probably want to take more information into account, such as the distribution of task execution times based on flow arguments and other parameters that affect task execution time. The randomness argument is used to demonstrate argument propagation.
Now you can plug in and use your sampling strategy function in your YAML configuration file:
flow-definitions:
- name: 'flow1'
sampling:
# from myapp.sampling import my_sampling_strategy
name: 'my_sampling_strategy'
import: 'myapp.sampling'
args:
randomness: 3
edges:
- from:
to: 'Task1'
- from: 'Task1'
to: 'Task2'
Now your sampling strategy function will be called each time dispatcher wants to reschedule. If None is returned, dispatcher ends the flow immediately. Otherwise, a positive number has to be returned that represents the number of seconds until the retry.
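As a minimal illustration of this contract, the following sketch mimics the constant strategy shown earlier (keyword arguments come from the args section of the sampling configuration; consult selinon.strategies for the shipped implementations):
def my_constant_strategy(status, retry):
    if not status['active_nodes']:
        # Nothing is running - returning None ends the flow immediately.
        return None
    # Otherwise ask dispatcher to check the flow state again in `retry` seconds.
    return retry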
Danger
As the sampling strategy function is executed by dispatcher, it cannot raise any exceptions! If an exception is raised, the behaviour is undefined.
Dedicating a separate worker for dispatcher - Cluster segmentation¶
It is usually a good idea to introduce specific workers for the dispatcher task. This usually results in better performance, as workers that do the actual computational tasks do not necessarily need to check flow status (especially with prefetching taken into account). This means that you can introduce 1-2 workers (based on the number of messages fed into queues; you can also apply auto-scaling) that listen only on queues dedicated to dispatcher, so these workers schedule the workload to specific workers based on the queue configuration. Finding the right balance in the number of workers for specific queues can take some time, but the performance gain is usually worth it.
Storage optimization & Distributed caches¶
When using Selinon, you can run into two main issues with your cluster under heavy load:
- Your cluster is not powerful enough to serve the requested number of tasks.
- Your storage/database cannot process the requested number of requests, or your network is not capable of transmitting such a number of queries.
In the first case the solution is simple: buy/use more hardware.
In the latter case there are two main approaches to tackle such a bottleneck. You can always use more storage replicas or split data across multiple storages and transparently configure Selinon to use different storages for different purposes (see storage aliasing in Best practices).
If the above solution is not suitable for you or you want to optimize even more, Selinon offers an optimization that introduces distributed caches. These caches are distributed across nodes (workers) in your cluster and act as a caching mechanism that reduces the number of requests to storages/databases and keeps data closer to the execution nodes.
By default, Selinon uses a cache of size 0 (no items are added to the cache). There are prepared in-memory caches like FIFO (First-In-First-Out cache), LIFO (Last-In-First-Out cache), LRU (Least-Recently-Used cache), MRU (Most-Recently-Used cache) and RR (Random-Replacement cache). See selinon.caches for more info.
Note
You can simply use, for example, Redis for caching. Just deploy Redis in the same pod as your worker and point the caching mechanism to the Redis adapter in your YAML configuration. This way you reduce the number of requests to the database, as results get cached in Redis (available in the same pod) once available.
Caching task results¶
Results of your tasks can be cached. This is especially useful when you use predicates that query the storage/database often. To define a cache, just provide the configuration in your YAML file as shown below:
tasks:
- name: 'Task1'
import: 'myapp.tasks'
cache:
# from myapp.cache import RedisCache
name: 'RedisCache'
import: 'myapp.cache'
configuration:
host: 'redis'
port: 6379
db: 0
password: 'secretpassword'
charset: 'utf-8'
Results are added to the cache only when dispatcher requests results from the cache for predicates. Any error in the user's cache is reported using the tracing mechanism (see the Trace module). In case of cache failures, Selinon will continue using results directly from the storage, as cache failures are not fatal.
Note
Caching task results can be beneficial if you have a lot of conditions that depend on some task results. It can be even more beneficial if you do flow or task throttling with conditions (see Best practices for more info).
Caching task states¶
You can also introduce a caching mechanism for task states. Note that task states are handled by Celery (refer to Celery's AsyncResult for more details). Selinon offers you a way to place a cache as an intermediary:
flow-definitions:
- name: 'flow1'
cache:
# from myapp.cache import RedisCache
name: 'RedisCache'
import: 'myapp.cache'
configuration:
host: 'redis-cache'
port: 6379
db: 0
password: 'secretpassword'
charset: 'utf-8'
edges:
- from:
to: 'Task1'
As you can see, caches are per-flow and configurable. This way you can easily use caches only for flows that you consider critical for the caching mechanism.
The RedisCache implementation has to derive from Cache and implement the required methods. Note that the configuration is passed to the cache constructor similarly as in the DataStorage case - as keyword arguments (see Storage adapter implementation).
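A sketch of such an adapter follows. It assumes the add/get interface of the shipped in-memory caches; verify the exact import paths and method signatures against selinon.caches for your Selinon version:
import redis

# Assumed import paths - verify against your Selinon version.
from selinon.caches import Cache
from selinon.errors import CacheMissError

class RedisCache(Cache):
    """A Redis-backed cache; YAML configuration entries arrive as keyword arguments."""

    def __init__(self, host, port, db, password=None, charset='utf-8'):
        self._redis = redis.StrictRedis(host=host, port=port, db=db,
                                        password=password, charset=charset)

    def add(self, item_id, item, task_name=None, flow_name=None):
        # Store the (already serialized) item under its id.
        self._redis.set(item_id, item)

    def get(self, item_id, task_name=None, flow_name=None):
        item = self._redis.get(item_id)
        if item is None:
            # Signal a cache miss so Selinon falls back to the storage or result backend.
            raise CacheMissError(item_id)
        return item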
Note
Caching task states is generally a good idea if you depend on many task states in your flow edges (a lot of source tasks in edges) and these tasks have varying execution times (very "wide" flows).
Note
Due to result consistency, information about task states is added to caches only once a task (or flow) fails or finishes - at that point, no flow or task with the same id will be executed in the future.
As with task result caches, if there is some issue with a cache, the errors are reported but they do not have a fatal effect on the flow. If something goes wrong, Selinon will just use the result backend directly.
Prioritization of tasks and flows¶
You can easily prioritize tasks or flows by placing them on separate queues and dedicating more workers to processing these tasks or flows (see the queue option in the YAML configuration section). Note that queues reserved for flows are used for the dispatcher task that does scheduling based on state (not for the tasks stated in the flow).
Note
Think about prioritization before you do a production deployment - place each flow and task on a separate queue and you can easily prioritize flows and tasks without purging queues or performing a redeployment.
To run a Celery worker only for the given queues queue1 and queue2, specify Celery's -Q argument:
celery worker -A myapp.start -Q queue1,queue2 # additional parameters follow
Throttling of tasks and flows¶
Throttling can be performed by reducing the number of workers that process a given task or flow, as opposed to the prioritization described above.
If you would like to perform even more robust throttling, you can apply the throttling option at the task or flow level (see the throttling option in the YAML configuration section). Note that to set up throttling properly, you need to dedicate one cluster node that accepts the dispatcher task (flow on a separate queue) for the given flow with throttled tasks or flows. This guarantees that scheduling of throttled tasks or flows is done in one place, so the given worker node can perform time-based throttling properly based on internally kept state.
Flow throttling is done at the flow level (not the task level), similarly to the flow prioritization described above.
Note
Place each task and flow (dispatcher scheduling) on a separate queue so you can easily do throttling without purging queues when changing prioritization or throttling related configuration.
An example of configuration:
---
tasks:
- name: Task1
import: myapp.tasks
queue: task1_v0
throttling: # task level throttling
seconds: 10
# additional entries follow
flow-definitions:
- name: flow1
queue: flow1_v0
throttling: # flow level throttling
minutes: 30
seconds: 10
edges:
# additional entries follow
Other optimizations¶
If you would like to optimize performance take a look at Celery’s user guide for optimizing Celery execution.
For example, configuring the prefetch multiplier (the number of messages prefetched in bulk from the broker to a worker) can result in a significant speed up (more than 10 times based on my experience). In order to find the right and balanced solution for your cluster, you will need to experiment a bit.
Moving from raw Celery to Selinon¶
Supposing you are already using Celery and you want to migrate to Selinon, this section gives you a very high-level overview of what needs to be done and the possible pitfalls when migrating. Also check Frequently Asked Questions (FAQ) for some notes on why you should or shouldn't migrate to Selinon.
- Make sure you use Celery of version 4. Older versions of Celery are not supported by Selinon.
- Make sure your result backend is some persistent storage (such as Redis or PostgreSQL). If you use rpc, Selinon will not work properly.
- If you use Celery's class-based tasks, the transition will be smoother (just change the base class to SelinonTask, as sketched after this list). If you are using Celery task functions, you need to encapsulate them in the mentioned class.
- If you are using Celery primitives, remove them. Instead, define a Selinon YAML configuration.
- If you are using features like self.retry() or you raise Celery-specific exceptions, rewrite the semantics into the YAML configuration and remove these Celery-specific pieces. Also note that changing arguments for a task via self.retry() is not supported by Selinon.
- Pass the application context and configuration files to Selinon's Config.
- Do not adjust Celery's task status (like revoked tasks); this will not work with Selinon.
- Feel free to extend this list…
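As referenced in the checklist above, wrapping a function-style Celery task into a SelinonTask could look like this sketch (names are illustrative):
from selinon import SelinonTask

# Before, with raw Celery:
#
#     @app.task
#     def add(x, y):
#         return x + y

class AddTask(SelinonTask):
    def run(self, node_args):
        # The former task body moves into run(); arguments arrive via node_args
        # as defined by your YAML configuration, and the return value is stored
        # in the configured storage (if any).
        return node_args['x'] + node_args['y']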
Development and Contributing¶
If you use Selinon and you spot some weird behaviour, feel free to open an issue. You can also dive into the source code and submit a pull request to make Selinon a better and more stable project.
Preparing development environment¶
To prepare your environment, make sure that you have all development requirements installed. Check the shipped Makefile for prepared commands that can be issued directly.
If you would like to create a virtual environment so that you don't install (possibly) unwanted requirements on your system, run:
make venv
To enter the prepared virtual environment run:
source venv/bin/activate
Now make sure that you have installed all development requirements:
make devenv
Now you are ready to hack! B-)
Tests¶
Selinon comes with a test suite. If you make changes, make sure that the test suite passes:
make check
The command above runs the test suite and reports any unexpected behaviour. It also runs linters and some code-quality related tools to ensure your changes look good.
If you make any changes, make sure that the test suite passes before submitting your pull request. If you would like to test changes in specific situations, the Selinon demo could be a great starting point for some specific use cases.
And not to forget… If you make any improvements in the source code, feel free to add your name to the CONTRIBUTORS.txt file.
Selinon is a tool that gives you the power to define flows and sub-flows of tasks that should be executed in Celery - a distributed task queue. If you want to define very simple flows, Celery offers you workflow primitives that can be used. Unfortunately, these workflow primitives are very limited when it comes to extending your workflow, working with multiple dependencies or scheduling tasks based on particular conditions or events.
If you are looking for a workflow solution, take a look at existing solutions such as Azkaban, Apache Airflow, Apache Nifi, Spotify Luigi and others. The main advantages of using Selinon over these are that you can use it in fully distributed systems and, for example, let Kubernetes or OpenShift do the workload (and much more, such as recursive flows, sub-flows support, selective task runs, …), define task flows that are not strictly DAGs (not strictly directed acyclic graphs, but directed graphs instead), and let the system compute which tasks should be run in the dependency graph, among other features. All this is accomplished by providing simple and yet intuitive YAML-based configuration files.
Even if you do not plan to run Selinon in a distributed environment, you could still find Selinon useful. Selinon offers you a CLI executor that, based on your YAML configuration, executes tasks based on their dependencies and conditions.

Is there a demo available?¶
You can take a look at the Selinon demo to see how Selinon works without deep diving into code and configuration. Just run:
docker-compose up
What do I need?¶
In order to use Selinon, you need:
- a basic knowledge of Python3 (yes, there is no support for Python2!)
- YAML markup language (very intuitive and easy to learn)
- a basic knowledge of Celery 4 (older versions are not supported) and its configuration
- a message broker that is supported by Celery if you plan to run Selinon in a distributed environment
- vegetable appetite ;-)
How does it work? - a high level overview¶
The key idea lies in Dispatcher - a Dispatcher task is created for each flow. Dispatcher takes care of starting new tasks and sub-flows, checking their results and scheduling new tasks based on your configuration. Dispatcher is transparently scheduled, periodically samples the flow status and makes decisions about which tasks and flows should be scheduled.
The only thing that needs to be provided by you is a YAML configuration file that specifies the dependencies of your tasks and where results of tasks should be stored (if you want persistence). This configuration file is parsed by Selinon and automatically transformed into Python 3 code which is then used by Selinon.
See also¶
- Selinon on GitHub
- Selinon organization with various repos on GitHub
- Selinon demo on GitHub
- Celery configuration