blob: 3ddc5205713892b3df55e147828a6f28d8f50ebd [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
File system based RAPI
"""
import os
import shutil
from multiprocessing import RLock
from contextlib import contextmanager
from functools import partial
from distutils import dir_util # https://github.com/PyCQA/pylint/issues/73; pylint: disable=no-name-in-module
from aria.storage import (
api,
exceptions
)
class FileSystemResourceAPI(api.ResourceAPI):
"""
File system resource storage.
"""
def __init__(self, directory, **kwargs):
"""
File system implementation for storage api.
:param str directory: root dir for storage.
"""
super(FileSystemResourceAPI, self).__init__(**kwargs)
self.directory = directory
self.base_path = os.path.join(self.directory, self.name)
self._join_path = partial(os.path.join, self.base_path)
self._lock = RLock()
@contextmanager
def connect(self):
"""
Established a connection and destroys it after use.
:return:
"""
try:
self._establish_connection()
yield self
except BaseException as e:
raise exceptions.StorageError(str(e))
finally:
self._destroy_connection()
def _establish_connection(self):
"""
Establish a conenction. used in the 'connect' contextmanager.
:return:
"""
self._lock.acquire()
def _destroy_connection(self):
"""
Destroy a connection. used in the 'connect' contextmanager.
:return:
"""
self._lock.release()
def __repr__(self):
return '{cls.__name__}(directory={self.directory})'.format(
cls=self.__class__, self=self)
def create(self, **kwargs):
"""
Create directory in storage by path.
tries to create the root directory as well.
:param str name: path of file in storage.
"""
try:
os.makedirs(self.directory)
except (OSError, IOError):
pass
try:
os.makedirs(self.base_path)
except (OSError, IOError):
pass
def read(self, entry_id, path, **_):
"""
Retrieve the content of a file system storage resource.
:param str entry_id: the id of the entry.
:param str path: a path to the specific resource to read.
:return: the content of the file.
:rtype: bytes
"""
resource_relative_path = os.path.join(self.name, entry_id, path or '')
resource = os.path.join(self.directory, resource_relative_path)
if not os.path.exists(resource):
raise exceptions.StorageError("Resource {0} does not exist".
format(resource_relative_path))
if not os.path.isfile(resource):
resources = os.listdir(resource)
if len(resources) != 1:
raise exceptions.StorageError(
'Failed to read {0}; Reading a directory is '
'only allowed when it contains a single resource'.format(resource))
resource = os.path.join(resource, resources[0])
with open(resource, 'rb') as resource_file:
return resource_file.read()
def download(self, entry_id, destination, path=None, **_):
"""
Download a specific file or dir from the file system resource storage.
:param str entry_id: the id of the entry.
:param str destination: the destination to download to
:param str path: the path to download relative to the root of the entry (otherwise all).
"""
resource_relative_path = os.path.join(self.name, entry_id, path or '')
resource = os.path.join(self.directory, resource_relative_path)
if not os.path.exists(resource):
raise exceptions.StorageError("Resource {0} does not exist".
format(resource_relative_path))
if os.path.isfile(resource):
shutil.copy2(resource, destination)
else:
dir_util.copy_tree(resource, destination) # pylint: disable=no-member
def upload(self, entry_id, source, path=None, **_):
"""
Uploads a specific file or dir to the file system resource storage.
:param str entry_id: the id of the entry.
:param source: the source of the files to upload.
:param path: the destination of the file/s relative to the entry root dir.
"""
resource_directory = os.path.join(self.directory, self.name, entry_id)
if not os.path.exists(resource_directory):
os.makedirs(resource_directory)
destination = os.path.join(resource_directory, path or '')
if os.path.isfile(source):
shutil.copy2(source, destination)
else:
dir_util.copy_tree(source, destination) # pylint: disable=no-member
def delete(self, entry_id, path=None, **_):
"""
Deletes a file system storage resource.
:param str entry_id: the id of the entry.
:param str path: a path to delete relative to the root of the entry (otherwise all).
"""
destination = os.path.join(self.directory, self.name, entry_id, path or '')
if os.path.exists(destination):
if os.path.isfile(destination):
os.remove(destination)
else:
shutil.rmtree(destination)
return True
return False