#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ######################################################################
# Copyright (C) 2016-2018 Fridolin Pokorny, fridolin.pokorny@gmail.com
# This file is part of Selinon project.
# ######################################################################
"""Path traversal manipulation."""
from collections import deque
import copy
from itertools import chain
from .config import Config
from .errors import SelectiveNoPathError
def _get_all_subflows_dict(flow_name):
    """Collect every transitive subflow of a flow together with its direct parent flows.

    Keys of the returned dict are all flows reachable (transitively) as subflows from ``flow_name``; the value for
    each key is a list of the flows that contain an edge leading directly to that subflow.

    E.g.:

      flow1:
        subflows: flow2, flow3
      flow2:
        subflows: flow1, flow3, flow4

    The resulting dict for _get_all_subflows_dict('flow1') will be:

      {
        "flow2": ["flow1"],
        "flow3": ["flow1", "flow2"],
        "flow4": ["flow2"],
        "flow1": ["flow2"],
      }

    Read as: I can get directly to flow <key> from flows <values>.

    :param flow_name: a flow name for which all subflows should be computed
    :return: dict mapping each subflow name to the list of its direct parent flows
    """
    parents_of = {}
    pending = deque([flow_name])

    while pending:
        current_flow = pending.pop()
        for edge in Config.edge_table[current_flow]:
            for target in edge['to']:
                if not Config.is_flow(target):
                    continue

                if target not in parents_of:
                    # First time this subflow is seen - schedule it so its own edges get inspected as well.
                    parents_of[target] = set()
                    pending.append(target)

                parents_of[target] |= {current_flow}

    # Sets were only needed for de-duplication, callers get plain lists.
    return {subflow: list(parent_flows) for subflow, parent_flows in parents_of.items()}
def _normalize_path(paths):
"""Normalize multiple graph traversals by edges into one that traverses all edges.
In general we can get to a task by multiple paths. As we would like to ensure that all paths are traversed, we
compound multiple traversals into a single one that traverses all necessary edges.
:param paths: a list of paths that should be compound into a single traversal
:return: a dict representing compound traversal
"""
result = {}
for entry in paths:
for key, value in entry.items():
if key not in result:
result[key] = []
result[key] = list(set(result[key]) | set(value))
return result
def _compute_paths(flow_name, task_name):
    """Compute all paths in a flow to a node.

    Edges of the flow are walked backwards starting at ``task_name``. A path is complete once every node that was
    discovered as needed has also been expanded; all complete paths are then merged into a single traversal.

    :param flow_name: name of flow that should be traversed
    :param task_name: a name of node that should be visited
    :return: a dict mapping an edge index (position in ``Config.edge_table[flow_name]``) to the list of destination
             node names reached over that edge, merged across all discovered paths; empty if no path was found
    """
    result = []
    stack = deque()
    # A search state is: (nodes known to be needed, nodes already expanded, {edge index: [expanded destinations]}).
    stack.append(({task_name}, set(), {}))

    while stack:
        to_expand, visited, expanded = stack.pop()

        for name in to_expand:
            for edge_idx, edge in enumerate(Config.edge_table[flow_name]):
                # Skip edges not leading to this node and (edge, node) pairs already expanded in this state.
                if name not in edge['to'] or name in expanded.get(edge_idx, ()):
                    continue

                new_expanded = copy.copy(expanded)
                # copy.copy() is shallow, so the per-edge lists are shared with the parent state, with sibling
                # states on the stack and with already recorded results. Appending in place would leak this
                # expansion into all of them (making siblings believe the node was already expanded and
                # dead-end), hence a fresh list is built instead.
                new_expanded[edge_idx] = new_expanded.get(edge_idx, []) + [name]

                new_visited = visited | {name}
                # Only the edge's source nodes are filtered by `name`; `name` itself stays in the set of needed
                # nodes, which is what allows the equality test below to detect a complete path.
                new_to_expand = to_expand | (set(edge['from']) - {name})

                if new_visited == new_to_expand:
                    result.append(new_expanded)
                else:
                    stack.append((new_to_expand, new_visited, new_expanded))

    return _normalize_path(result)
def _raise_for_result_check(task_names, path):
"""Check that all nodes stated in task_names are present in path - i.e. they are visited during traversing.
:param task_names: a list of node names that should be checked
:param path: traverses that are per-flow specific
:raises SelectiveNoPathError: there was not found any path to the given node
"""
visited_marking = dict.fromkeys(task_names, False)
for node in visited_marking:
for flow in path:
if any(n == node for n in chain(*path[flow].values())):
visited_marking[node] = True
for node, visited in visited_marking.items():
if not visited:
raise SelectiveNoPathError("No path to node '%s' found" % node)
def _compute_subsequent_edges(flow_name, node_names):
    """Find the edges of a flow that can fire once the given nodes (and everything they trigger) have run.

    The computation is repeated until a fixed point is reached: destinations of every selected edge are treated as
    run too, which may in turn enable further edges.

    :param flow_name: name of the flow in which subsequent edges should be found
    :param node_names: a list of nodes that were run, they do not necessarily need to be stated in flow_name
    :return: a list of indexes (into the flow's edge table) of edges that follow after node_names execution,
             in the order in which they were discovered
    """
    reachable = set(node_names)
    selected = []

    progressed = True
    while progressed:
        progressed = False
        for idx, edge in enumerate(Config.edge_table[flow_name]):
            sources = edge['from']
            # Edges without source nodes and edges that were already picked are never (re)selected.
            if not sources or idx in selected:
                continue
            if not set(sources) <= reachable:
                continue

            selected.append(idx)
            reachable.update(edge['to'])
            progressed = True

    return selected
def _compute_traversals(flow_name, task_names, follow_subflows=True):
    """Compute traversals to the requested nodes for a flow and, optionally, for all of its subflows.

    :param flow_name: a name of flow to start traversing with
    :param task_names: a list of nodes we want to visit/traverse (a single name is wrapped into a list)
    :param follow_subflows: if True, we also inspect transitively all subflows from flow_name
    :return: a dict, where keys are flows that need to be traversed and values are traversals/paths to all task_names
    :raises SelectiveNoPathError: if some of task_names is not reachable in any inspected flow
    """
    if not isinstance(task_names, (list, tuple)):
        task_names = [task_names]

    parents_of = _get_all_subflows_dict(flow_name) if follow_subflows else {}

    # Work items are (flow, node) pairs: first the requested nodes in the root flow, then in every subflow.
    pending = deque((flow_name, wanted) for wanted in task_names)
    pending.extend((subflow, wanted) for subflow in parents_of for wanted in task_names)

    traversals = {}
    scheduled_links = set()

    while pending:
        current_flow, target = pending.pop()
        found = _compute_paths(current_flow, target)

        if found and current_flow != flow_name:
            # The node is reachable inside a subflow, so each parent flow has to be traversed up to the point
            # where it starts this subflow; every (parent, subflow) pair is scheduled at most once.
            for parent in parents_of[current_flow]:
                link = (parent, current_flow)
                if link in scheduled_links:
                    continue
                scheduled_links.add(link)
                pending.append(link)

        if current_flow in traversals:
            traversals[current_flow] = _normalize_path((traversals[current_flow], found))
        else:
            traversals[current_flow] = found

    _raise_for_result_check(task_names, traversals)
    return traversals
def compute_selective_run(flow_name, task_names, follow_subflows=False, run_subsequent=False):
    """Compute selective run for a flow.

    :param flow_name: a name of the flow that should be run
    :param task_names: a list of tasks that should be run; a single task name is accepted and treated as a
                       one-item list
    :param follow_subflows: apply selective run to all subflows (transitively)
    :param run_subsequent: run tasks that depend on task_names; if a list or tuple of flow names is given,
                           subsequent edges are computed only for these flows, any other true value means all
                           traversed flows
    :return: computed selective run dictionary with keys 'task_names' and 'waiting_edges_subset'
    :raises SelectiveNoPathError: if there is no path to some of task_names
    """
    # Normalize once so all helpers see the same value - a bare string handed over to
    # _compute_subsequent_edges() would be split into single characters by set().
    if not isinstance(task_names, (list, tuple)):
        task_names = [task_names]

    traversals = _compute_traversals(flow_name, task_names, follow_subflows)

    result = {
        'task_names': task_names,
        'waiting_edges_subset': traversals
    }

    if run_subsequent:
        if isinstance(run_subsequent, (list, tuple)):
            # NOTE(review): flows listed here are expected to be present in the computed traversals,
            # otherwise the assignment below raises KeyError - confirm this is the intended contract.
            subsequent_flows = run_subsequent
        else:
            subsequent_flows = list(traversals)

        for flow in subsequent_flows:
            for idx in _compute_subsequent_edges(flow, task_names):
                # For a subsequent edge all of its destination nodes have to be started, even those that are not
                # on the minimal path - the edge can be visited again because of a cycle:
                #
                #        T1 <-
                #       / |   |
                #      /  |   |
                #    T3   T2 --
                #
                # T2 should be run. The minimal path to T2 does not contain T3, but as there is the edge T2->T1,
                # T1 is run again and T3 needs to be started as a subsequent node.
                #
                traversals[flow][idx] = Config.edge_table[flow][idx]['to']

    return result