Source code for selinon.task

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ######################################################################
# Copyright (C) 2016-2018  Fridolin Pokorny, fridolin.pokorny@gmail.com
# This file is part of Selinon project.
# ######################################################################
"""A task representation from YAML config file."""

import logging

from .errors import ConfigurationError
from .node import Node
from .selective_run_function import SelectiveRunFunction


class Task(Node):  # pylint: disable=too-many-instance-attributes,arguments-differ
    """A task representation within the system."""

    # Defaults applied when 'max_retry' / 'retry_countdown' are not configured.
    _DEFAULT_MAX_RETRY = 0
    _DEFAULT_RETRY_COUNTDOWN = 0

    _logger = logging.getLogger(__name__)

    def __init__(self, name, import_path, storage, **opts):
        """Initialize a task node in a flow graph.

        Recognized keys in ``opts`` are ``classname``, ``storage_task_name``, ``selective_run_function``,
        ``output_schema``, ``max_retry``, ``retry_countdown``, ``queue``, ``storage_readonly`` and
        ``throttling``; any other key is rejected.

        :param name: name of the task
        :param import_path: task's import
        :param storage: storage that should be used (falsy if the task has no storage assigned)
        :param opts: additional options for task
        :raises ConfigurationError: if ``storage_task_name`` is given without a storage or if an unknown
            option is supplied
        """
        super().__init__(name)

        self.class_name = opts.pop('classname', name)
        self.storage = storage
        self.import_path = import_path

        # A storage task name makes sense only if there is a storage to use it with.
        if 'storage_task_name' in opts and not self.storage:
            raise ConfigurationError("Unable to assign storage_task_name for task '%s' (class '%s' from '%s'), task "
                                     "has no storage assigned" % (self.name, self.class_name, self.import_path))

        if 'selective_run_function' in opts:
            self.selective_run_function = SelectiveRunFunction.from_dict(opts.pop('selective_run_function'))
        else:
            self.selective_run_function = SelectiveRunFunction.get_default()

        self.storage_task_name = opts.pop('storage_task_name', name)
        self.output_schema = opts.pop('output_schema', None)

        # A countdown without any retry has no effect, warn about the suspicious configuration.
        if opts.get('retry_countdown') is not None and opts.get('max_retry', 0) == 0:
            self._logger.warning("Retry countdown set for task '%s' (class '%s' from '%s') but this task has "
                                 "retry set to 0", self.name, self.class_name, self.import_path)

        self.max_retry = opts.pop('max_retry', self._DEFAULT_MAX_RETRY)
        self.retry_countdown = opts.pop('retry_countdown', self._DEFAULT_RETRY_COUNTDOWN)
        # NOTE(review): _expand_queue_name() and parse_throttling() are not defined in this class,
        # presumably they are inherited from Node - confirm.
        self.queue_name = self._expand_queue_name(opts.pop('queue', None))
        self.storage_readonly = opts.pop('storage_readonly', False)
        self.throttling = self.parse_throttling(opts.pop('throttling', {}))

        # Every known option has been popped above, anything left over is unknown.
        if opts:
            raise ConfigurationError("Unknown task option provided for task '%s' (class '%s' from '%s'): %s"
                                     % (name, self.class_name, self.import_path, opts))

        # register task usage
        if self.storage:
            storage.register_task(self)

        self._logger.debug("Creating task with name '%s' import path '%s', class name '%s'",
                           self.name, self.import_path, self.class_name)

    def check(self):
        """Check task definitions for errors.

        :raises ConfigurationError: if any of the task attributes has an invalid type or value
        """
        if not isinstance(self.import_path, str):
            raise ConfigurationError("Error in task '%s' definition - import path should be string; got '%s'"
                                     % (self.name, self.import_path))

        if self.class_name is not None and not isinstance(self.class_name, str):
            raise ConfigurationError("Error in task '%s' definition - class instance should be string; got '%s'"
                                     % (self.name, self.class_name))

        if self.output_schema is not None and not isinstance(self.output_schema, str):
            raise ConfigurationError("Error in task '%s' definition - output schema should be string; got '%s'"
                                     % (self.name, self.output_schema))

        if self.max_retry is not None and (not isinstance(self.max_retry, int) or self.max_retry < 0):
            raise ConfigurationError("Error in task '%s' definition - max_retry should be None, zero or positive "
                                     "integer; got '%s'" % (self.name, self.max_retry))

        if self.retry_countdown is not None \
                and (not isinstance(self.retry_countdown, int) or self.retry_countdown < 0):
            raise ConfigurationError("Error in task '%s' definition - retry_countdown should be None or positive "
                                     "integer; got '%s'" % (self.name, self.retry_countdown))

        if self.queue_name is not None and not isinstance(self.queue_name, str):
            raise ConfigurationError("Invalid task queue, should be string, got %s" % self.queue_name)

        if not isinstance(self.storage_readonly, bool):
            raise ConfigurationError("Storage usage flag readonly should be of type bool")

    @staticmethod
    def from_dict(dictionary, system):
        """Construct task from a dict and check task's definition correctness.

        The supplied dictionary is left untouched, options are consumed from a shallow copy.

        :param dictionary: dictionary to be used to construct the task
        :type dictionary: dict
        :param system: system that should be used to lookup a storage
        :type system: System
        :return: Task instance
        :rtype: selinon.task.Task
        :raises ConfigurationError: if mandatory keys are missing or the task definition is invalid
        """
        if not dictionary.get('name'):
            raise ConfigurationError('Task name definition is mandatory')

        if not dictionary.get('import'):
            raise ConfigurationError('Task import definition is mandatory')

        # Pop from a copy so the caller's configuration dict is not emptied as a side effect.
        options = dict(dictionary)

        if 'storage' in options:
            storage = system.storage_by_name(options.pop('storage'))
        else:
            storage = None

        instance = Task(options.pop('name'), options.pop('import'), storage, **options)
        instance.check()
        return instance