selinon.executor.executor module

Simulate execution in a single CLI run.

Using Selinon executor is a good way to test you configuration and behaviour locally if you would like to save some time when debugging or exploring all the possibilities that Selinon offers you. Keep in mind that running Selinon locally was designed for development purposes and behaviour in general can (and in many cases will) vary.

The key idea behind executor is to simulate exchange of messages which is done by your Celery broker and Kombu under the hood of Celery. Thus there are lazily created queues that are referenced by their names (see selinon.executor.time_queue for implementation details). These queues hold messages prior to time on which they were scheduled. Under the hood there is used a heap queue to optimize inserting to O(log(N)) in the worst case where N is number messages currently in the queue. These queues are coupled into QueuePool (selinon.execute.queue_pool) which encapsulates all queues, keeps their references, instantiates it lazily and provides concurrency safety.

In order to avoid starving, QueuePool keeps track of the queue (“a last used queue”) which prevents from starving messages that were scheduled for the same time (basically a simple round-robin). QueuePool looks for a message that is not scheduled to the future and can be executed (to do so we get O(N) where N is number of queues being used).

All workers listen on all queues for now. This prevents from waiting on a message that would be never processed.

In order to understand how Executor works, you need to understand how Celery works. Please refer to Celery documentation if you are a Celery-newbie.

class selinon.executor.executor.Executor(nodes_definition, flow_definitions, concurrency=1, sleep_time=1, config_py=None, keep_config_py=False, show_progressbar=True)[source]

Bases: object

Executor that executes Selinon run in a multi-process environment.

DEFAULT_CONCURRENCY = 1
DEFAULT_SLEEP_TIME = 1
executor_queues = QueuePool({})
run(flow_name, node_args=None)[source]

Run executor.

Parameters:
  • flow_name – a flow name that should be run
  • node_args – arguments for the flow
run_flow_selective(flow_name, task_names, node_args=None, follow_subflows=False, run_subsequent=False)[source]

Run only desired tasks in a flow.

Parameters:
  • flow_name – name of the flow that should be run
  • task_names – name of the tasks that should be run
  • node_args – arguments that should be supplied to flow
  • follow_subflows – if True, subflows will be followed and checked for nodes to be run
  • run_subsequent – trigger run of all tasks that depend on the desired task
Returns:

dispatcher id that is scheduled to run desired selective task flow

Raises:

selinon.errors.SelectiveNoPathError – there was no way found to the desired task in the flow

classmethod schedule(task, celery_kwargs)[source]

Schedule a new task to be executed.

Parameters:
  • task (Dispatcher|SelinonTaskEnvelope) – task to be executed
  • celery_kwargs – arguments for the task - raw Celery arguments which also carry additional Selinon arguments