Source code for selinon.storages.postgresql.adapter
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ######################################################################
# Copyright (C) 2016-2018 Fridolin Pokorny, fridolin.pokorny@gmail.com
# This file is part of Selinon project.
# ######################################################################
"""Selinon SQL Database adapter - PostgreSQL."""
import os
from selinon.data_storage import SelinonMissingDataException
from selinon import DataStorage
try:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy_utils import create_database
from sqlalchemy_utils import database_exists
except ImportError as exc:
raise ImportError("Please install dependencies using `pip3 install selinon[postgresql]`") from exc
from .models import Result
class PostgreSQL(DataStorage):
    """Selinon SQL Database adapter - PostgreSQL."""

    def __init__(self, connection_string, encoding='utf-8', echo=False):
        """Initialize PostgreSQL adapter from YAML configuration file.

        The connection string is expanded with ``str.format(**os.environ)``, so
        ``{NAME}`` placeholders are replaced by the value of the environment
        variable ``NAME`` (a missing variable makes ``str.format`` raise ``KeyError``).

        :param connection_string: connection string used to connect to PostgreSQL
        :param encoding: encoding to be used, forwarded to ``create_engine``
        :param echo: perform echo on queries, forwarded to ``create_engine``
        """
        super().__init__()
        self.engine = create_engine(
            connection_string.format(**os.environ),
            encoding=encoding,
            echo=echo,
        )
        # No session exists until connect() is called.
        self.session = None

    def is_connected(self):  # noqa
        """Check whether a session is currently open.

        :return: True if connect() created a session that was not closed yet
        """
        return self.session is not None

    def connect(self):  # noqa
        """Open a session, creating the database and the result tables if needed."""
        if not database_exists(self.engine.url):
            create_database(self.engine.url)

        self.session = sessionmaker(bind=self.engine)()
        # Create the tables registered on the Result model's metadata.
        Result.metadata.create_all(self.engine)

    def disconnect(self):  # noqa
        """Close the current session, if any, and forget it."""
        if self.is_connected():
            self.session.close()
            self.session = None

    def retrieve(self, flow_name, task_name, task_id):  # noqa
        """Retrieve the stored result of a task.

        The record is looked up by ``task_id`` only; ``flow_name`` is not used.
        ``Query.one()`` raises if there is not exactly one matching record.

        :param flow_name: name of the flow the task was run in (unused)
        :param task_name: name of the task, asserted to match the stored record
        :param task_id: id of the task whose result should be retrieved
        :return: the ``result`` attribute of the matching record
        """
        assert self.is_connected()  # nosec

        record = self.session.query(Result).filter_by(task_id=task_id).one()

        assert record.task_name == task_name  # nosec
        return record.result

    def store(self, node_args, flow_name, task_name, task_id, result):  # noqa
        """Store the result of a task as a new record.

        :param node_args: arguments of the node, passed to ``Result``
        :param flow_name: name of the flow the task was run in
        :param task_name: name of the task that produced the result
        :param task_id: id of the task that produced the result
        :param result: result to be stored
        :return: the ``id`` attribute of the newly stored record
        """
        assert self.is_connected()  # nosec

        record = Result(node_args, flow_name, task_name, task_id, result)
        try:
            self.session.add(record)
            self.session.commit()
        except Exception:
            # Leave the session usable for subsequent calls, then propagate.
            self.session.rollback()
            raise

        return record.id

    def store_error(self, node_args, flow_name, task_name, task_id, exc_info):  # noqa
        """Storing task errors is not implemented by this adapter.

        :raises NotImplementedError: always
        """
        # just to make pylint happy
        raise NotImplementedError()

    def delete(self, flow_name, task_name, task_id):  # noqa
        """Delete the stored result of a task.

        Records are matched by ``task_id`` only; ``flow_name`` and ``task_name``
        are not used.

        :param flow_name: name of the flow the task was run in (unused)
        :param task_name: name of the task (unused)
        :param task_id: id of the task whose record should be deleted
        :raises SelinonMissingDataException: if no record matched ``task_id``
        """
        assert self.is_connected()  # nosec

        try:
            deleted = self.session.query(Result).filter_by(task_id=task_id).delete()
            # The bulk DELETE runs inside the session's transaction; commit it so
            # the removal is persisted, the same way store() persists inserts.
            self.session.commit()
        except Exception:
            # Leave the session usable for subsequent calls, then propagate.
            self.session.rollback()
            raise

        if deleted == 0:
            raise SelinonMissingDataException("Record not found")