Source code for selinon.data_storage
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ######################################################################
# Copyright (C) 2016-2018 Fridolin Pokorny, fridolin.pokorny@gmail.com
# This file is part of Selinon project.
# ######################################################################
"""Data storage interface."""
import abc
[docs]class SelinonMissingDataException(Exception):
"""Selinon exception that ressource was not found by storage"""
pass
[docs]class DataStorage(metaclass=abc.ABCMeta):
"""Abstract Selinon storage adapter that is implemented by a user."""
@abc.abstractmethod
def __init__(self, *args, **kwargs):
"""Initialize storage.
:param args: storage arguments as stated in YAML configuration file
:param kwargs: storage key-value arguments as stated in YAML configuration (preferred over args)
"""
[docs] @abc.abstractmethod
def connect(self):
"""Connect to a resource, if not needed, should be empty."""
raise NotImplementedError()
[docs] @abc.abstractmethod
def is_connected(self):
"""Check storage connection status.
:return: True if connected to a resource
"""
raise NotImplementedError()
[docs] @abc.abstractmethod
def disconnect(self):
"""Disconnect from a resource."""
raise NotImplementedError()
[docs] @abc.abstractmethod
def retrieve(self, flow_name, task_name, task_id):
"""Retrieve result stored in storage.
:param flow_name: flow name in which task was executed
:param task_name: task name that result is going to be retrieved
:param task_id: id of the task that result is going to be retrieved
:return: task result
"""
raise NotImplementedError()
[docs] @abc.abstractmethod
def store(self, node_args, flow_name, task_name, task_id, result): # pylint: disable=too-many-arguments
"""Store result stored in storage.
:param node_args: arguments that were passed to node
:param flow_name: flow name in which task was executed
:param task_name: task name that result is going to be stored
:param task_id: id of the task that result is going to be stored
:param result: result that should be stored
:return: unique ID of stored record
"""
raise NotImplementedError()
[docs] def store_error(self, node_args, flow_name, task_name, task_id, exc_info): # pylint: disable=too-many-arguments
"""Store information about task error.
:param node_args: arguments that were passed to node
:param flow_name: flow name in which task was executed
:param task_name: task name that result is going to be stored
:param task_id: id of the task that result is going to be stored
:param exc_info: information about exception - tuple (type, value, traceback) as returned by sys.exc_info()
:return: unique ID of stored record
"""
# pylint: disable=abstract-method
# no not mark this method with @abc.abstractmethod as we do not force a user to implement this
raise NotImplementedError()
[docs] def delete(self, flow_name, task_name, task_id):
"""Delete result stored in storage.
:param flow_name: flow name in which task was executed
:param task_name: task name that result is going to be retrieved
:param task_id: id of the task that result is going to be retrieved
"""
raise NotImplementedError("delete method is not implemented")
def __del__(self):
"""Clean up."""
if self.is_connected():
self.disconnect()