Storage adapter implementation
Several prepared database adapters are currently available; see the Selinonlib module. To use these storages, you have to install the database adapters manually using extras, as they are not included in the default requirements.
PostgreSQL - SQLAlchemy adapter for PostgreSQL
pip3 install selinon[postgresql]
A configuration example:
storages:
  - name: 'MySqlStorage'
    classname: 'PostgreSQL'
    import: 'selinon.storages.postgresql'
    configuration:
      connection_string: 'postgresql://postgres:postgres@postgres:5432/postgres'
      encoding: 'utf-8'
      echo: false
The connection string can be parametrized using environment variables. The implementation is available in selinon.storages.postgresql.
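To illustrate what parametrizing a configuration entry with environment variables means, here is a minimal sketch. It assumes string.Template-style ${VAR} placeholders purely for illustration; this is not necessarily the placeholder syntax Selinon accepts, and expand_config is a hypothetical helper, not part of Selinon.

```python
import os
from string import Template


def expand_config(configuration, environ=None):
    """Return a copy of `configuration` with ${VAR} placeholders in string values
    replaced from `environ` (defaults to os.environ).

    Hypothetical helper: it only illustrates env-var parametrization and is not
    Selinon's own placeholder syntax or implementation.
    """
    environ = os.environ if environ is None else environ
    expanded = {}
    for key, value in configuration.items():
        if isinstance(value, str):
            # substitute() raises KeyError if a referenced variable is missing
            expanded[key] = Template(value).substitute(environ)
        else:
            expanded[key] = value
    return expanded


config = {
    "connection_string": "postgresql://${PG_USER}:${PG_PASSWORD}@${PG_HOST}:5432/postgres",
    "encoding": "utf-8",
    "echo": False,
}
env = {"PG_USER": "postgres", "PG_PASSWORD": "secret", "PG_HOST": "db"}
print(expand_config(config, env)["connection_string"])
# postgresql://postgres:secret@db:5432/postgres
```

Failing loudly on a missing variable (rather than leaving the placeholder in place) avoids connecting with a half-filled connection string.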
Redis - Redis database adapter
pip3 install selinon[redis]
A configuration example:
storages:
  - name: 'MyRedisStorage'
    classname: 'Redis'
    import: 'selinon.storages.redis'
    configuration:
      host: 'redishost'
      port: 6379
      db: 0
      charset: 'utf-8'
Configuration entries host, port, password and db can be parametrized using environment variables. The implementation is available in selinon.storages.redis.
MongoDB - MongoDB database adapter
pip3 install selinon[mongodb]
A configuration example:
storages:
  - name: 'MyMongoStorage'
    classname: 'MongoDB'
    import: 'selinon.storages.mongodb'
    configuration:
      db_name: 'database_name'
      collection_name: 'collection_name'
      host: 'mongohost'
      port: 27017
Configuration entries db_name, collection_name, host and port can be parametrized using environment variables. The implementation is available in selinon.storages.mongodb.
S3 - AWS S3 storage adapter
pip3 install selinon[s3]
A configuration example:
storages:
  - name: 'MyS3Storage'
    classname: 'S3Storage'
    import: 'selinon.storages.s3'
    configuration:
      bucket: 'my-bucket-name'
      aws_access_key_id: 'AAAAAAAAAAAAAAAAAAAA'
      aws_secret_access_key: 'BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB'
      region_name: 'us-east-1'
Configuration entries bucket, aws_access_key_id, aws_secret_access_key, region_name, location, use_ssl and endpoint_url can be parametrized using environment variables. The implementation is available in selinon.storages.s3.
Note
You can use awesome projects such as Ceph Nano, Ceph or MinIO to run your application without AWS. To do so, adjust the endpoint_url configuration entry of this adapter to point to your alternative. Check Selinon's demo deployment for more info.
In-memory storage
A configuration example:
storages:
  - name: 'Memory'
    classname: 'InMemoryStorage'
    import: 'selinon.storages.memory'
    configuration:
      echo: false
No additional packages need to be installed. This storage adapter stores results in memory, which makes it suitable for the Selinon CLI and executor when you just want to run a flow and check its results. Because results are kept in the memory of a single node, you generally cannot scale out the number of workers with this adapter.
The implementation is available in selinon.storages.memory.
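The scaling limitation follows directly from where the results live. The sketch below is a hypothetical dict-backed store that mirrors the method names of DataStorage; it does not subclass selinon.DataStorage and is not the code of selinon.storages.memory, and DictStorage is an illustrative name.

```python
class DictStorage:
    """Minimal dict-backed result store mirroring the DataStorage method names.

    Sketch only: not the implementation of selinon.storages.memory.
    """

    def __init__(self, echo=False):
        self.echo = echo
        self._results = None  # None means "not connected"

    def is_connected(self):
        return self._results is not None

    def connect(self):
        self._results = {}

    def disconnect(self):
        self._results = None

    def store(self, flow_name, task_name, task_id, result):
        # results are keyed by the (flow, task, task id) triple
        self._results[(flow_name, task_name, task_id)] = result
        if self.echo:
            print(f"stored {flow_name}/{task_name}/{task_id}")

    def retrieve(self, flow_name, task_name, task_id):
        return self._results[(flow_name, task_name, task_id)]

    def delete(self, flow_name, task_name, task_id):
        del self._results[(flow_name, task_name, task_id)]


storage = DictStorage()
storage.connect()
storage.store("flow1", "Task1", "id-1", {"answer": 42})
print(storage.retrieve("flow1", "Task1", "id-1"))  # {'answer': 42}
```

A second worker process would start with its own empty dictionary, so a task running there could not retrieve results stored by the first one.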
A few notes on using adapters
If you want to use multiple adapters, you can list all of them as extras when installing:
pip3 install selinon[mongodb,postgresql,s3]
Note that spaces are not allowed in the list of extras. When using zsh, quote or escape the brackets, for example: pip3 install 'selinon[mongodb,postgresql,s3]'.
Using a custom storage adapter
You can define your own storage by inheriting from the DataStorage abstract class:
from selinon import DataStorage


class MyStorage(DataStorage):
    def __init__(self, host, port):
        # arguments from the YAML file are passed to the constructor as keyword arguments
        self.host = host
        self.port = port
        self._connected = False

    def is_connected(self):
        # predicate used to check whether a connection is established
        return self._connected

    def connect(self):
        # define how to connect based on your configuration
        self._connected = True

    def disconnect(self):
        # define how to disconnect from the storage
        self._connected = False

    def retrieve(self, flow_name, task_name, task_id):
        # define how to retrieve results
        pass

    def store(self, flow_name, task_name, task_id, result):
        # define how to store results
        pass

    def store_error(self, node_args, flow_name, task_name, task_id, exc_info):
        # optionally define how to track errors/task failures if you need to
        pass

    def delete(self, flow_name, task_name, task_id):
        # define how to delete results
        pass
And pass this storage to Selinon in your YAML configuration:
storages:
  # from myapp.storages import MyStorage
  - name: 'MyStorage'
    import: 'myapp.storages'
    configuration:
      host: 'localhost'
      port: '5432'
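The name, import and classname keys can be read as a dynamic import followed by a constructor call. A minimal sketch, assuming that classname falls back to name when it is omitted (as the commented import line in the configuration above suggests) and that configuration entries become constructor keyword arguments (as the constructor comment in MyStorage states). load_storage is a hypothetical helper, not Selinon's loader, and fractions.Fraction stands in for a storage class so the sketch runs with the standard library only.

```python
import importlib


def load_storage(entry):
    """Instantiate a class from one entry of a `storages` list.

    Hypothetical: assumes `classname` defaults to `name`; not Selinon's code.
    """
    module = importlib.import_module(entry["import"])
    cls = getattr(module, entry.get("classname", entry["name"]))
    # configuration entries become keyword arguments of the constructor
    return cls(**entry.get("configuration", {}))


# Stand-in entry: equivalent to fractions.Fraction(numerator=3, denominator=4)
entry = {
    "name": "Fraction",
    "import": "fractions",
    "configuration": {"numerator": 3, "denominator": 4},
}
print(load_storage(entry))  # 3/4
```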
If you create an adapter for a well-known storage and you feel that your adapter is generic enough, feel free to share it with the community by opening a pull request!
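As a more concrete starting point than the empty template, the following hypothetical adapter stores each task result as a JSON file on the local filesystem. The class name, the directory parameter and the file layout are assumptions for illustration, not part of Selinon; the import falls back to object only so the sketch runs without Selinon installed.

```python
import json
import os
import tempfile

try:
    from selinon import DataStorage
except ImportError:  # lets the sketch run without Selinon installed
    DataStorage = object


class JsonFileStorage(DataStorage):
    """Hypothetical adapter storing each task result as a JSON file."""

    def __init__(self, directory):
        self.directory = directory
        self._connected = False

    def is_connected(self):
        return self._connected

    def connect(self):
        os.makedirs(self.directory, exist_ok=True)
        self._connected = True

    def disconnect(self):
        self._connected = False

    def _path(self, flow_name, task_name, task_id):
        # one file per task run: <directory>/<flow>/<task>/<task_id>.json
        return os.path.join(self.directory, flow_name, task_name, f"{task_id}.json")

    def store(self, flow_name, task_name, task_id, result):
        path = self._path(flow_name, task_name, task_id)
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, "w", encoding="utf-8") as f:
            json.dump(result, f)

    def retrieve(self, flow_name, task_name, task_id):
        with open(self._path(flow_name, task_name, task_id), encoding="utf-8") as f:
            return json.load(f)

    def store_error(self, node_args, flow_name, task_name, task_id, exc_info):
        pass  # errors are not tracked in this sketch

    def delete(self, flow_name, task_name, task_id):
        os.remove(self._path(flow_name, task_name, task_id))


storage = JsonFileStorage(tempfile.mkdtemp())
storage.connect()
storage.store("flow1", "Task1", "abc", {"status": "ok"})
print(storage.retrieve("flow1", "Task1", "abc"))  # {'status': 'ok'}
```

Like the in-memory adapter, a local directory is node-local; sharing it between workers on different machines would require a shared filesystem.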
Database connection pool
Each worker tries to be efficient when it comes to the number of connections to a database: only one instance of each DataStorage class is held per worker. If you plan to run your worker with a concurrency level higher than one, Selinon transparently takes care of concurrency safety when calling methods of DataStorage.
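One simple way to make a single shared instance safe under concurrency is to serialize calls with a lock. The sketch below illustrates that idea only; SerializedStorage and CountingStorage are hypothetical names, and this is not how Selinon is confirmed to implement its concurrency safety.

```python
import threading


class SerializedStorage:
    """Wraps one shared storage instance so concurrent threads call it one at a time.

    Hypothetical sketch of the idea; not Selinon's implementation.
    """

    def __init__(self, storage):
        self._storage = storage
        self._lock = threading.Lock()

    def store(self, *args, **kwargs):
        with self._lock:
            return self._storage.store(*args, **kwargs)

    def retrieve(self, *args, **kwargs):
        with self._lock:
            return self._storage.retrieve(*args, **kwargs)


class CountingStorage:
    """Toy backend with a non-atomic read-modify-write counter."""

    def __init__(self):
        self.writes = 0
        self.data = {}

    def store(self, flow_name, task_name, task_id, result):
        self.data[(flow_name, task_name, task_id)] = result
        self.writes += 1

    def retrieve(self, flow_name, task_name, task_id):
        return self.data[(flow_name, task_name, task_id)]


backend = CountingStorage()
shared = SerializedStorage(backend)  # one instance shared by all threads of a worker
threads = [
    threading.Thread(target=shared.store, args=("flow1", "Task1", str(i), i))
    for i in range(8)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(backend.writes)  # 8
```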
Note
You can also share a connection across multiple DataStorage classes in an inheritance hierarchy and reuse already defined connections. Storage aliasing, as described in Best practices, is also possible.
If you would like to request a storage from your configuration, you can obtain its adapter from the Selinon StoragePool:
from selinon import StoragePool
# Name of storage was set to MyMongoStorage in nodes.yaml configuration file (section storages).
mongo = StoragePool.get_connected_storage('MyMongoStorage')
Selinon transparently takes care of instantiation, connecting and sharing the connection pool across the whole process. Check out other useful methods of StoragePool.
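The behaviour described for get_connected_storage (instantiate on first request, connect if needed, then reuse the same object) can be sketched as a lazy registry. LazyStoragePool and FakeMongo are hypothetical names used only for illustration; this is not Selinon's StoragePool implementation.

```python
import threading


class LazyStoragePool:
    """Hands out one connected instance per storage name, created on first use.

    Hypothetical illustration; not Selinon's StoragePool.
    """

    def __init__(self, factories):
        self._factories = factories  # storage name -> zero-argument callable
        self._instances = {}
        self._lock = threading.Lock()

    def get_connected_storage(self, name):
        with self._lock:
            if name not in self._instances:
                self._instances[name] = self._factories[name]()
            storage = self._instances[name]
            if not storage.is_connected():
                storage.connect()
            return storage


class FakeMongo:
    """Toy storage that only records connection attempts."""

    def __init__(self):
        self.connected = False
        self.connect_calls = 0

    def is_connected(self):
        return self.connected

    def connect(self):
        self.connected = True
        self.connect_calls += 1


pool = LazyStoragePool({"MyMongoStorage": FakeMongo})
a = pool.get_connected_storage("MyMongoStorage")
b = pool.get_connected_storage("MyMongoStorage")
print(a is b, a.connect_calls)  # True 1
```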
Note
If anything goes wrong with a storage or a storage adapter so that the dispatcher fails to determine the next steps in the flow, the dispatcher is retried, respecting the flow's retry_countdown configuration option. This way you will not lose messages that cannot be consumed due to storage errors. However, if a task cannot write to or read from a storage, the task is marked as failed.
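The retry idea can be illustrated with a small loop that re-runs a step after a fixed countdown whenever a storage error occurs. This is only a sketch of the concept: StorageError, run_with_retries and max_retries are hypothetical, and this blocking loop is not how Selinon implements dispatcher retries.

```python
import time


class StorageError(Exception):
    """Hypothetical error raised by a storage adapter."""


def run_with_retries(step, retry_countdown, max_retries):
    """Re-run `step` after `retry_countdown` seconds whenever it raises StorageError.

    Illustration only; `max_retries` is an assumption, not a Selinon option.
    """
    for attempt in range(max_retries + 1):
        try:
            return step()
        except StorageError:
            if attempt == max_retries:
                raise  # give up after the last allowed retry
            time.sleep(retry_countdown)


attempts = []


def flaky_dispatch():
    # fails twice, then succeeds, simulating a temporarily unavailable storage
    attempts.append(1)
    if len(attempts) < 3:
        raise StorageError("storage temporarily unavailable")
    return "next steps scheduled"


result = run_with_retries(flaky_dispatch, retry_countdown=0, max_retries=5)
print(result)  # next steps scheduled
```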