Source code for selinon.storages.s3
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ######################################################################
# Copyright (C) 2016-2018 Fridolin Pokorny, fridolin.pokorny@gmail.com
# This file is part of Selinon project.
# ######################################################################
"""Selinon adapter for Amazon S3 storage."""
import json
import os
try:
import boto3
import botocore
except ImportError as exc:
raise ImportError("Please install boto3 using `pip3 install selinon[s3]` in order to use S3 storage") from exc
from selinon import DataStorage, SelinonMissingDataException
class S3(DataStorage):
    """Amazon S3 storage adapter.

    Task results are stored as S3 objects in a single bucket, keyed by task id.

    For credentials configuration see boto3 library configuration
    https://github.com/boto/boto3
    """

    # String values (typically expanded from environment variables) that mean "SSL disabled".
    _FALSE_STRINGS = frozenset(('', '0', 'false', 'no', 'off'))

    def __init__(self, bucket, location=None, endpoint_url=None, use_ssl=None,
                 aws_access_key_id=None, aws_secret_access_key=None, region_name=None, serialize_json=False):
        """Initialize S3 storage adapter from YAML configuration file.

        All string options are expanded with ``str.format(**os.environ)``, so ``{VAR}`` placeholders
        are substituted by environment variables.

        :param bucket: bucket name to be used
        :param location: AWS location
        :param endpoint_url: S3 endpoint (if local instance)
        :param use_ssl: True if SSL should be used; strings such as "0", "false", "no" or "off" (after
                        environment expansion, case insensitive) are treated as False
        :param aws_access_key_id: AWS access key
        :param aws_secret_access_key: AWS secret access key
        :param region_name: region to be used
        :param serialize_json: serialize JSON output (dict or list) to a blob - needed as S3 objects are blobs
        """
        # AWS access key and access id are handled by Boto - place them to config or use env variables
        super().__init__()
        self._bucket_name = bucket.format(**os.environ)
        self._location = location.format(**os.environ) if location else None
        self._s3 = None

        if isinstance(use_ssl, str):
            # bool("false") is True, so textual flags have to be interpreted explicitly
            use_ssl = use_ssl.format(**os.environ).strip().lower() not in self._FALSE_STRINGS
        self._use_ssl = bool(use_ssl)

        self._endpoint_url = endpoint_url.format(**os.environ) if endpoint_url else None
        self._serialize_json = serialize_json

        aws_access_key_id = aws_access_key_id.format(**os.environ) if aws_access_key_id else None
        aws_secret_access_key = aws_secret_access_key.format(**os.environ) if aws_secret_access_key else None
        region_name = region_name.format(**os.environ) if region_name else None
        self._session = boto3.session.Session(aws_access_key_id=aws_access_key_id,
                                              aws_secret_access_key=aws_secret_access_key,
                                              region_name=region_name)

    def is_connected(self):  # noqa
        """Return True if an S3 resource has been created by :meth:`connect`."""
        return self._s3 is not None

    def connect(self):  # noqa
        """Create the S3 resource and make sure the configured bucket exists.

        The bucket is created when a HEAD request on it reports 404. Any other client error is
        propagated and the adapter stays disconnected.

        :raises botocore.exceptions.ClientError: on any bucket check error other than 404
        """
        # we need signature version v4 as new AWS regions use this version and we won't be able to connect without this
        s3 = self._session.resource('s3', config=botocore.client.Config(signature_version='s3v4'),
                                    use_ssl=self._use_ssl, endpoint_url=self._endpoint_url)

        # check that the bucket exists - see boto docs
        try:
            s3.meta.client.head_bucket(Bucket=self._bucket_name)
        except botocore.exceptions.ClientError as exc:
            # Error codes are not guaranteed to be numeric (e.g. "AccessDenied"), so compare as
            # strings instead of converting to int - a failed conversion would hide the real error.
            if str(exc.response['Error']['Code']) != '404':
                raise

            # a 404 error means that the bucket does not exist
            create_kwargs = {'Bucket': self._bucket_name}
            if self._location:
                # LocationConstraint must be a string; omit the configuration if no location was given
                create_kwargs['CreateBucketConfiguration'] = {'LocationConstraint': self._location}
            s3.create_bucket(**create_kwargs)

        # publish the resource only once the bucket is known to exist, so is_connected()
        # does not report True after a failed connection attempt
        self._s3 = s3

    def disconnect(self):  # noqa
        """Drop the S3 resource; :meth:`is_connected` returns False afterwards."""
        self._s3 = None

    def retrieve(self, flow_name, task_name, task_id):  # noqa
        """Read the object stored under ``task_id``.

        :return: raw bytes, or the decoded JSON value if ``serialize_json`` was set
        """
        assert self.is_connected()  # nosec

        blob = self._s3.Object(self._bucket_name, task_id).get()['Body'].read()
        if not self._serialize_json:
            return blob

        return json.loads(blob.decode())

    def store(self, node_args, flow_name, task_name, task_id, result):  # noqa
        """Write ``result`` to the object keyed by ``task_id``.

        The result is serialized to JSON bytes first if ``serialize_json`` was set.
        """
        assert self.is_connected()  # nosec

        if self._serialize_json:
            result = json.dumps(result).encode()

        self._s3.Object(self._bucket_name, task_id).put(Body=result)

    def store_error(self, node_args, flow_name, task_name, task_id, exc_info):  # noqa
        """Storing errors is not supported by this adapter.

        :raises NotImplementedError: always
        """
        # just to make pylint happy
        raise NotImplementedError()

    def delete(self, flow_name, task_name, task_id):
        """Delete the object stored under ``task_id``.

        :raises SelinonMissingDataException: if the object does not exist (404 on load)
        :raises botocore.exceptions.ClientError: on any other client error
        """
        assert self.is_connected()  # nosec

        s3_object = self._s3.Object(self._bucket_name, task_id)
        try:
            s3_object.load()
        except botocore.exceptions.ClientError as exc:
            if exc.response['Error']['Code'] == "404":
                # The object does not exist.
                raise SelinonMissingDataException from exc
            # bare raise keeps the original traceback intact
            raise

        s3_object.delete()