Source code for selinon.selinon_task

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ######################################################################
# Copyright (C) 2016-2018  Fridolin Pokorny, fridolin.pokorny@gmail.com
# This file is part of Selinon project.
# ######################################################################
"""Base class for user-defined tasks."""

import abc
import logging

from .celery import AsyncResult
from .errors import NoParentNodeError
from .errors import RequestError
from .errors import Retry
from .storage_pool import StoragePool


[docs]class SelinonTask(metaclass=abc.ABCMeta): """Base class for user-defined tasks.""" def __init__(self, flow_name, task_name, parent, task_id, dispatcher_id): """Initialize SelinonTask (called by SelinonTaskEnvelope). :param flow_name: name of flow under which this tasks runs on :param task_name: name of task, note it can be aliased since we can have different task name and class name :param parent: direct task's predecessors stated in flow dependency :param task_id: id of this task :parent dispatcher_id: id of dispatcher handling the current flow """ # pylint: disable=too-many-arguments self.flow_name = flow_name self.task_name = task_name self.parent = parent self.task_id = task_id self.dispatcher_id = dispatcher_id self.log = logging.getLogger(__name__) def _selinon_dereference_task_id(self, flow_names, task_name, index): """Compute task id based on mapping of ancestors (from parent sub-flows). :param flow_names: name of parent flow or list of flow names in case of nested flows :param task_name: name of task in parent flow :param index: index of result if more than one subflow was run :return: task id based from parent subflows """ if not isinstance(flow_names, list): flow_names = [flow_names] parent_flow = self.parent for flow_name in flow_names: try: parent_flow = parent_flow[flow_name] except KeyError as exc: raise NoParentNodeError("No such parent flow '%s' for task '%s', check your configuration; nested " "as %s from flow %s" % (flow_name, self.task_name, flow_names, self.flow_name)) from exc try: task_id = parent_flow[task_name][index] except KeyError as exc: raise NoParentNodeError("No such parent task '%s' referenced by '%s' was run for task '%s' in flow '%s'" % (task_name, flow_names, self.task_name, self.flow_name)) from exc except IndexError as exc: raise NoParentNodeError("Requested index %s in parent task '%s' referencered by '%s', but there were " "run only %d tasks in task %s flow %s" % (index, task_name, flow_names, len(parent_flow[task_name]), self.task_name, self.flow_name)) from exc return task_id @property def storage(self): """Storage instance assigned to this task. :return: tasks's configured storage as stated in YAML config """ return StoragePool.get_storage_by_task_name(self.task_name)
[docs] def parent_task_result(self, parent_name): """Retrieve parent task result. :param parent_name: name of parent task to retrieve result from :return: result of parent task """ try: parent_task_id = self.parent[parent_name] except KeyError as exc: raise NoParentNodeError("No such parent '%s' in task '%s' in flow '%s', check your configuration" % (parent_name, self.task_name, self.flow_name)) from exc return StoragePool.retrieve(self.flow_name, parent_name, parent_task_id)
[docs] def parent_flow_result(self, flow_names, task_name, index=None): """Retrieve result of parent sub-flow task. Get parent subflow results; note that parent flows can return multiple results from task of same type because of loops in flows :param flow_names: name of parent flow or list of flow names in case of nested flows :param task_name: name of task in parent flow :param index: index of result if more than one subflow was run :return: result of task in parent subflow """ index = -1 if index is None else index parent_flow_name = flow_names if not isinstance(flow_names, list) else flow_names[-1] task_id = self._selinon_dereference_task_id(flow_names, task_name, index) return StoragePool.retrieve(parent_flow_name, task_name, task_id)
[docs] def parent_task_exception(self, parent_name): """Retrieve parent task exception. You have to call this from a fallback (direct or transitive). :param parent_name: name of task that failed (ancestor of calling task) :return: exception that was raised in the ancestor """ try: parent_task_id = self.parent[parent_name] except KeyError as exc: raise NoParentNodeError("No such parent '%s' in task '%s' in flow '%s', check your configuration" % (parent_name, self.task_name, self.flow_name)) from exc # Celery stores exceptions in result field celery_result = AsyncResult(parent_task_id) if not celery_result.failed(): raise RequestError("Parent task '%s' did not raised exception" % parent_name) return celery_result.result
[docs] def parent_flow_exception(self, flow_names, task_name, index=None): """Retrieve parent task exception. You have to call this from a fallback (direct or transitive). :param flow_names: name of parent flow or list of flow names in case of nested flows :param task_name: name of task that failed (ancestor of calling task) :param index: index of result if more than one subflow was run :return: exception that was raised in the ancestor """ index = -1 if index is None else index task_id = self._selinon_dereference_task_id(flow_names, task_name, index) # Celery stores exceptions in result field celery_result = AsyncResult(task_id) if not celery_result.failed(): raise RequestError("Parent task '%s' from subflows %s did not raised exception" % (task_name, str(flow_names))) return celery_result.result
[docs] @staticmethod def retry(countdown=0): """Retry, always raises Retry, this is compatible with Celery's retry except you cannot modify arguments. :param countdown: countdown for rescheduling """ raise Retry(countdown)
[docs] @abc.abstractmethod def run(self, node_args): """Entrypoint - user defined computation. :param node_args: arguments passed to flow/node :return: tasks's result that will be stored in database as configured """