Source code for selinonlib.storages.s3
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# ######################################################################
# Copyright (C) 2016-2017 Fridolin Pokorny, fridolin.pokorny@gmail.com
# This file is part of Selinon project.
# ######################################################################
"""Selinon adapter for Amazon S3 storage."""
try:
import boto3
import botocore
except ImportError as exc:
raise ImportError("Please install boto3 using `pip3 install selinonlib[s3]` in order to use S3 storage") from exc
from selinon import DataStorage
[docs]class S3(DataStorage):
"""Amazon S3 storage adapter.
For credentials configuration see boto3 library configuration
https://github.com/boto/boto3
"""
def __init__(self, bucket, location, endpoint_url=None, use_ssl=None,
aws_access_key_id=None, aws_secret_access_key=None, region_name=None):
"""Initialize S3 storage adapter from YAML configuration file.
:param bucket: bucket name to be used
:param location: AWS location
:param endpoint_url: S3 endpoint (if local instance)
:param use_ssl: True if SSL should be used
:param aws_access_key_id: AWS access key
:param aws_secret_access_key: AWS secret access key
:param region_name: region to be used
"""
# AWS access key and access id are handled by Boto - place them to config or use env variables
super().__init__()
self._bucket_name = bucket
self._location = location
self._s3 = None
self._use_ssl = use_ssl
self._endpoint_url = endpoint_url
self._session = boto3.session.Session(aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name)
[docs] def is_connected(self): # noqa
return self._s3 is not None
[docs] def connect(self): # noqa
# we need signature version v4 as new AWS regions use this version and we won't be able to connect without this
self._s3 = self._session.resource('s3', config=botocore.client.Config(signature_version='s3v4'),
use_ssl=self._use_ssl, endpoint_url=self._endpoint_url)
# check that the bucket exists - see boto docs
try:
self._s3.meta.client.head_bucket(Bucket=self._bucket_name)
except botocore.exceptions.ClientError as exc:
# if a client error is thrown, then check that it was a 404 error.
# if it was a 404 error, then the bucket does not exist.
error_code = int(exc.response['Error']['Code'])
if error_code == 404:
self._s3.create_bucket(Bucket=self._bucket_name,
CreateBucketConfiguration={
'LocationConstraint': self._location
})
else:
raise
[docs] def disconnect(self): # noqa
if self._s3:
del self._s3
self._s3 = None
[docs] def retrieve(self, flow_name, task_name, task_id): # noqa
assert self.is_connected() # nosec
return self._s3.Object(self._bucket_name, task_id).get()['Body'].read()
[docs] def store(self, node_args, flow_name, task_name, task_id, result): # noqa
assert self.is_connected() # nosec
self._s3.Object(self._bucket_name, task_id).put(Body=result)
[docs] def store_error(self, node_args, flow_name, task_name, task_id, exc_info): # noqa
# just to make pylint happy
raise NotImplementedError()