# Source code for selinon.executor.queue_pool
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ######################################################################
# Copyright (C) 2016-2018 Fridolin Pokorny, fridolin.pokorny@gmail.com
# This file is part of Selinon project.
# ######################################################################
"""Pool of all queues in the system."""
from .time_queue import TimeQueue
class QueuePool:
    """Pool of all queues in the system.

    Queues are created lazily and chained into a cyclic doubly linked list of
    wrappers. A wrapper is unlinked by :meth:`pop` as soon as its queue gets empty.
    """

    class _QueueWrapper:
        """Wrap a queue so we carry additional info needed for QueuePool - cyclic double linked list."""

        def __init__(self, previous_wrapper, next_wrapper, queue_name):
            """Init QueueWrapper.

            :param previous_wrapper: preceding queue wrapper in the linked list (falsy if there is none)
            :param next_wrapper: next queue wrapper in the linked list (falsy if there is none)
            :param queue_name: name of the queue that is wrapped
            """
            self.queue_name = queue_name
            self.queue = TimeQueue()
            # A wrapper without a neighbour points to itself so the list is cyclic from the start.
            self.previous = previous_wrapper or self
            self.next = next_wrapper or self

        def __next__(self):  # noqa
            """Step to the following wrapper of the cyclic list.

            :return: wrapper stored in ``self.next``
            """
            return self.next

        def __repr__(self):
            """Queue representation for nice logs.

            :return: queue representation
            """
            return "%s(queue='%s')" % (type(self).__name__, self.queue_name)

    def __init__(self):
        """Initialize pool of queues."""
        # Queues are instantiated lazily on demand; maps queue name -> wrapper.
        self._queues = {}
        # Wrapper from which the next pop() starts its scan.
        self._last_used = None
        # Both ends of the cyclic doubly linked list of wrappers.
        self._queue_head = None
        self._queue_tail = None

    def _create_queue_wrapper(self, queue_name):
        """Create queue wrapper for the given queue name and append it to the cyclic double-linked list.

        The wrapper is only linked into the list here; storing it in the name
        lookup dictionary is done by :meth:`get_queue`.

        :param queue_name: queue name that the wrapper represents
        :return: queue wrapper
        """
        old_tail = self._queue_tail
        new_wrapper = self._QueueWrapper(old_tail, self._queue_head, queue_name)

        if old_tail is not None:
            old_tail.next = new_wrapper
        self._queue_tail = new_wrapper

        if self._queue_head is None:
            self._queue_head = new_wrapper
        # Close the cycle: head's predecessor is always the newest (tail) wrapper.
        self._queue_head.previous = new_wrapper

        return new_wrapper

    def _remove_queue_wrapper(self, queue_wrapper):
        """Remove queue wrapper from queue pool - double linked list.

        :param queue_wrapper: wrapper to be unlinked and dropped from the name lookup
        """
        self._queues.pop(queue_wrapper.queue_name)

        was_head = queue_wrapper is self._queue_head
        was_tail = queue_wrapper is self._queue_tail

        if was_head and was_tail:
            # The only wrapper in the pool is going away, nothing is left to link.
            self._queue_head = None
            self._queue_tail = None
            self._last_used = None
            return

        predecessor, successor = queue_wrapper.previous, queue_wrapper.next
        predecessor.next = successor
        successor.previous = predecessor

        if was_head:
            self._queue_head = successor
        if was_tail:
            self._queue_tail = predecessor
        if queue_wrapper is self._last_used:
            self._last_used = predecessor

    def get_queue(self, name):
        """Get queue wrapper by name of the queue that is wrapped, if does not exist, create one lazily.

        :param name: a name of the queue
        :return: queue wrapper for the given queue with requested name
        """
        existing = self._queues.get(name)
        if existing is not None:
            return existing

        created = self._create_queue_wrapper(name)
        self._queues[name] = created
        return created

    def queue_exists(self, name):
        """Check whether a queue with the given name exists.

        :param name: name of queue to be checked if exists
        :return: True if such queue exists in queue pool
        """
        return name in self._queues

    def push(self, queue_name, time, record):
        """Push record with its time to queue with name queue_name.

        :param queue_name: a queue name that should keep record
        :param time: time of record (when should be record executed)
        :param record: record itself (message with additional information such as task name and its parameters)
        """
        # TODO: add lock per queue
        target = self.get_queue(queue_name)
        target.queue.push(time, record)

        # initialize the scan start if we haven't pushed anything to a queue yet
        # so we start from the very first queue
        if self._last_used is None:
            self._last_used = target

    def pop(self):
        """Pop a record with the smallest time.

        All queues are inspected once, starting with the last used one. A queue
        replaces the current candidate only if its top time is strictly smaller,
        so on equal times the queue visited earlier wins.

        :return: (time, record) tuple - time of record and record itself (see self.push for more info)
        :raises AssertionError: if there is no last used queue (unless assertions are disabled)
        """
        # TODO: add lock per queue
        assert self._last_used is not None  # nosec

        start = self._last_used
        best_wrapper = start
        best_time, best_record = start.queue.top()

        # Walk the cycle until we are back at the starting wrapper. With a single
        # queue in the pool, the starting wrapper is its own successor and is
        # simply compared against itself once.
        current = next(start)
        while True:
            head_time, head_record = current.queue.top()
            if best_time is None or head_time < best_time:
                best_time, best_record = head_time, head_record
                best_wrapper = current

            current = next(current)
            if current is start:
                break

        best_wrapper.queue.pop()
        if best_wrapper.queue.is_empty():
            # Empty queues do not stay in the pool.
            self._remove_queue_wrapper(best_wrapper)
        else:
            self._last_used = best_wrapper

        return best_time, best_record

    def is_empty(self):
        """Check pool emptiness.

        :return: True if queue pool does not have any messages and no queues (as queues get deleted when empty)
        """
        return not self._queues

    def __repr__(self):
        """Queue pool representation.

        :return: a string representing pool (with a list of active queues)
        """
        return "%s(%s)" % (type(self).__name__, self._queues)