Source code for selinon.task_envelope

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ######################################################################
# Copyright (C) 2016-2018  Fridolin Pokorny, fridolin.pokorny@gmail.com
# This file is part of Selinon project.
# ######################################################################
"""A raw Celery task that is responsible for running SelinonTask."""

import json
import sys
import traceback

import jsonschema

from .celery import Task
from .config import Config
from .errors import FatalTaskError
from .errors import Retry
from .storage_pool import StoragePool
from .trace import Trace  # Ignore PyImportSortBear


class SelinonTaskEnvelope(Task):
    """A Celery task that is responsible for running user-defined tasks from a flow."""

    # Celery configuration
    ignore_result = False
    acks_late = True
    track_started = True
    max_retries = None
    name = "selinon.SelinonTaskEnvelope"
    @classmethod
    def validate_result(cls, task_name, result):
        """Validate the result of a task against its configured schema; raise an exception if validation fails.

        :param task_name: name of the task
        :param result: result of the task
        """
        schema_path = Config.output_schemas.get(task_name)
        if schema_path:
            with open(schema_path, "r") as input_file:
                schema = json.load(input_file)

            jsonschema.validate(result, schema)
    def selinon_retry(self, task_name, flow_name, parent, node_args, retry_countdown,
                      retried_count, dispatcher_id, user_retry=False):
        # pylint: disable=too-many-arguments
        """Retry on Celery level.

        :param task_name: name of the task to be retried
        :param flow_name: name of the flow in which the task was run
        :param parent: dict of parent tasks
        :param node_args: arguments for the task
        :param retry_countdown: countdown for the retry
        :param retried_count: number of retries already done with this task
        :param dispatcher_id: id of the dispatcher handling the flow that ran this task
        :param user_retry: True if the retry was forced by the user
        """
        max_retry = Config.max_retry.get(task_name, 0)
        kwargs = {
            'task_name': task_name,
            'flow_name': flow_name,
            'parent': parent,
            'node_args': node_args,
            'dispatcher_id': dispatcher_id,
            'retried_count': retried_count
        }
        Trace.log(Trace.TASK_RETRY, {'flow_name': flow_name,
                                     'task_name': task_name,
                                     'task_id': self.request.id,
                                     'parent': parent,
                                     'node_args': node_args,
                                     'what': traceback.format_exc(),
                                     'retry_countdown': retry_countdown,
                                     'retried_count': retried_count,
                                     'user_retry': user_retry,
                                     'queue': Config.task_queues[task_name],
                                     'dispatcher_id': dispatcher_id,
                                     'max_retry': max_retry})
        raise self.retry(kwargs=kwargs, countdown=retry_countdown, queue=Config.task_queues[task_name])
    def run(self, task_name, flow_name, parent, node_args, dispatcher_id, retried_count=None):
        # pylint: disable=arguments-differ,too-many-arguments,too-many-locals
        """Task entry-point called by Celery.

        :param task_name: task to be run
        :param flow_name: flow in which this task runs
        :param parent: dict of parent nodes
        :param node_args: node arguments within the flow
        :param dispatcher_id: id of the dispatcher that handles the flow
        :param retried_count: number of failed attempts so far that caused the task to be retried
        :rtype: None
        """
        # We pass node_args explicitly as a single argument for now to avoid trouble with *args and **kwargs
        # mapping, since we depend on the previous task and its result can be anything.
        Trace.log(Trace.TASK_START, {'flow_name': flow_name,
                                     'task_name': task_name,
                                     'task_id': self.request.id,
                                     'parent': parent,
                                     'queue': Config.task_queues[task_name],
                                     'dispatcher_id': dispatcher_id,
                                     'node_args': node_args})
        try:
            task = Config.get_task_instance(
                task_name=task_name,
                flow_name=flow_name,
                parent=parent,
                task_id=self.request.id,
                dispatcher_id=dispatcher_id
            )
            result = task.run(node_args)
            self.validate_result(task_name, result)

            storage = StoragePool.get_storage_name_by_task_name(task_name, graceful=True)
            if storage and not Config.storage_readonly[task_name]:
                StoragePool.set(node_args, flow_name, task_name, self.request.id, result)
            elif result is not None:
                Trace.log(Trace.TASK_DISCARD_RESULT, {'flow_name': flow_name,
                                                      'task_name': task_name,
                                                      'task_id': self.request.id,
                                                      'parent': parent,
                                                      'node_args': node_args,
                                                      'queue': Config.task_queues[task_name],
                                                      'dispatcher_id': dispatcher_id,
                                                      'result': result})
        except Retry as retry:
            # we do not touch retried_count
            self.selinon_retry(task_name, flow_name, parent, node_args, retry.countdown, retried_count,
                               dispatcher_id, user_retry=True)
        except Exception as exc:  # pylint: disable=broad-except
            exc_info = sys.exc_info()

            max_retry = Config.max_retry.get(task_name, 0)
            retried_count = retried_count or 0

            if max_retry > retried_count and not isinstance(exc, FatalTaskError):
                retried_count += 1
                retry_countdown = Config.retry_countdown.get(task_name, 0)
                self.selinon_retry(task_name, flow_name, parent, node_args, retry_countdown, retried_count,
                                   dispatcher_id)
            else:
                Trace.log(Trace.TASK_FAILURE, {'flow_name': flow_name,
                                               'task_name': task_name,
                                               'task_id': self.request.id,
                                               'parent': parent,
                                               'node_args': node_args,
                                               'what': traceback.format_exc(),
                                               'queue': Config.task_queues[task_name],
                                               'dispatcher_id': dispatcher_id,
                                               'retried_count': retried_count})

                storage = StoragePool.get_storage_name_by_task_name(task_name, graceful=True)
                if storage and not Config.storage_readonly[task_name] \
                        and not StoragePool.set_error(node_args, flow_name, task_name, self.request.id, exc_info):
                    # TODO: move conversion to string to enhanced JSON handler and rather pass objects in Trace.log()
                    Trace.log(Trace.STORAGE_OMIT_STORE_ERROR, {
                        'flow_name': flow_name,
                        'node_args': node_args,
                        'task_name': task_name,
                        'task_id': self.request.id,
                        'error_type': str(exc_info[0]),
                        'error_value': str(exc_info[1]),
                        'error_traceback': "".join(traceback.format_tb(exc_info[2])),
                    })

                raise self.retry(max_retries=0, exc=exc)

        Trace.log(Trace.TASK_END, {'flow_name': flow_name,
                                   'task_name': task_name,
                                   'task_id': self.request.id,
                                   'parent': parent,
                                   'node_args': node_args,
                                   'queue': Config.task_queues[task_name],
                                   'dispatcher_id': dispatcher_id,
                                   'storage': StoragePool.get_storage_name_by_task_name(task_name, graceful=True)})
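
For context, `validate_result` above only applies when an output schema is configured for the task; a minimal sketch of that check in isolation is shown below. The inline schema and result values are hypothetical illustrations, not part of Selinon's configuration, and the real code loads the schema from the file path found in `Config.output_schemas`.

    import jsonschema

    # Hypothetical JSON Schema, standing in for the document that
    # Config.output_schemas would point to on disk for a given task name.
    schema = {
        "type": "object",
        "properties": {"status": {"type": "string"}},
        "required": ["status"],
    }

    # What a SelinonTask.run() might return for this example.
    result = {"status": "ok"}

    # The same call SelinonTaskEnvelope.validate_result() performs after loading
    # the schema file; it raises jsonschema.ValidationError on mismatch, which the
    # envelope then treats like any other task failure (retry or TASK_FAILURE).
    jsonschema.validate(result, schema)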