Optimization¶
Selinon offers a highly scalable solution. By design, there are two optimization techniques you may find interesting; both are discussed in the following sections.
Flow optimization & Dispatcher scheduling¶
In order to optimize your flow execution, you need a deep understanding of the core concept behind Selinon. As you already read, the key idea behind Selinon is the dispatcher (see Dispatcher). The dispatcher is periodically scheduled, checks the state of tasks in the flow and schedules new ones if necessary.
As there is a non-zero overhead for each dispatcher run - queue a message, receive a message, check task status (querying the result backend) and queue a message for the dispatcher again - it is generally a good idea to optimize dispatcher scheduling.
Selinon lets you optimize dispatcher scheduling by configuring sampling strategies.
A sampling strategy basically tells Selinon when the dispatcher should be scheduled (or, to be more precise, retried) in order to check the current flow status.
By default the dispatcher is rescheduled every 2 seconds. Note that this means "give the dispatcher at least 2 seconds before retrying". If your queues are full (and your workers busy), the dispatcher can be rescheduled even after days.
Selinon offers a couple of predefined sampling strategies (refer to selinon.strategies for more info):
flow-definitions:
  - name: 'flow1'
    # define sampling strategy
    sampling:
      name: 'constant'
      args:
        # check for flow state at least each 10 seconds
        retry: 10
    edges:
      # a list of edges follows
You can also provide your own sampling strategy. A sampling strategy is essentially a function that computes the number of seconds after which the dispatcher should be rescheduled in the flow. This function accepts one positional argument, status, and keyword arguments that are passed to it based on the YAML configuration you provided.
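As an illustration, the predefined 'constant' strategy from the YAML example above could look roughly like this (a minimal sketch; the actual implementation in selinon.strategies may differ in details):

```python
# Minimal sketch of a constant sampling strategy; the real
# selinon.strategies implementation may differ.
def constant(status, retry):
    # no active nodes means the flow has finished
    if not status['active_nodes']:
        return None
    # otherwise always retry after the configured number of seconds
    return retry
```

The `args` section of the YAML configuration (here `retry: 10`) is what supplies the keyword arguments of the function.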
Now let’s assume that we want to optimize dispatcher scheduling. We have the following flow definition:
flow-definitions:
  - name: 'flow1'
    edges:
      - from:
        to: 'Task1'
      - from: 'Task1'
        to: 'Task2'
Here is the corresponding flow visualization:
Based on the statistics we have, we know that the execution time of task Task1 is 10 seconds on average and the execution time of task Task2 is 15 seconds on average. You can easily write your own sampling strategy function:
import random

def my_sampling_strategy(status, randomness):
    # no active nodes left - the flow has finished
    if not status['active_nodes']:
        return None
    # Task1 takes about 10 seconds on average, Task2 about 15 seconds
    if 'Task1' in status['new_started_nodes']:
        return 10 + random.uniform(-randomness, randomness)
    if 'Task2' in status['new_started_nodes']:
        return 15 + random.uniform(-randomness, randomness)
    # otherwise back off, but check at least every 2 seconds
    return max(status['previous_retry'] / 2, 2)
Note
This example is oversimplified. You would probably want to use more information, such as what distribution task execution times follow based on flow arguments and which other parameters affect task execution time. The argument randomness is used to demonstrate argument propagation.
Now you can plug and use your sampling strategy function in your YAML configuration file:
flow-definitions:
  - name: 'flow1'
    sampling:
      # from myapp.sampling import my_sampling_strategy
      name: 'my_sampling_strategy'
      import: 'myapp.sampling'
      args:
        randomness: 3
    edges:
      - from:
        to: 'Task1'
      - from: 'Task1'
        to: 'Task2'
Now your sampling strategy function will be called each time the dispatcher is about to reschedule. If None is returned, the dispatcher ends the flow immediately. Otherwise, a positive number has to be returned that represents the number of seconds after which the dispatcher should retry.
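The contract above can be summarized by a small sketch of how the dispatcher might interpret the returned value (illustrative only, not Selinon's actual code; the function name is hypothetical):

```python
# Illustrative sketch (not Selinon's actual implementation) of how the
# dispatcher interprets the sampling strategy's return value.
def interpret_sampling_result(retry):
    if retry is None:
        return 'end flow'                 # no active nodes, flow finished
    if retry <= 0:
        raise ValueError('retry must be a positive number of seconds')
    return 'retry in %s seconds' % retry  # reschedule the dispatcher
```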
Danger
As the sampling strategy function is executed by the dispatcher, it must not raise any exception! If an exception is raised, the behaviour is undefined.
Dedicating a separate worker for dispatcher - Cluster segmentation¶
It is usually a good idea to introduce workers dedicated to the dispatcher task. This typically results in better performance, as the workers that do the actual computational tasks do not need to check flow status (especially with prefetching taken into account). You can introduce 1-2 workers (based on the number of messages fed into queues; you can also apply auto-scaling) that listen only on queues dedicated to the dispatcher, so these workers schedule the workload to specific workers based on the queue configuration. Finding the right balance in the number of workers per queue can take some time, but the performance impact is usually worth it.
Storage optimization & Distributed caches¶
When using Selinon you can run into two main issues with your cluster under heavy load:
- Your cluster is not powerful enough to serve requested number of tasks.
- Your storage/database cannot process the requested number of requests, or your network is not capable of transmitting that many queries.
In the first case the solution is simple: buy/use more hardware.
In the latter case there are two main approaches to tackle such a bottleneck. You can always use more storage replicas or split data across multiple storages and transparently configure Selinon to use different storages for different purposes (see storage aliasing in Best practices).
If the above solution is not suitable for you, or you want to optimize even more, Selinon offers an optimization that introduces distributed caches. These caches are distributed across nodes (workers) in your cluster and reduce the number of requests to storages/databases while keeping data closer to the execution nodes.
Selinon by default uses a cache of size 0 (no items are added to the cache). There are predefined in-memory caches such as FIFO (First-In-First-Out cache), LIFO (Last-In-First-Out cache), LRU (Least-Recently-Used cache), MRU (Most-Recently-Used cache) and RR (Random-Replacement cache). See selinon.caches for more info.
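To illustrate the idea, a FIFO cache can be sketched as follows (a simplified sketch: the actual selinon.caches.Cache interface, such as method signatures and cache-miss behaviour, may differ):

```python
from collections import OrderedDict

class SimpleFIFOCache:
    """Simplified FIFO cache sketch; the real selinon.caches.FIFO
    and its Cache base class may expose a different interface."""

    def __init__(self, max_cache_size):
        self.max_cache_size = max_cache_size
        self._cache = OrderedDict()

    def add(self, item_id, item):
        if self.max_cache_size <= 0:
            return  # cache of size 0: nothing is ever stored
        if item_id in self._cache:
            return  # already cached
        if len(self._cache) >= self.max_cache_size:
            self._cache.popitem(last=False)  # evict the oldest entry
        self._cache[item_id] = item

    def get(self, item_id):
        return self._cache.get(item_id)  # None on a cache miss
```

With size 2, adding a third item evicts the first one added, regardless of access patterns; LRU, MRU and friends differ only in which entry gets evicted.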
Note
You can, for example, simply use Redis for caching. Just deploy Redis in the same pod as your worker and point the caching mechanism to a Redis adapter in your YAML configuration. This way you reduce the number of requests to the database, as results get cached in Redis (available in the same pod) once they are available.
Caching task results¶
Results of your tasks can be cached. This is especially useful when you use predicates that query the storage/database often. To define a cache, just provide the configuration in your YAML file as shown below:
tasks:
  - name: 'Task1'
    import: 'myapp.tasks'
    cache:
      # from myapp.cache import RedisCache
      name: 'RedisCache'
      import: 'myapp.cache'
      configuration:
        host: 'redis'
        port: 6379
        db: 0
        password: 'secretpassword'
        charset: 'utf-8'
Results are added to the cache only when the dispatcher requests results from the cache for predicates. Any error in a user's cache is reported using the tracing mechanism (see the Trace module). In case of cache failures, Selinon continues using results directly from the storage, as cache failures are not fatal.
Note
Caching task results can be beneficial if you have a lot of conditions that depend on task results. It can be even more beneficial if you do flow or task throttling with conditions (see Best practices for more info).
Caching task states¶
You can also introduce a caching mechanism for task states. Note that task states are handled by Celery (refer to Celery's AsyncResult for more details). Selinon offers a way to place a cache in between:
flow-definitions:
  - name: 'flow1'
    cache:
      # from myapp.cache import RedisCache
      name: 'RedisCache'
      import: 'myapp.cache'
      configuration:
        host: 'redis-cache'
        port: 6379
        db: 0
        password: 'secretpassword'
        charset: 'utf-8'
    edges:
      - from:
        to: 'Task1'
As you can see, caches are configurable per flow. This way you can easily use caches only for the flows where you consider caching critical.
The RedisCache implementation has to derive from Cache and implement the required methods. Note that the configuration is passed to the cache constructor as keyword arguments, similarly to the DataStorage case (see Storage adapter implementation).
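Such an implementation could be sketched along these lines (hypothetical: the class name comes from the YAML example above, but the method names, signatures and key layout shown here are assumptions; a real implementation must derive from Selinon's Cache base class and handle cache misses the way that class requires):

```python
import json

class RedisCache:
    """Hypothetical cache adapter sketch. A real implementation has to
    derive from Selinon's Cache base class; the method names, signatures
    and key layout below are assumptions for illustration."""

    def __init__(self, host='localhost', port=6379, db=0, password=None,
                 charset='utf-8'):
        # configuration entries from the YAML file arrive as keyword arguments
        self._config = {'host': host, 'port': port, 'db': db,
                        'password': password}
        self._charset = charset
        self._conn = None  # connect lazily on first use

    def _connection(self):
        if self._conn is None:
            import redis  # third-party client, assumed installed
            self._conn = redis.Redis(**self._config)
        return self._conn

    @staticmethod
    def _key(flow_name, task_name, item_id):
        # one Redis key per cached item
        return '{}:{}:{}'.format(flow_name, task_name, item_id)

    def add(self, item_id, item, task_name=None, flow_name=None):
        key = self._key(flow_name, task_name, item_id)
        self._connection().set(key, json.dumps(item))

    def get(self, item_id, task_name=None, flow_name=None):
        raw = self._connection().get(self._key(flow_name, task_name, item_id))
        return None if raw is None else json.loads(raw)
```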
Note
Caching task states is generally a good idea if many of your flow edges depend on task states (a lot of source tasks in edges) and these tasks have varying execution times (very "wide" flows).
Note
Due to result consistency, information about task states is added to caches only once a task (or flow) fails or finishes - there won't be any flow or task with the same id executed in the future.
As with task result caches, if there is some issue with a cache, the errors are reported but do not have a fatal effect on the flow. If something goes wrong, Selinon just uses the result backend directly.
Prioritization of tasks and flows¶
You can easily prioritize tasks or flows by placing them on separate queues and dedicating more workers to processing these tasks or flows (see the queue option in the YAML configuration section). Note that queues reserved for flows are used for the dispatcher task that does scheduling based on state (not for the tasks stated in the flow).
Note
Think about prioritization before you deploy to production - place each flow and task on a separate queue so you can easily prioritize flows and tasks without purging queues or redeploying.
To run a Celery worker only on the given queues queue1 and queue2, specify Celery's -Q argument:
celery worker -A myapp.start -Q queue1,queue2 # additional parameters follow
Throttling of tasks and flows¶
Throttling can be performed by reducing the number of workers that process a given task or flow, as opposed to the prioritization described above.
If you would like to perform even more robust throttling, you can apply the throttling option at the task or flow level (see the throttling option in the YAML configuration section). Note that to set up throttling properly you need to dedicate one cluster node that accepts the dispatcher task (the flow on a separate queue) for the given flow with throttled tasks or flows. This guarantees that scheduling of throttled tasks or flows is done in one place, so the given worker node can perform time-based throttling properly based on internally kept state.
Flow throttling is done at the flow level (not at the task level), similarly to the flow prioritization described above.
Note
Place each task and flow (dispatcher scheduling) on a separate queue so you can easily do throttling without purging queues when changing prioritization or throttling related configuration.
An example of configuration:
---
tasks:
  - name: Task1
    import: myapp.tasks
    queue: task1_v0
    throttling:  # task level throttling
      seconds: 10
  # additional entries follow

flow-definitions:
  - name: flow1
    queue: flow1_v0
    throttling:  # flow level throttling
      minutes: 30
      seconds: 10
    edges:
      # additional entries follow
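The internally kept state mentioned above can be illustrated with a small sketch of time-based throttling (illustrative, not Selinon's actual implementation): the dedicated node remembers when the last throttled task was scheduled and delays the next one accordingly.

```python
import datetime

class Throttler:
    """Illustrative time-based throttler; names and structure are
    assumptions, not Selinon's API."""

    def __init__(self, interval):
        self.interval = interval    # a datetime.timedelta
        self._last_schedule = None  # internally kept state

    def countdown(self, now):
        """Return the number of seconds to wait before scheduling
        the next throttled task."""
        if self._last_schedule is None or now - self._last_schedule >= self.interval:
            self._last_schedule = now
            return 0
        # reserve the next free slot after the last scheduled one
        next_allowed = self._last_schedule + self.interval
        self._last_schedule = next_allowed
        return (next_allowed - now).total_seconds()
```

Because this state lives in one process, the scheduling of throttled tasks has to go through a single dedicated node, as described above.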
Other optimizations¶
If you would like to optimize performance, take a look at Celery's user guide on optimizing Celery execution.
For example, configuring the prefetch multiplier (the number of messages that are prefetched in bulk from the broker to a worker) can result in a significant speedup (more than 10 times based on my experience). To find the right and balanced solution for your cluster, you will need to experiment a bit.
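For instance, the prefetch multiplier (and late acknowledgements, which often go hand in hand with it) can be set in your Celery configuration; the values below are illustrative starting points, not universal recommendations:

```python
# celeryconfig.py - illustrative values, tune them for your cluster
worker_prefetch_multiplier = 1  # how many messages to prefetch per worker process
task_acks_late = True           # acknowledge a message only after the task finishes
```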