Source code for selinon.storage_pool

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ######################################################################
# Copyright (C) 2016-2018  Fridolin Pokorny, fridolin.pokorny@gmail.com
# This file is part of Selinon project.
# ######################################################################
"""A pool that carries all database connections for workers."""

import traceback

from .config import Config
from .errors import CacheMissError
from .errors import StorageError
from .errors import UnknownStorageError
from .lock_pool import LockPool
from .trace import Trace


class StoragePool:
    """A pool that carries all database connections for workers.

    Most of the functionality is exposed through class methods that operate on the global ``Config`` state;
    an instance only binds a flow name and a task-name-to-task-id mapping so results can be requested by
    task name alone (see :meth:`get`).
    """

    # Pool of locks shared by all users of this class; locks are requested per storage instance.
    _storage_pool_locks = LockPool()

    def __init__(self, id_mapping, flow_name):
        """Initialize storage pool instance based on the current context.

        :param id_mapping: mapping tasks and their ids, a falsy value is replaced with an empty dict
        :param flow_name: name of flow for which StoragePool context is created
        """
        self._id_mapping = id_mapping or {}
        self._flow_name = flow_name

    @classmethod
    def get_storage_name_by_task_name(cls, task_name, graceful=False):
        """Get name of storage that was assigned to the given task.

        :param task_name: name of a task
        :param graceful: return None instead of raising an exception
        :return: storage name for task
        :raises UnknownStorageError: if there is no storage for the task and graceful is not set
        """
        storage = Config.task2storage_mapping.get(task_name)

        if storage is None and not graceful:
            raise UnknownStorageError("No storage for task '%s' defined" % task_name)

        return storage

    @classmethod
    def get_storage_by_task_name(cls, task_name):
        """Get storage instance that was assigned to the given task.

        :param task_name: task's name for which storage should be get
        :return: connected storage, or None if no storage name is configured for the task
        :rtype: DataStorage
        """
        storage_name = cls.get_storage_name_by_task_name(task_name, graceful=True)

        if storage_name:
            return cls.get_connected_storage(storage_name)

        return None

    @classmethod
    def _get_required_storage(cls, task_name):
        """Get connected storage for the given task, failing explicitly if there is none.

        :param task_name: task's name for which storage should be get
        :return: connected storage
        :raises UnknownStorageError: if no storage is assigned to the task
        """
        storage = cls.get_storage_by_task_name(task_name)

        if storage is None:
            # Fail with the same error as get_storage_name_by_task_name() rather than letting callers
            # hit an AttributeError on None later on.
            raise UnknownStorageError("No storage for task '%s' defined" % task_name)

        return storage

    @classmethod
    def get_connected_storage(cls, storage_name):
        """Retrieve connected storage based by its name stated in configuration.

        :param storage_name: name of storage
        :return: connected storage
        """
        # NOTE(review): this is a plain subscript; for a dict-like mapping an unknown storage name would
        # surface as KeyError, not UnknownStorageError. Either way it means the flow was not configured
        # properly, which should be handled during Python code generation.
        storage = Config.storage_mapping[storage_name]

        if not storage.is_connected():
            with cls._storage_pool_locks.get_lock(storage):
                # Check again once the lock is held so that connect() is not called for a storage
                # that got connected in the meantime.
                if not storage.is_connected():
                    Trace.log(Trace.STORAGE_CONNECT, {'storage_name': storage_name})
                    storage.connect()

        return storage

    def get(self, task_name):
        """Retrieve data for task based on mapping for the current context.

        :param task_name: task's name that we are retrieving data for
        :return: task's result for the current context
        """
        return self.retrieve(self._flow_name, task_name, self._id_mapping[task_name])

    @classmethod
    def retrieve(cls, flow_name, task_name, task_id):
        """Retrieve task's result from database which was configured to be used for desired task.

        The result cache configured for the storage is consulted first; the storage itself is queried only
        if the cache did not provide the result, in which case the retrieved result is added to the cache.

        :param flow_name: flow in which the retrieval is taking place
        :param task_name: name of task for which result should be retrieved
        :param task_id: task ID to uniquely identify task results
        :return: task's result
        :raises UnknownStorageError: if no storage is assigned to the task
        :raises StorageError: if the storage failed to retrieve the result
        """
        storage = cls._get_required_storage(task_name)
        storage_task_name = Config.storage_task_name[task_name]
        storage_name = cls.get_storage_name_by_task_name(task_name)

        trace_msg = {
            'task_name': task_name,
            'storage_task_name': storage_task_name,
            'storage_name': storage_name,
            'flow_name': flow_name,
            'task_id': task_id
        }

        with cls._storage_pool_locks.get_lock(storage):
            cache = Config.storage2storage_cache[storage_name]
            result = None
            result_retrieved = False

            # Actually it is OK if there are some issues with task result cache - if there is some issue, just
            # report it in the tracing mechanism so users are aware of it and try to talk directly to storage
            # instead.
            Trace.log(Trace.TASK_RESULT_CACHE_GET, trace_msg)
            try:
                result = cache.get(task_id, task_name=storage_task_name, flow_name=flow_name)
                result_retrieved = True
            except CacheMissError:
                Trace.log(Trace.TASK_RESULT_CACHE_MISS, trace_msg, what=traceback.format_exc())
            except Exception:  # pylint: disable=broad-except
                Trace.log(Trace.TASK_RESULT_CACHE_ISSUE, trace_msg, what=traceback.format_exc())
            else:
                Trace.log(Trace.TASK_RESULT_CACHE_HIT, trace_msg)

            if not result_retrieved:
                Trace.log(Trace.STORAGE_RETRIEVE, trace_msg)
                try:
                    # NOTE(review): set() passes storage_task_name to the storage whereas task_name is
                    # passed here - confirm this is intended for tasks that use a storage task name alias.
                    result = storage.retrieve(flow_name, task_name, task_id)
                except Exception as exc:
                    error_msg = "Failed to retrieve result from storage after the result was not found in cache"
                    Trace.log(Trace.STORAGE_ISSUE, trace_msg, what=traceback.format_exc())
                    raise StorageError(error_msg) from exc

                # A failure to populate the cache is only reported, the result is still returned.
                Trace.log(Trace.TASK_RESULT_CACHE_ADD, trace_msg)
                try:
                    # NOTE(review): cache.get() above receives task_name/flow_name, add() does not -
                    # verify against the cache interface.
                    cache.add(task_id, result)
                except Exception:  # pylint: disable=broad-except
                    Trace.log(Trace.TASK_RESULT_CACHE_ISSUE, trace_msg, what=traceback.format_exc())

        return result

    @classmethod
    def delete(cls, flow_name, task_name, task_id):
        """Delete task's result from database which was configured to be used for desired task.

        :param flow_name: flow in which the retrieval is taking place
        :param task_name: name of task for which result should be deleted
        :param task_id: task ID to uniquely identify task results
        :raises UnknownStorageError: if no storage is assigned to the task
        :raises StorageError: if the storage failed to delete the result
        """
        storage = cls._get_required_storage(task_name)
        storage_task_name = Config.storage_task_name[task_name]
        storage_name = cls.get_storage_name_by_task_name(task_name)

        trace_msg = {
            'task_name': task_name,
            'storage_task_name': storage_task_name,
            'storage_name': storage_name,
            'flow_name': flow_name,
            'task_id': task_id
        }

        with cls._storage_pool_locks.get_lock(storage):
            Trace.log(Trace.STORAGE_DELETE, trace_msg)
            try:
                # NOTE(review): as in retrieve(), task_name (not storage_task_name) is passed to the storage.
                storage.delete(flow_name, task_name, task_id)
            except Exception as exc:
                error_msg = "Failed to delete result from storage"
                Trace.log(Trace.STORAGE_ISSUE, trace_msg, what=traceback.format_exc())
                raise StorageError(error_msg) from exc

            Trace.log(Trace.STORAGE_DELETED, trace_msg)

    @classmethod
    def set(cls, node_args, flow_name, task_name, task_id, result):  # pylint: disable=too-many-arguments
        """Store result for task.

        :param node_args: arguments that were passed to the node
        :param flow_name: flow in which task was run
        :param task_name: task that computed result
        :param task_id: task id that computed result
        :param result: result that should be stored
        :return: result ID - a unique ID which can be used to reference task results
        :raises UnknownStorageError: if no storage is assigned to the task
        """
        storage = cls._get_required_storage(task_name)
        storage_task_name = Config.storage_task_name[task_name]

        record_id = storage.store(node_args, flow_name, storage_task_name, task_id, result)

        Trace.log(Trace.STORAGE_STORE, {
            'flow_name': flow_name,
            'node_args': node_args,
            'task_name': task_name,
            'storage_task_name': storage_task_name,
            'task_id': task_id,
            'storage_name': Config.task2storage_mapping[task_name],
            'record_id': record_id
        })

        return record_id

    @classmethod
    def set_error(cls, node_args, flow_name, task_name, task_id, exc_info):  # pylint: disable=too-many-arguments
        """Store error information for task failure.

        :param node_args: arguments that were passed to the node
        :param flow_name: flow in which task was run
        :param task_name: task that computed result
        :param task_id: task id that computed result
        :param exc_info: information about exception - tuple (type, value, traceback) as returned by sys.exc_info()
        :return: true if error was stored in database - DataStorage.store_error() was called
        :raises UnknownStorageError: if no storage is assigned to the task
        """
        storage = cls._get_required_storage(task_name)
        storage_task_name = Config.storage_task_name[task_name]

        try:
            record_id = storage.store_error(node_args, flow_name, storage_task_name, task_id, exc_info)
        except NotImplementedError:
            # The storage does not implement storing of errors, nothing was stored.
            return False

        # TODO: move conversion to string to enhanced JSON handler and rather pass objects in Trace.log()
        Trace.log(Trace.STORAGE_STORE_ERROR, {
            'flow_name': flow_name,
            'node_args': node_args,
            'task_name': task_name,
            'storage_task_name': storage_task_name,
            'task_id': task_id,
            'storage_name': Config.task2storage_mapping[task_name],
            'error_type': str(exc_info[0]),
            'error_value': str(exc_info[1]),
            'error_traceback': "".join(traceback.format_tb(exc_info[2])),
            'record_id': record_id
        })

        return True