Source code for selinon.storages.mongodb
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ######################################################################
# Copyright (C) 2016-2018 Fridolin Pokorny, fridolin.pokorny@gmail.com
# This file is part of Selinon project.
# ######################################################################
"""MongoDB database adapter."""
import os
from selinon.data_storage import SelinonMissingDataException
try:
from pymongo import MongoClient
except ImportError as exc:
raise ImportError("Please install dependencies using `pip3 install selinon[mongodb]` "
"in order to use MongoStorage") from exc
from selinon import DataStorage
[docs]class MongoDB(DataStorage):
"""MongoDB database adapter."""
def __init__(self, db_name, collection_name, host=None, port=27017):
"""Instantiate MongoDB storage adapter.
:param db_name: MongoDB database name
:param collection_name: MongoDB collection name
:param host: MongoDB host
:param port: MongoDB port
"""
super().__init__()
self.client = None
self.collection = None
self.db = None # pylint: disable=invalid-name
self.host = (host or 'localhost').format(**os.environ)
self.port = int(port.format(**os.environ) if isinstance(port, str) else port)
self.db_name = db_name.format(**os.environ)
self.collection_name = collection_name.format(**os.environ)
[docs] def is_connected(self): # noqa
return self.client is not None
[docs] def connect(self): # noqa
self.client = MongoClient(self.host, self.port)
self.db = self.client[self.db_name]
self.collection = self.db[self.collection_name]
[docs] def disconnect(self): # noqa
if self.is_connected():
self.client.close()
self.client = None
self.db = None
self.collection = None
[docs] def retrieve(self, flow_name, task_name, task_id): # noqa
assert self.is_connected() # nosec
filtering = {'_id': 0}
cursor = self.collection.find({'task_id': task_id}, filtering)
if cursor.count() > 1:
raise ValueError("Multiple records with same task_id found")
if not cursor:
raise FileNotFoundError("Record not found in database")
record = cursor[0]
assert task_name == record['task_name'] # nosec
return record.get('result')
[docs] def store(self, node_args, flow_name, task_name, task_id, result): # noqa
assert self.is_connected() # nosec
record = {
'flow_name': flow_name,
'node_args': node_args,
'task_name': task_name,
'task_id': task_id,
'result': result
}
self.collection.insert(record)
# task_id is unique here
return task_id
[docs] def store_error(self, node_args, flow_name, task_name, task_id, exc_info): # noqa
# just to make pylint happy
raise NotImplementedError()
[docs] def delete(self, flow_name, task_name, task_id): # noqa
assert self.is_connected() # nosec
filtering = {'_id': 0}
cursor = self.collection.find({'task_id': task_id}, filtering)
cursor_count = cursor.count()
if cursor_count > 1:
raise ValueError("Multiple records with same task_id found")
if cursor_count == 0:
raise SelinonMissingDataException("Record not found in database")
self.collection.delete_one({'task_id': task_id})