Source code for selinon.trace

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ######################################################################
# Copyright (C) 2016-2018  Fridolin Pokorny, fridolin.pokorny@gmail.com
# This file is part of Selinon project.
# ######################################################################
# pylint: disable=line-too-long
"""Built-in tracing mechanism.

A list of events that can be traced:


+----------------------------+-------------------------------------+-----------------+------------------------------------+
| Event                      |  Event description                  | Emitter         |  msg_dict.keys()                   |
+============================+=====================================+=================+====================================+
|                            | A flow has been scheduled.          |                 |  dispatcher_id, selective,         |
|   `FLOW_SCHEDULE`          |                                     | Msg producer    |  queue, node_args,                 |
|                            |                                     |                 |  flow_name                         |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Dispatcher was started and will     |                 |  dispatcher_id, state, selective,  |
|   `DISPATCHER_WAKEUP`      | check flow status.                  | Dispatcher      |  retry, queue, node_args,          |
|                            |                                     |                 |  flow_name                         |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | A new flow is starting, this event  |                 |  dispatcher_id, selective,         |
|   `FLOW_START`             | is emitted on after                 | Dispatcher      |  queue, node_args, flow_name       |
|                            | `DISPATCHER_WAKEUP`.                |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Emitted when a new task is          |                 |  countdown, condition_str,         |
|   `TASK_SCHEDULE`          | scheduled by dispatcher.            | Dispatcher      |  task_name, queue,                 |
|                            |                                     |                 |  node_args, dispatcher_id, parent, |
|                            |                                     |                 |  task_id, selective, foreach_str,  |
|                            |                                     |                 |  flow_name, selective_edge         |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Emitted when a task is going to     |                 |                                    |
|   `TASK_START`             | be executed on worker side.         | Task            |                                    |
|                            |                                     |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Emitted when a flow is scheduled    |                 |                                    |
|   `SUBFLOW_SCHEDULE`       | by dispatcher.                      | Dispatcher      |                                    |
|                            |                                     |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Signalizing end of task execution,  |                 |                                    |
|   `TASK_END`               | task finished successfully.         | Task            |                                    |
|                            |                                     |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Signalizing that a node in flow     |                 |                                    |
|   `NODE_SUCCESSFUL`        | graph (task or flow) successfully   | Dispatcher      |                                    |
|                            | finished.                           |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Signalizing that a task returned a  |                 |                                    |
|   `TASK_DISCARD_RESULT`    | value other than None, but no       | Task            |                                    |
|                            | storage was assigned to task to     |                 |                                    |
|                            | store result.                       |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Signalizing end of task, task       |                 |                                    |
|   `TASK_FAILURE`           | raised an exception, marking task   | Task            |                                    |
|                            | as failed.                          |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Signalizing that a task failed by,  |                 |                                    |
|   `TASK_RETRY`             | raising an exception and will be    | Task            |                                    |
|                            | retried.                            |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Signalization of flow failure,      |                 |                                    |
|   `FLOW_FAILURE`           | one more nodes in flow graph failed | Dispatcher      |                                    |
|                            | without successful fallback run.    |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | This event shouldn't be normally    |                 |                                    |
|   `DISPATCHER_FAILURE`     | seen - signalizing error in         | Dispatcher      |                                    |
|                            | Selinon.                            |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Signalization of captured failure   |                 |                                    |
|   `NODE_FAILURE`           | of node in task flow graph - flow   | Dispatcher      |                                    |
|                            | or task failure.                    |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Fallback for node failures in flow  |                 |                                    |
|   `FALLBACK_START`         | is started to handle node failures. | Dispatcher      |                                    |
|                            |                                     |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Dispatcher finished scheduling new  |                 | dispatcher_id, state, selective,   |
|   `DISPATCHER_RETRY`       | nodes and will retry to check flow  | Dispatcher      | retry, queue, node_args,           |
|                            | status after a while.               |                 | state_dict, flow_name, parent      |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Flow has successfully ended.        |                 |                                    |
|   `FLOW_END`               |                                     | Dispatcher      |                                    |
|                            |                                     |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Flow has migration was done.        |                 |                                    |
|   `MIGRATION`              |                                     | Dispatcher      |                                    |
|                            |                                     |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Signalization of tainted flow when  |                 |                                    |
|   `MIGRATION_TAINTED_FLOW` | migration was run.                  | Dispatcher      |                                    |
|                            |                                     |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Flow migration has error - skewed   |                 |                                    |
|   `MIGRATION_SKEW`         | migration.                          | Dispatcher      |                                    |
|                            |                                     |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Given node did not connect to       |                 | storage_name                       |
|   `STORAGE_CONNECT`        | storage yet so a new connection is  | Dispatcher/Task |                                    |
|                            | requested by calling                |                 |                                    |
|                            | DataStorage.connect()               |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Called when disconnecting from      |                 |                                    |
|   `STORAGE_DISCONNECT`     | storage by calling                  | storage adapter |                                    |
|                            | DataStorage.disconnect()            | destructor      |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Retrieve task result from storage   |                 |                                    |
|   `STORAGE_RETRIEVE`       | that was assigned to the task.      | Dispatcher/Task |                                    |
|                            |                                     |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Requested result of task was        |                 |                                    |
|   `STORAGE_RETRIEVED`      | retrieved.                          | Dispatcher/Task |                                    |
|                            |                                     |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Store result of task in the         |                 |                                    |
|   `STORAGE_STORE`          | assigned storage.                   | Task            |                                    |
|                            |                                     |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | The result of task has been stored  |                 |                                    |
|   `STORAGE_STORED`         | in the assigned storage.            | Task            |                                    |
|                            |                                     |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | The condition on edge was evaluated |                 |                                    |
|   `EDGE_COND_FALSE`        | as false so destination nodes will  | Dispatcher      |                                    |
|                            | not be scheduled.                   |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Reports result of foreach function, |                 |                                    |
|   `FOREACH_RESULT`         | based on which N nodes will be      | Dispatcher      |                                    |
|                            | scheduled (N is runtime variable)   |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Requested state of the node (if the |                 |                                    |
|   `NODE_STATE_CACHE_GET`   | node was successful or failed) from | Dispatcher      |                                    |
|                            | cache.                              |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Requested entry in the state cache  |                 |                                    |
|   `NODE_STATE_CACHE_ADD`   | was not found and the node          | Dispatcher      |                                    |
|                            | succeeded so new entry to state     |                 |                                    |
|                            | is added.                           |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Requested entry was not found in    |                 |                                    |
|   `NODE_STATE_CACHE_MISS`  | state cache.                        | Dispatcher      |                                    |
|                            |                                     |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Requested entry was found in the    |                 |                                    |
|   `NODE_STATE_CACHE_HIT`   | state cache and will be used.       | Dispatcher      |                                    |
|                            |                                     |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Try to hit result cache for cached  |                 |                                    |
|   `TASK_RESULT_CACHE_GET`  | already requested task result.      | Dispatcher/Task |                                    |
|                            |                                     |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Add to result cache result of a     |                 |                                    |
|   `TASK_RESULT_CACHE_ADD`  | task that was requested.            | Dispatcher/Task |                                    |
|                            |                                     |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Requested task result was not found |                 |                                    |
|   `TASK_RESULT_CACHE_MISS` | in the result cache.                | Dispatcher/Task |                                    |
|                            |                                     |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Requested task result was found     |                 |                                    |
|   `TASK_RESULT_CACHE_HIT`  | in the result cache.                | Dispatcher/Task |                                    |
|                            |                                     |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Given edge will not be fired        |                 |                                    |
|   `SELECTIVE_OMIT_EDGE`    | as it is not part of direct path    | Dispatcher      |                                    |
|                            | to requested tasks on selective run.|                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | The desired node will not be        |                 |                                    |
|   `SELECTIVE_OMIT_NODE`    | scheduled as it is not requested    | Dispatcher      |                                    |
|                            | task nor dependency of requested    |                 |                                    |
|                            | task in selective task runs.        |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Keeps track of results of selective |                 | dispatcher_id, parent, selective,  |
|   `SELECTIVE_RUN_FUNC`     | run function on selective task      | Dispatcher      | node_args, flow_name,              |
|                            | runs.                               |                 | result, node_name                  |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Signalizes reuse of already         |                 |                                    |
|   `SELECTIVE_TASK_REUSE`   | computed results in selective task  | Dispatcher      |                                    |
|                            | runs.                               |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Signalizes storing error information|                 |                                    |
|   `STORAGE_STORE_ERROR`    | in assigned storage on task failure.| Task            |                                    |
|                            |                                     |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+
|                            | Signalizes that storing error       |                 |                                    |
| `STORAGE_OMIT_STORE_ERROR` | will not be done - missing storage  | Task            |                                    |
|                            | adapter or `store_error()` is not   |                 |                                    |
|                            | implemented.                        |                 |                                    |
+----------------------------+-------------------------------------+-----------------+------------------------------------+

"""

import datetime
import json
import logging
import os
import platform
import sys

_LOGGER = logging.getLogger(__name__)


[docs]class Trace: """Trace system flow actions.""" _trace_functions = [] _logger = None DISPATCHER_WAKEUP, \ FLOW_SCHEDULE, \ FLOW_START, \ TASK_SCHEDULE, \ TASK_START, \ SUBFLOW_SCHEDULE, \ TASK_END, \ NODE_SUCCESSFUL, \ TASK_DISCARD_RESULT, \ TASK_FAILURE, \ TASK_RETRY, \ FLOW_FAILURE, \ DISPATCHER_FAILURE, \ NODE_FAILURE, \ FALLBACK_START, \ DISPATCHER_RETRY, \ FLOW_END, \ STORAGE_CONNECT, \ STORAGE_DISCONNECT, \ STORAGE_RETRIEVE, \ STORAGE_RETRIEVED, \ STORAGE_STORE, \ STORAGE_STORED, \ EDGE_COND_FALSE, \ FOREACH_RESULT,\ NODE_STATE_CACHE_GET,\ NODE_STATE_CACHE_ADD,\ NODE_STATE_CACHE_MISS,\ NODE_STATE_CACHE_HIT,\ TASK_RESULT_CACHE_GET,\ TASK_RESULT_CACHE_ADD,\ TASK_RESULT_CACHE_MISS,\ TASK_RESULT_CACHE_HIT,\ SELECTIVE_OMIT_EDGE,\ SELECTIVE_OMIT_NODE,\ SELECTIVE_RUN_FUNC,\ SELECTIVE_TASK_REUSE,\ STORAGE_STORE_ERROR,\ STORAGE_OMIT_STORE_ERROR,\ FALLBACK_COND_FALSE,\ FALLBACK_COND_TRUE,\ NODE_STATE_CACHE_ISSUE,\ TASK_RESULT_CACHE_ISSUE,\ STORAGE_ISSUE,\ RESULT_BACKEND_ISSUE,\ FLOW_RETRY,\ MIGRATION,\ MIGRATION_SKEW,\ MIGRATION_TAINTED_FLOW, \ MIGRATION_ERROR, \ EAGER_FAILURE,\ STORAGE_DELETE, \ STORAGE_DELETED, \ = range(53) WARN_EVENTS = ( NODE_FAILURE, TASK_DISCARD_RESULT, TASK_RETRY, TASK_FAILURE, FLOW_FAILURE, STORAGE_OMIT_STORE_ERROR, NODE_STATE_CACHE_ISSUE, TASK_RESULT_CACHE_ISSUE, STORAGE_ISSUE, RESULT_BACKEND_ISSUE, FLOW_RETRY, MIGRATION_SKEW, MIGRATION_ERROR, EAGER_FAILURE, ) _event_strings = ( 'DISPATCHER_WAKEUP', 'FLOW_SCHEDULE', 'FLOW_START', 'TASK_SCHEDULE', 'TASK_START', 'SUBFLOW_SCHEDULE', 'TASK_END', 'NODE_SUCCESSFUL', 'TASK_DISCARD_RESULT', 'TASK_FAILURE', 'TASK_RETRY', 'FLOW_FAILURE', 'DISPATCHER_FAILURE', 'NODE_FAILURE', 'FALLBACK_START', 'DISPATCHER_RETRY', 'FLOW_END', 'STORAGE_CONNECT', 'STORAGE_DISCONNECT', 'STORAGE_RETRIEVE', 'STORAGE_RETRIEVED', 'STORAGE_STORE', 'STORAGE_STORED', 'EDGE_COND_FALSE', 'FOREACH_RESULT', 'NODE_STATE_CACHE_GET', 'NODE_STATE_CACHE_ADD', 'NODE_STATE_CACHE_MISS', 'NODE_STATE_CACHE_HIT', 'TASK_RESULT_CACHE_GET', 'TASK_RESULT_CACHE_ADD', 'TASK_RESULT_CACHE_MISS', 'TASK_RESULT_CACHE_HIT', 'SELECTIVE_OMIT_EDGE', 'SELECTIVE_OMIT_NODE', 'SELECTIVE_RUN_FUNC', 'SELECTIVE_TASK_REUSE', 'STORAGE_STORE_ERROR', 'STORAGE_OMIT_STORE_ERROR', 'FALLBACK_COND_FALSE', 'FALLBACK_COND_TRUE', 'NODE_STATE_CACHE_ISSUE', 'TASK_RESULT_CACHE_ISSUE', 'STORAGE_ISSUE', 'RESULT_BACKEND_ISSUE', 'FLOW_RETRY', 'MIGRATION', 'MIGRATION_SKEW', 'MIGRATION_TAINTED_FLOW', 'MIGRATION_ERROR', 'EAGER_FAILURE', 'STORAGE_DELETE', 'STORAGE_DELETED' ) def __init__(self): """Unused.""" raise NotImplementedError()
[docs] @classmethod def trace_by_logging(cls, logger=None): """Trace by using Python's logging. :param logger: optional logger that should be used """ if not logger and not cls._logger: logger = logging.getLogger(__name__) cls._logger = logger cls._trace_functions.append(cls.logging_trace_func)
[docs] @classmethod def trace_by_json(cls): """Trace by writing directly JSON trace points.""" cls._trace_functions.append(cls.json_trace_func)
[docs] @classmethod def trace_by_sentry(cls, dsn): """Trace using Sentry (see https://sentry.io). :param dsn: data source name for connecting to Sentry """ try: import sentry_sdk except ImportError as exc: raise ImportError("Failed to import Sentry-SDK for Sentry logging, install it using `pip3 install sentry-sdk`")\ from exc sentry_sdk.init(dsn.format(**os.environ))
[docs] @classmethod def trace_by_func(cls, func): """Trace by a custom function. :param func: function with a one single argument """ cls._trace_functions.append(func)
[docs] @classmethod def log(cls, event, *msg_dict, **msg_dict_kwargs): """Log an event. :param event: tracing event :param msg_dict: message to be printed :param msg_dict_kwargs: kwargs like dictionary for traced details """ if not cls._trace_functions: return to_report = {} for msg in msg_dict: to_report.update(msg) to_report.update(msg_dict_kwargs) for trace_func in cls._trace_functions: try: trace_func(event, to_report) except Exception as exc: # pylint: disable=broad-except # To prevent from recursively reporting errors, this error is reported only to standard logging. _LOGGER.exception("Failed to report to trace function %r: %s", trace_func, str(exc))
[docs] @classmethod def event2str(cls, event): """Translate event to it's string representation. :param event: event :return: string representation of event """ return cls._event_strings[event]
[docs] @classmethod def logging_trace_func(cls, event, msg_dict, logger=None): """Trace to Python's logging facilities. :param event: event that triggered trace point :param msg_dict: a dict holding additional trace information for event :param logger: a logger to be used, if None, logger from trace_by_logging will be used """ report_dict = { 'event': cls.event2str(event), 'time': str(datetime.datetime.utcnow()), 'details': msg_dict } message = "SELINON %10s - %s : %s" \ % (platform.node(), cls.event2str(event), json.dumps(report_dict, sort_keys=True)) logger = logger or cls._logger if event == Trace.DISPATCHER_FAILURE: logger.error(message) elif event in cls.WARN_EVENTS: logger.warning(message) else: logger.info(message)
[docs] @classmethod def json_trace_func(cls, event, msg_dict): """Trace by directly printing JSONs to stdout or stderr. :param event: event that triggered trace point :param msg_dict: a dict holding additional trace information for event """ report_dict = { 'event': cls.event2str(event), 'time': str(datetime.datetime.utcnow()), 'details': msg_dict, 'node': platform.node() } # We could use simplejson here so this will be faster if event in cls.WARN_EVENTS: print(json.dumps(report_dict, sort_keys=True), file=sys.stderr) else: print(json.dumps(report_dict, sort_keys=True))