Source code for selinon.storages.filesystem
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ######################################################################
# Copyright (C) 2016-2018 Fridolin Pokorny, fridolin.pokorny@gmail.com
# This file is part of Selinon project.
# ######################################################################
"""A simple filesystem storage implementation."""
import json
import os
from selinon import DataStorage, SelinonMissingDataException
[docs]class Filesystem(DataStorage):
"""Selinon adapter for storing task results in a directory."""
def __init__(self, path=None):
"""Instantiate Filesystem adapter.
:param path: path to directory to be used
:type path: str
"""
super().__init__()
self.path = (path or '{PWD}').format(**os.environ)
self._connected = False
def _construct_base_path(self, flow_name, task_name):
return os.path.join(self.path, flow_name, task_name)
def _construct_path(self, flow_name, task_name, task_id):
return os.path.join(self._construct_base_path(flow_name, task_name), '{}.json'.format(task_id))
[docs] def is_connected(self):
return self._connected
[docs] def connect(self):
if not os.path.isdir(self.path):
os.makedirs(self.path, exist_ok=True)
self._connected = True
[docs] def disconnect(self):
pass
[docs] def retrieve(self, flow_name, task_name, task_id):
path = self._construct_path(flow_name, task_name, task_id)
with open(path, 'r') as result_file:
return json.load(result_file)
[docs] def store(self, node_args, flow_name, task_name, task_id, result): # noqa
base_path = self._construct_base_path(flow_name, task_name)
if not os.path.isdir(base_path):
os.makedirs(base_path, exist_ok=True)
path = self._construct_path(flow_name, task_name, task_id)
with open(path, 'w') as result_file:
json.dump(result, result_file)
return path
[docs] def store_error(self, node_args, flow_name, task_name, task_id, exc_info): # noqa
# just to make pylint happy
raise NotImplementedError()
[docs] def delete(self, flow_name, task_name, task_id):
path = self._construct_path(flow_name, task_name, task_id)
if os.path.exists(path):
os.remove(path)
else:
raise SelinonMissingDataException