Source code for selinon.failures

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ######################################################################
# Copyright (C) 2016-2018  Fridolin Pokorny, fridolin.pokorny@gmail.com
# This file is part of Selinon project.
# ######################################################################
"""Task and flow failure handling."""

from itertools import chain

from .errors import ConfigurationError
from .failure_node import FailureNode
from .helpers import check_conf_keys


class Failures:
    """Node failures and fallback handling."""

    def __init__(self, raw_definition, system, flow, last_allocated=None, starting_nodes=None, predicates=None):
        """Construct failures based on definition stated in YAML config files.

        Every entry of the raw definition is resolved to node objects using the system context. Waiting nodes
        and fallbacks are stored per failure entry, in the order the entries are stated.

        :param raw_definition: raw definition of failures, each entry provides 'nodes' and 'fallback'
        :param system: system context, used to look up nodes by their name
        :param flow: flow for which failures are defined, its name is used in error messages
        :param last_allocated: last allocated starting node for linked list
        :param starting_nodes: starting nodes for failures
        :param predicates: all predicates that are used
        :raises ConfigurationError: if a stated node is not known or a fallback is neither a list nor a bool
        """
        self.waiting_nodes = []
        self.fallback_nodes = []

        for failure in raw_definition:
            waiting_nodes_entry = self._resolve_nodes(
                system, flow, failure['nodes'],
                "No such node with name '%s' in failure, flow '%s'"
            )

            fallback = failure['fallback']
            if isinstance(fallback, list):
                fallback_nodes_entry = self._resolve_nodes(
                    system, flow, fallback,
                    "No such node with name '%s' in failure fallback, flow '%s'"
                )
            elif isinstance(fallback, bool):
                # a bool flag is kept as is, there are no nodes to resolve
                fallback_nodes_entry = fallback
            else:
                raise ConfigurationError("Unknown fallback definition in flow '%s', failure: %s"
                                         % (flow.name, failure))

            self.waiting_nodes.append(waiting_nodes_entry)
            self.fallback_nodes.append(fallback_nodes_entry)

        self.raw_definition = raw_definition
        self.last_allocated = last_allocated
        self.starting_nodes = starting_nodes
        self.predicates = predicates
        self.flow = flow

    @staticmethod
    def _resolve_nodes(system, flow, node_names, error_template):
        """Translate node names to nodes known to the system.

        :param system: system context that provides node_by_name()
        :param flow: flow that is reported in the error message
        :param node_names: names of nodes to look up
        :param error_template: message with two '%s' placeholders - node name and flow name
        :return: list of resolved nodes, in the order of node_names
        :raises ConfigurationError: if the lookup yields a falsy result for any name
        """
        nodes = []
        for node_name in node_names:
            node = system.node_by_name(node_name, graceful=True)
            if not node:
                raise ConfigurationError(error_template % (node_name, flow.name))
            nodes.append(node)
        return nodes

    @staticmethod
    def _unique_in_order(items):
        """Drop duplicates while keeping the first occurrence of each item.

        A plain set() would lose ordering, which makes results differ between interpreter runs.

        :param items: iterable of hashable items
        :return: list of distinct items in first-seen order
        """
        seen = set()
        unique = []
        for item in items:
            if item not in seen:
                seen.add(item)
                unique.append(item)
        return unique

    def all_waiting_nodes(self):
        """Compute all nodes that have defined a fallback.

        :return: all distinct nodes for which there is defined a fallback, in first-seen order
        """
        return self._unique_in_order(chain.from_iterable(self.waiting_nodes))

    def all_fallback_nodes(self):
        """Compute all fallback nodes.

        :return: all distinct nodes that are used as fallback nodes, in first-seen order
        """
        if isinstance(self.fallback_nodes, bool):
            return []

        nodes = []
        for fallback in self.fallback_nodes:
            # remove True/False flags, they do not refer to any node
            if isinstance(fallback, bool):
                continue
            nodes.extend(node for node in fallback if not isinstance(node, bool))

        return self._unique_in_order(nodes)

    @staticmethod
    def construct(system, flow, failures_dict):
        """Construct Failures.

        Entries of failures_dict are validated and normalized in place - a 'nodes' or 'fallback' value that is
        not a list is wrapped into a single-item list ('fallback' stated as True is kept untouched).

        :param system: system context
        :param flow: a flow to which failures conform
        :param failures_dict: construct failures from failures dict
        :rtype: Failures
        :raises ConfigurationError: on missing 'nodes' or 'fallback', on a node that falls back to itself
          or on an unknown configuration option
        """
        known_conf_opts = ('nodes', 'fallback', 'propagate_failure', 'condition')

        for failure in failures_dict:
            if 'nodes' not in failure or failure['nodes'] is None:
                raise ConfigurationError("Failure should state nodes for state 'nodes' to fallback from in flow '%s'"
                                         % flow.name)

            if 'fallback' not in failure:
                raise ConfigurationError("No fallback stated in failure in flow '%s'" % flow.name)

            if not isinstance(failure['nodes'], list):
                failure['nodes'] = [failure['nodes']]

            if not isinstance(failure['fallback'], list) and failure['fallback'] is not True:
                failure['fallback'] = [failure['fallback']]

            nodes = failure['nodes']
            fallback = failure['fallback']
            # only the trivial cycle is detected here - a single node that states itself as its only fallback
            if fallback is not True and len(fallback) == 1 and len(nodes) == 1 and fallback[0] == nodes[0]:
                raise ConfigurationError("Detect cyclic fallback dependency in flow %s, failure on %s"
                                         % (flow.name, nodes[0]))

            unknown_conf = check_conf_keys(failure, known_conf_opts=known_conf_opts)
            if unknown_conf:
                raise ConfigurationError("Unknown configuration option supplied in fallback definition: %s"
                                         % unknown_conf)

        last_allocated, starting_nodes, predicates = FailureNode.construct(flow, system, failures_dict)
        return Failures(failures_dict, system, flow, last_allocated, starting_nodes, predicates)

    @staticmethod
    def starting_nodes_name(flow_name):
        """Create a starting node name for graph of all failures nodes for generated Python config.

        :param flow_name: flow name for which the starting node should be created
        :return: variable name
        :rtype: str
        """
        return "_%s_failure_starting_nodes" % flow_name

    @staticmethod
    def failure_node_name(flow_name, failure_node):
        """Create a failure node name representation for generated Python config.

        :param flow_name: name of flow for which the representation should be created
        :param failure_node: a node from graph of all failure permutations, its traversed names are joined
        :return: variable name
        :rtype: str
        """
        return "_%s_fail_%s" % (flow_name, "_".join(failure_node.traversed))

    def fallback_nodes_names(self):
        """Compute names for all nodes that are started by fallbacks.

        :return: names of nodes that are started by fallbacks in all failures
        """
        ret = []

        # walk the linked list of failure nodes starting at the last allocated one
        failure_node = self.last_allocated
        while failure_node:
            # NOTE(review): this reads 'fallback' whereas dump2stream() reads 'fallbacks' - confirm which
            # attribute FailureNode actually exposes
            if isinstance(failure_node.fallback, list):
                ret.extend(failure_node.fallback)
            failure_node = failure_node.failure_link

        return ret

    def waiting_nodes_names(self):
        """Compute all nodes that have defined fallbacks.

        :return: names of all nodes that we are expecting to fail for fallbacks
        """
        return list(self.starting_nodes.keys())

    def dump_all_conditions2stream(self, stream):
        """Dump all condition sources that are present to a stream.

        One function definition is written for each predicate of each failure node.

        :param stream: output stream to dump to
        """
        fail_node = self.last_allocated
        while fail_node:
            for idx, predicate in enumerate(fail_node.predicates):
                condition_name = FailureNode.construct_condition_name(fail_node, idx)
                condition_source = predicate.to_source()
                stream.write('def {}(db, node_args):\n'.format(condition_name))
                stream.write(' return {}\n\n\n'.format(condition_source))
            fail_node = fail_node.failure_link

    def dump2stream(self, stream):
        """Dump failures to the Python config file for Dispatcher.

        A dict assignment is written for each failure node in the linked list, followed by a dict that maps
        starting node names to the variables of their failure nodes.

        :param stream: output stream to dump to
        """
        flow_name = self.flow.name

        fail_node = self.last_allocated
        while fail_node:
            # values of 'next' are references to other generated variables, thus they are not quoted
            next_entries = ", ".join(
                "'%s': %s" % (key, self.failure_node_name(flow_name, target))
                for key, target in fail_node.next.items()
            )
            stream.write("%s = {'next': {%s}, " % (self.failure_node_name(flow_name, fail_node), next_entries))

            conditions = [
                FailureNode.construct_condition_name(fail_node, idx)
                for idx, _ in enumerate(fail_node.predicates)
            ]
            condition_strs = [str(predicate).replace('\'', '\\\'') for predicate in fail_node.predicates]

            # now list of nodes that should be started in case of failure (fallback); quotes are stripped
            # from conditions so they are emitted as function references instead of strings
            stream.write("'fallback': %s, 'propagate_failure': %s, 'conditions': %s, 'condition_strs': %s}\n"
                         % (fail_node.fallbacks, fail_node.propagate_failures,
                            str(conditions).replace("'", ""), condition_strs))
            fail_node = fail_node.failure_link

        stream.write("\n%s = {" % self.starting_nodes_name(flow_name))
        stream.write(",".join(
            "\n '%s': %s" % (key, self.failure_node_name(flow_name, value))
            for key, value in self.starting_nodes.items()
        ))
        stream.write("\n}\n\n")