Source code for selinon.executor.celery_mocks

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ######################################################################
# Copyright (C) 2016-2018  Fridolin Pokorny, fridolin.pokorny@gmail.com
# This file is part of Selinon project.
# ######################################################################
"""Injected Celery-related implementations of methods.

Classes and functions that make the Selinon executor work as a standalone CLI.
"""


class SimulateRequest:
    """Stand-in for Celery's ``Task.request`` object.

    Lets a task query its own id from inside the task body via ``self.request.id``.
    """

    def __init__(self, instance):
        """Create a request bound to the given instance.

        :param instance: instance the request is created for; its ``id()`` becomes the request id
        """
        instance_identity = id(instance)
        # The id is exposed as a string, not as the raw integer returned by id().
        self.id = str(instance_identity)  # pylint: disable=redefined-builtin,invalid-name
class SimulateAsyncResult:
    """Simulate AsyncResult returned by apply_async() or by instantiating AsyncResult by Task id.

    Task outcomes are kept in two class-level dictionaries shared by all instances, so any
    instance created for a given task id sees what was recorded for that id through
    :meth:`set_successful` or :meth:`set_failed`.
    """

    # Task id -> exception raised by the task (shared by all instances).
    task_failures = {}
    # Task id -> result returned by the task (shared by all instances).
    task_successes = {}

    def __init__(self, node_name, node_id):
        """Initialize AsyncResult.

        :param node_name: name of the node for which the AsyncResult should be stored
        :param node_id: ID of node for which the AsyncResult should be stored
        """
        # Stored as a string; lookups below use this string as the dictionary key.
        self.task_id = str(node_id)
        # NOTE: despite the attribute name, this holds the node *name*; kept as-is for callers.
        self.node_id = node_name

    @classmethod
    def set_successful(cls, task_id, result):
        """Mark task as successful after run - called by executor.

        :param task_id: an ID of task that should be marked as successful
        :param result: result of task (None for SelinonTaskEnvelope, JSON describing system state for Dispatcher)
        """
        cls.task_successes[task_id] = result

    @classmethod
    def set_failed(cls, task_id, exception):
        """Mark task as failed after run - called by executor.

        :param task_id: an ID of task that should be marked as failed
        :param exception: exception raised in task
        """
        cls.task_failures[task_id] = exception

    def successful(self):
        """Check for success.

        :return: True if task succeeded.
        """
        return self.task_id in self.task_successes

    def failed(self):
        """Check for failure.

        :return: True if task failed
        """
        return self.task_id in self.task_failures

    @property
    def traceback(self):
        """Traceback as returned by Celery's AsyncResult.

        :return: the exception recorded by :meth:`set_failed` for this task,
                 or None when no failure has been recorded
        """
        # Use .get() so that querying a task that has not failed yields None
        # instead of raising KeyError (consistent with the ``result`` property).
        return self.task_failures.get(self.task_id)

    @property
    def result(self):
        """Get result of a task.

        :return: the result recorded by :meth:`set_successful` for this task,
                 or None when no success has been recorded
        """
        return self.task_successes.get(self.task_id)
class SimulateRetry(Exception):
    """Stand-in for the Celery Retry exception raised by self.retry()."""

    def __init__(self, instance, **celery_kwargs):
        """Build the retry exception.

        :param instance: instance that requested the retry
        :param celery_kwargs: keyword arguments as they would be passed to the raw Celery task
        """
        # Keep the retry context on the exception for whoever catches it.
        self.celery_kwargs = celery_kwargs
        self.instance = instance
        # The base exception is deliberately initialised without arguments.
        super().__init__()
def simulate_apply_async(instance, **celery_kwargs):
    """Simulate CeleryTask().apply_async() implementation for scheduling tasks.

    Attaches a simulated request to the instance, hands the instance over to the executor
    and returns a simulated async result named after the task (or the flow when the Selinon
    arguments carry no task name).

    :param instance: instance that should be scheduled
    :param celery_kwargs: kwargs supplied to Celery Task (also carry arguments for Selinon)
    :return: SimulateAsyncResult for the scheduled instance
    :raises KeyError: if ``celery_kwargs`` has no ``'kwargs'`` entry, or that entry has
                      neither ``'task_name'`` nor ``'flow_name'``
    """
    # Function-scope import kept from the original (presumably avoids a circular import).
    from .executor import Executor

    instance.request = SimulateRequest(instance)
    Executor.schedule(instance, celery_kwargs)

    selinon_kwargs = celery_kwargs['kwargs']
    # Fall back to the flow name lazily: dict.get() would evaluate
    # selinon_kwargs['flow_name'] eagerly and raise KeyError even when
    # 'task_name' is present and the fallback is not needed.
    if 'task_name' in selinon_kwargs:
        node_name = selinon_kwargs['task_name']
    else:
        node_name = selinon_kwargs['flow_name']

    return SimulateAsyncResult(node_name, node_id=id(instance))
def simulate_retry(instance, **celery_kwargs):
    """Stand-in for Celery's self.retry() used when a task asks to be retried.

    Never returns normally; it always raises.

    :param instance: instance that called self.retry()
    :param celery_kwargs: kwargs that will be supplied to Celery Task (also carry arguments for Selinon)
    :raises SimulateRetry: always, carrying the instance and the supplied kwargs
    """
    retry_signal = SimulateRetry(instance, **celery_kwargs)
    raise retry_signal