Source code for selinon.storages.redis
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ######################################################################
# Copyright (C) 2016-2018 Fridolin Pokorny, fridolin.pokorny@gmail.com
# This file is part of Selinon project.
# ######################################################################
"""Selinon adapter for Redis database."""
import os
import json
from selinon.data_storage import SelinonMissingDataException
from selinon import DataStorage
try:
import redis
except ImportError as exc:
raise ImportError("Please install dependencies using `pip3 install selinon[redis]` "
"in order to use RedisStorage") from exc
[docs]class Redis(DataStorage): # pylint: disable=too-many-instance-attributes
"""Selinon adapter for Redis database."""
def __init__(self, host=None, port=6379, db=0, password=None, socket_timeout=None, connection_pool=None,
charset=None, errors=None, unix_socket_path=None):
"""Instantiate Redis database adapter.
:param host: Redis host
:param port: Redis port
:param db: Redis database to be used
:param password: password to be used
:param socket_timeout: socket timeout
:param connection_pool: redis connection pool
:param charset: connection character set
:param errors: error treating method
:param unix_socket_path: path to unix socket, if any
"""
super().__init__()
self.conn = None
self.host = host.format(**os.environ) if host else 'localhost'
self.port = int(port.format(**os.environ)) if isinstance(port, str) else port
self.db = int(db.format(**os.environ) if isinstance(db, str) else db) # pylint: disable=invalid-name
self.password = password.format(**os.environ) if password else None
self.socket_timeout = socket_timeout
self.connection_pool = connection_pool
self.charset = charset or 'utf-8'
self.errors = errors or 'strict'
self.unix_socket_path = unix_socket_path
[docs] def is_connected(self): # noqa
return self.conn is not None
[docs] def connect(self): # noqa
self.conn = redis.Redis(host=self.host, port=self.port, db=self.db, password=self.password,
socket_timeout=self.socket_timeout, connection_pool=self.connection_pool,
charset=self.charset, errors=self.errors)
[docs] def disconnect(self): # noqa
if self.is_connected():
self.conn.connection_pool.disconnect()
self.conn = None
[docs] def retrieve(self, flow_name, task_name, task_id): # noqa
assert self.is_connected() # nosec
ret = self.conn.get(task_id)
if ret is None:
raise FileNotFoundError("Record not found in database")
record = json.loads(ret.decode(self.charset))
assert record.get('task_name') == task_name # nosec
return record.get('result')
[docs] def store(self, node_args, flow_name, task_name, task_id, result): # noqa
assert self.is_connected() # nosec
record = {
'node_args': node_args,
'flow_name': flow_name,
'task_name': task_name,
'task_id': task_id,
'result': result
}
self.conn.set(task_id, json.dumps(record))
return task_id
[docs] def store_error(self, node_args, flow_name, task_name, task_id, exc_info): # noqa
# just to make pylint happy
raise NotImplementedError()
[docs] def delete(self, flow_name, task_name, task_id): # noqa
assert self.is_connected() # nosec
ret = self.conn.delete(task_id)
if ret == 0:
raise SelinonMissingDataException("Record not found in database")