Source code for selinon.run

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ######################################################################
# Copyright (C) 2016-2018  Fridolin Pokorny, fridolin.pokorny@gmail.com
# This file is part of Selinon project.
# ######################################################################
"""Selinon flow scheduling (entrypoint) for a user."""

import logging

from .config import Config
from .dispatcher import Dispatcher
from .errors import UnknownFlowError
from .selective import compute_selective_run
from .trace import Trace

_logger = logging.getLogger(__name__)  # pylint: disable=invalid-name


def _do_run_flow(kwargs, queue):
    """Actually run the flow, return dispatcher id."""
    dispatcher_id = Dispatcher().apply_async(kwargs=kwargs, queue=queue)
    trace_msg = {
        'flow_name': kwargs["flow_name"],
        'node_args': kwargs["node_args"],
        'dispatcher_id': str(dispatcher_id),
        'queue': queue,
        'selective': kwargs.get("selective"),
    }
    Trace.log(Trace.FLOW_SCHEDULE, trace_msg)
    return dispatcher_id


[docs]def run_flow(flow_name, node_args=None): """Run flow by it's name. :param flow_name: name of the flow to be run :param node_args: arguments that will be supplied to flow :return: flow ID (dispatcher ID) """ if Config.dispatcher_queues is None or flow_name not in Config.dispatcher_queues: raise UnknownFlowError("No flow with name '%s' defined" % flow_name) queue = Config.dispatcher_queues[flow_name] _logger.debug("Scheduling flow '%s' with node_args '%s' on queue '%s'", flow_name, node_args, queue) kwargs = {'flow_name': flow_name, 'node_args': node_args} return _do_run_flow(kwargs, queue)
[docs]def run_flow_selective(flow_name, task_names, node_args, follow_subflows=False, run_subsequent=False): """Run only desired tasks in a flow. :param flow_name: name of the flow that should be run :param task_names: name of the tasks that should be run :param node_args: arguments that should be supplied to flow :param follow_subflows: if True, subflows will be followed and checked for nodes to be run :param run_subsequent: trigger run of all tasks that depend on the desired task :return: dispatcher id that is scheduled to run desired selective task flow :raises selinon.errors.SelectiveNoPathError: there was no way found to the desired task in the flow """ selective = compute_selective_run(flow_name, task_names, follow_subflows, run_subsequent) queue = Config.dispatcher_queues[flow_name] _logger.debug("Scheduling selective flow '%s' with node_args '%s' on queue '%s', computed selective run state: %s", flow_name, node_args, queue, selective) kwargs = {'flow_name': flow_name, 'node_args': node_args, 'selective': selective} return _do_run_flow(kwargs, queue)