blob: d0703e60e84d304cefaf17e4195fb4fc5d0855ec [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
File system implementation of the storage resource API ("RAPI").
"""
import os
import shutil
from multiprocessing import RLock
from contextlib import contextmanager
from functools import partial
from distutils import dir_util # https://github.com/PyCQA/pylint/issues/73; pylint: disable=no-name-in-module
from aria.storage import (
api,
exceptions
)
class FileSystemResourceAPI(api.ResourceAPI):
"""
File system implementation of the storage resource API ("RAPI").
"""
def __init__(self, directory, **kwargs):
"""
:param directory: root dir for storage
"""
super(FileSystemResourceAPI, self).__init__(**kwargs)
self.directory = directory
self.base_path = os.path.join(self.directory, self.name)
self._join_path = partial(os.path.join, self.base_path)
self._lock = RLock()
@contextmanager
def connect(self):
"""
Establishes a connection and destroys it after use.
"""
try:
self._establish_connection()
yield self
except BaseException as e:
raise exceptions.StorageError(str(e))
finally:
self._destroy_connection()
def _establish_connection(self):
"""
Establishes a connection. Used in the ``connect`` context manager.
"""
self._lock.acquire()
def _destroy_connection(self):
"""
Destroys a connection. Used in the ``connect`` context manager.
"""
self._lock.release()
def __repr__(self):
return '{cls.__name__}(directory={self.directory})'.format(
cls=self.__class__, self=self)
def create(self, **kwargs):
"""
Creates a directory in by path. Tries to create the root directory as well.
:param name: path of directory
"""
try:
os.makedirs(self.directory)
except (OSError, IOError):
pass
try:
os.makedirs(self.base_path)
except (OSError, IOError):
pass
def read(self, entry_id, path, **_):
"""
Retrieves the contents of a file.
:param entry_id: entry ID
:param path: path to resource
:return: contents of the file
:rtype: bytes
"""
resource_relative_path = os.path.join(self.name, entry_id, path or '')
resource = os.path.join(self.directory, resource_relative_path)
if not os.path.exists(resource):
raise exceptions.StorageError("Resource {0} does not exist".
format(resource_relative_path))
if not os.path.isfile(resource):
resources = os.listdir(resource)
if len(resources) != 1:
raise exceptions.StorageError(
'Failed to read {0}; Reading a directory is '
'only allowed when it contains a single resource'.format(resource))
resource = os.path.join(resource, resources[0])
with open(resource, 'rb') as resource_file:
return resource_file.read()
def download(self, entry_id, destination, path=None, **_):
"""
Downloads a file or directory.
:param entry_id: entry ID
:param destination: download destination
:param path: path to download relative to the root of the entry (otherwise all)
"""
resource_relative_path = os.path.join(self.name, entry_id, path or '')
resource = os.path.join(self.directory, resource_relative_path)
if not os.path.exists(resource):
raise exceptions.StorageError("Resource {0} does not exist".
format(resource_relative_path))
if os.path.isfile(resource):
shutil.copy2(resource, destination)
else:
dir_util.copy_tree(resource, destination) # pylint: disable=no-member
def upload(self, entry_id, source, path=None, **_):
"""
Uploads a file or directory.
:param entry_id: entry ID
:param source: source of the files to upload
:param path: the destination of the file/s relative to the entry root dir.
"""
resource_directory = os.path.join(self.directory, self.name, entry_id)
if not os.path.exists(resource_directory):
os.makedirs(resource_directory)
destination = os.path.join(resource_directory, path or '')
if os.path.isfile(source):
shutil.copy2(source, destination)
else:
dir_util.copy_tree(source, destination) # pylint: disable=no-member
def delete(self, entry_id, path=None, **_):
"""
Deletes a file or directory.
:param entry_id: entry ID
:param path: path to delete relative to the root of the entry (otherwise all)
"""
destination = os.path.join(self.directory, self.name, entry_id, path or '')
if os.path.exists(destination):
if os.path.isfile(destination):
os.remove(destination)
else:
shutil.rmtree(destination)
return True
return False