| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import os.path |
| import random |
| import hashlib |
| |
| from libcloud.utils.py3 import PY3 |
| from libcloud.utils.py3 import b |
| |
| if PY3: |
| from io import FileIO as file |
| |
| from libcloud.common.types import LibcloudError |
| |
| from libcloud.storage.base import Object, Container, StorageDriver |
| from libcloud.storage.types import ContainerAlreadyExistsError |
| from libcloud.storage.types import ContainerDoesNotExistError |
| from libcloud.storage.types import ContainerIsNotEmptyError |
| from libcloud.storage.types import ObjectDoesNotExistError |
| |
| |
| class DummyFileObject(file): |
| def __init__(self, yield_count=5, chunk_len=10): |
| self._yield_count = yield_count |
| self._chunk_len = chunk_len |
| |
| def read(self, size): |
| i = 0 |
| |
| while i < self._yield_count: |
| yield self._get_chunk(self._chunk_len) |
| i += 1 |
| |
| def _get_chunk(self, chunk_len): |
| chunk = [str(x) for x in random.randint(97, 120)] |
| return chunk |
| |
| def __len__(self): |
| return self._yield_count * self._chunk_len |
| |
| |
| class DummyIterator(object): |
| def __init__(self, data=None): |
| self.hash = hashlib.md5() |
| self._data = data or [] |
| self._current_item = 0 |
| |
| def get_md5_hash(self): |
| return self.hash.hexdigest() |
| |
| def next(self): |
| if self._current_item == len(self._data): |
| raise StopIteration |
| |
| value = self._data[self._current_item] |
| self.hash.update(b(value)) |
| self._current_item += 1 |
| return value |
| |
| def __next__(self): |
| return self.next() |
| |
| def __enter__(self): |
| pass |
| |
| def __exit__(self, type, value, traceback): |
| pass |
| |
| |
| class DummyStorageDriver(StorageDriver): |
| """ |
| Dummy Storage driver. |
| |
| >>> from libcloud.storage.drivers.dummy import DummyStorageDriver |
| >>> driver = DummyStorageDriver('key', 'secret') |
| >>> container = driver.create_container(container_name='test container') |
| >>> container |
| <Container: name=test container, provider=Dummy Storage Provider> |
| >>> container.name |
| 'test container' |
| >>> container.extra['object_count'] |
| 0 |
| """ |
| |
| name = "Dummy Storage Provider" |
| website = "http://example.com" |
| |
| def __init__(self, api_key, api_secret): |
| """ |
| :param api_key: API key or username to used (required) |
| :type api_key: ``str`` |
| :param api_secret: Secret password to be used (required) |
| :type api_secret: ``str`` |
| :rtype: ``None`` |
| """ |
| self._containers = {} |
| |
| def get_meta_data(self): |
| """ |
| >>> driver = DummyStorageDriver('key', 'secret') |
| >>> driver.get_meta_data()['object_count'] |
| 0 |
| >>> driver.get_meta_data()['container_count'] |
| 0 |
| >>> driver.get_meta_data()['bytes_used'] |
| 0 |
| >>> container_name = 'test container 1' |
| >>> container = driver.create_container(container_name=container_name) |
| >>> container_name = 'test container 2' |
| >>> container = driver.create_container(container_name=container_name) |
| >>> obj = container.upload_object_via_stream( |
| ... object_name='test object', iterator=DummyFileObject(5, 10), |
| ... extra={}) |
| >>> driver.get_meta_data()['object_count'] |
| 1 |
| >>> driver.get_meta_data()['container_count'] |
| 2 |
| >>> driver.get_meta_data()['bytes_used'] |
| 50 |
| |
| :rtype: ``dict`` |
| """ |
| |
| container_count = len(self._containers) |
| object_count = sum( |
| [ |
| len(self._containers[container]["objects"]) |
| for container in self._containers |
| ] |
| ) |
| |
| bytes_used = 0 |
| for container in self._containers: |
| objects = self._containers[container]["objects"] |
| for _, obj in objects.items(): |
| bytes_used += obj.size |
| |
| return { |
| "container_count": int(container_count), |
| "object_count": int(object_count), |
| "bytes_used": int(bytes_used), |
| } |
| |
| def iterate_containers(self): |
| """ |
| >>> driver = DummyStorageDriver('key', 'secret') |
| >>> list(driver.iterate_containers()) |
| [] |
| >>> container_name = 'test container 1' |
| >>> container = driver.create_container(container_name=container_name) |
| >>> container |
| <Container: name=test container 1, provider=Dummy Storage Provider> |
| >>> container.name |
| 'test container 1' |
| >>> container_name = 'test container 2' |
| >>> container = driver.create_container(container_name=container_name) |
| >>> container |
| <Container: name=test container 2, provider=Dummy Storage Provider> |
| >>> container = driver.create_container( |
| ... container_name='test container 2') |
| ... #doctest: +IGNORE_EXCEPTION_DETAIL |
| Traceback (most recent call last): |
| ContainerAlreadyExistsError: |
| >>> container_list=list(driver.iterate_containers()) |
| >>> sorted([c.name for c in container_list]) |
| ['test container 1', 'test container 2'] |
| |
| @inherits: :class:`StorageDriver.iterate_containers` |
| """ |
| |
| for container in list(self._containers.values()): |
| yield container["container"] |
| |
| def iterate_container_objects(self, container, prefix=None, ex_prefix=None): |
| prefix = self._normalize_prefix_argument(prefix, ex_prefix) |
| |
| container = self.get_container(container.name) |
| objects = self._containers[container.name]["objects"].values() |
| return self._filter_listed_container_objects(objects, prefix) |
| |
| def get_container(self, container_name): |
| """ |
| >>> driver = DummyStorageDriver('key', 'secret') |
| >>> driver.get_container('unknown') #doctest: +IGNORE_EXCEPTION_DETAIL |
| Traceback (most recent call last): |
| ContainerDoesNotExistError: |
| >>> container_name = 'test container 1' |
| >>> container = driver.create_container(container_name=container_name) |
| >>> container |
| <Container: name=test container 1, provider=Dummy Storage Provider> |
| >>> container.name |
| 'test container 1' |
| >>> driver.get_container('test container 1') |
| <Container: name=test container 1, provider=Dummy Storage Provider> |
| |
| @inherits: :class:`StorageDriver.get_container` |
| """ |
| |
| if container_name not in self._containers: |
| raise ContainerDoesNotExistError( |
| driver=self, value=None, container_name=container_name |
| ) |
| |
| return self._containers[container_name]["container"] |
| |
| def get_container_cdn_url(self, container): |
| """ |
| >>> driver = DummyStorageDriver('key', 'secret') |
| >>> driver.get_container('unknown') #doctest: +IGNORE_EXCEPTION_DETAIL |
| Traceback (most recent call last): |
| ContainerDoesNotExistError: |
| >>> container_name = 'test container 1' |
| >>> container = driver.create_container(container_name=container_name) |
| >>> container |
| <Container: name=test container 1, provider=Dummy Storage Provider> |
| >>> container.name |
| 'test container 1' |
| >>> container.get_cdn_url() |
| 'http://www.test.com/container/test_container_1' |
| |
| @inherits: :class:`StorageDriver.get_container_cdn_url` |
| """ |
| |
| if container.name not in self._containers: |
| raise ContainerDoesNotExistError( |
| driver=self, value=None, container_name=container.name |
| ) |
| |
| return self._containers[container.name]["cdn_url"] |
| |
| def get_object(self, container_name, object_name): |
| """ |
| >>> driver = DummyStorageDriver('key', 'secret') |
| >>> driver.get_object('unknown', 'unknown') |
| ... #doctest: +IGNORE_EXCEPTION_DETAIL |
| Traceback (most recent call last): |
| ContainerDoesNotExistError: |
| >>> container_name = 'test container 1' |
| >>> container = driver.create_container(container_name=container_name) |
| >>> container |
| <Container: name=test container 1, provider=Dummy Storage Provider> |
| >>> driver.get_object( |
| ... 'test container 1', 'unknown') #doctest: +IGNORE_EXCEPTION_DETAIL |
| Traceback (most recent call last): |
| ObjectDoesNotExistError: |
| >>> obj = container.upload_object_via_stream(object_name='test object', |
| ... iterator=DummyFileObject(5, 10), extra={}) |
| >>> obj.name |
| 'test object' |
| >>> obj.size |
| 50 |
| |
| @inherits: :class:`StorageDriver.get_object` |
| """ |
| |
| self.get_container(container_name) |
| container_objects = self._containers[container_name]["objects"] |
| if object_name not in container_objects: |
| raise ObjectDoesNotExistError( |
| object_name=object_name, value=None, driver=self |
| ) |
| |
| return container_objects[object_name] |
| |
| def get_object_cdn_url(self, obj): |
| """ |
| >>> driver = DummyStorageDriver('key', 'secret') |
| >>> container_name = 'test container 1' |
| >>> container = driver.create_container(container_name=container_name) |
| >>> container |
| <Container: name=test container 1, provider=Dummy Storage Provider> |
| >>> obj = container.upload_object_via_stream( |
| ... object_name='test object 5', |
| ... iterator=DummyFileObject(5, 10), extra={}) |
| >>> obj.name |
| 'test object 5' |
| >>> obj.get_cdn_url() |
| 'http://www.test.com/object/test_object_5' |
| |
| @inherits: :class:`StorageDriver.get_object_cdn_url` |
| """ |
| |
| container_name = obj.container.name |
| container_objects = self._containers[container_name]["objects"] |
| if obj.name not in container_objects: |
| raise ObjectDoesNotExistError(object_name=obj.name, value=None, driver=self) |
| |
| return container_objects[obj.name].meta_data["cdn_url"] |
| |
| def create_container(self, container_name): |
| """ |
| >>> driver = DummyStorageDriver('key', 'secret') |
| >>> container_name = 'test container 1' |
| >>> container = driver.create_container(container_name=container_name) |
| >>> container |
| <Container: name=test container 1, provider=Dummy Storage Provider> |
| >>> container = driver.create_container( |
| ... container_name='test container 1') |
| ... #doctest: +IGNORE_EXCEPTION_DETAIL |
| Traceback (most recent call last): |
| ContainerAlreadyExistsError: |
| |
| @inherits: :class:`StorageDriver.create_container` |
| """ |
| |
| if container_name in self._containers: |
| raise ContainerAlreadyExistsError( |
| container_name=container_name, value=None, driver=self |
| ) |
| |
| extra = {"object_count": 0} |
| container = Container(name=container_name, extra=extra, driver=self) |
| |
| self._containers[container_name] = { |
| "container": container, |
| "objects": {}, |
| "cdn_url": "http://www.test.com/container/%s" |
| % (container_name.replace(" ", "_")), |
| } |
| return container |
| |
| def delete_container(self, container): |
| """ |
| >>> driver = DummyStorageDriver('key', 'secret') |
| >>> container = Container(name = 'test container', |
| ... extra={'object_count': 0}, driver=driver) |
| >>> driver.delete_container(container=container) |
| ... #doctest: +IGNORE_EXCEPTION_DETAIL |
| Traceback (most recent call last): |
| ContainerDoesNotExistError: |
| >>> container = driver.create_container( |
| ... container_name='test container 1') |
| ... #doctest: +IGNORE_EXCEPTION_DETAIL |
| >>> len(driver._containers) |
| 1 |
| >>> driver.delete_container(container=container) |
| True |
| >>> len(driver._containers) |
| 0 |
| >>> container = driver.create_container( |
| ... container_name='test container 1') |
| ... #doctest: +IGNORE_EXCEPTION_DETAIL |
| >>> obj = container.upload_object_via_stream( |
| ... object_name='test object', iterator=DummyFileObject(5, 10), |
| ... extra={}) |
| >>> driver.delete_container(container=container) |
| ... #doctest: +IGNORE_EXCEPTION_DETAIL |
| Traceback (most recent call last): |
| ContainerIsNotEmptyError: |
| |
| @inherits: :class:`StorageDriver.delete_container` |
| """ |
| |
| container_name = container.name |
| if container_name not in self._containers: |
| raise ContainerDoesNotExistError( |
| container_name=container_name, value=None, driver=self |
| ) |
| |
| container = self._containers[container_name] |
| if len(container["objects"]) > 0: |
| raise ContainerIsNotEmptyError( |
| container_name=container_name, value=None, driver=self |
| ) |
| |
| del self._containers[container_name] |
| return True |
| |
| def download_object( |
| self, obj, destination_path, overwrite_existing=False, delete_on_failure=True |
| ): |
| kwargs_dict = { |
| "obj": obj, |
| "response": DummyFileObject(), |
| "destination_path": destination_path, |
| "overwrite_existing": overwrite_existing, |
| "delete_on_failure": delete_on_failure, |
| } |
| |
| return self._save_object(**kwargs_dict) |
| |
| def download_object_as_stream(self, obj, chunk_size=None): |
| """ |
| >>> driver = DummyStorageDriver('key', 'secret') |
| >>> container = driver.create_container( |
| ... container_name='test container 1') |
| ... #doctest: +IGNORE_EXCEPTION_DETAIL |
| >>> obj = container.upload_object_via_stream(object_name='test object', |
| ... iterator=DummyFileObject(5, 10), extra={}) |
| >>> stream = container.download_object_as_stream(obj) |
| >>> stream #doctest: +ELLIPSIS |
| <...closed...> |
| |
| @inherits: :class:`StorageDriver.download_object_as_stream` |
| """ |
| |
| return DummyFileObject() |
| |
| def upload_object( |
| self, |
| file_path, |
| container, |
| object_name, |
| extra=None, |
| verify_hash=True, |
| headers=None, |
| ): |
| """ |
| >>> driver = DummyStorageDriver('key', 'secret') |
| >>> container_name = 'test container 1' |
| >>> container = driver.create_container(container_name=container_name) |
| >>> container.upload_object(file_path='/tmp/inexistent.file', |
| ... object_name='test') #doctest: +IGNORE_EXCEPTION_DETAIL |
| Traceback (most recent call last): |
| LibcloudError: |
| >>> file_path = path = os.path.abspath(__file__) |
| >>> file_size = os.path.getsize(file_path) |
| >>> obj = container.upload_object(file_path=file_path, |
| ... object_name='test') |
| >>> obj #doctest: +ELLIPSIS |
| <Object: name=test, size=...> |
| >>> obj.size == file_size |
| True |
| |
| @inherits: :class:`StorageDriver.upload_object` |
| """ |
| |
| if not os.path.exists(file_path): |
| raise LibcloudError( |
| value="File %s does not exist" % (file_path), driver=self |
| ) |
| |
| size = os.path.getsize(file_path) |
| return self._add_object( |
| container=container, object_name=object_name, size=size, extra=extra |
| ) |
| |
| def upload_object_via_stream( |
| self, iterator, container, object_name, extra=None, headers=None |
| ): |
| """ |
| >>> driver = DummyStorageDriver('key', 'secret') |
| >>> container = driver.create_container( |
| ... container_name='test container 1') |
| ... #doctest: +IGNORE_EXCEPTION_DETAIL |
| >>> obj = container.upload_object_via_stream( |
| ... object_name='test object', iterator=DummyFileObject(5, 10), |
| ... extra={}) |
| >>> obj #doctest: +ELLIPSIS |
| <Object: name=test object, size=50, ...> |
| |
| @inherits: :class:`StorageDriver.upload_object_via_stream` |
| """ |
| |
| size = len(iterator) |
| return self._add_object( |
| container=container, object_name=object_name, size=size, extra=extra |
| ) |
| |
| def delete_object(self, obj): |
| """ |
| >>> driver = DummyStorageDriver('key', 'secret') |
| >>> container = driver.create_container( |
| ... container_name='test container 1') |
| ... #doctest: +IGNORE_EXCEPTION_DETAIL |
| >>> obj = container.upload_object_via_stream(object_name='test object', |
| ... iterator=DummyFileObject(5, 10), extra={}) |
| >>> obj #doctest: +ELLIPSIS |
| <Object: name=test object, size=50, ...> |
| >>> container.delete_object(obj=obj) |
| True |
| >>> obj = Object(name='test object 2', |
| ... size=1000, hash=None, extra=None, |
| ... meta_data=None, container=container,driver=None) |
| >>> container.delete_object(obj=obj) #doctest: +IGNORE_EXCEPTION_DETAIL |
| Traceback (most recent call last): |
| ObjectDoesNotExistError: |
| |
| @inherits: :class:`StorageDriver.delete_object` |
| """ |
| |
| container_name = obj.container.name |
| object_name = obj.name |
| obj = self.get_object(container_name=container_name, object_name=object_name) |
| |
| del self._containers[container_name]["objects"][object_name] |
| return True |
| |
| def _add_object(self, container, object_name, size, extra=None): |
| container = self.get_container(container.name) |
| |
| extra = extra or {} |
| meta_data = extra.get("meta_data", {}) |
| meta_data.update( |
| { |
| "cdn_url": "http://www.test.com/object/%s" |
| % (object_name.replace(" ", "_")) |
| } |
| ) |
| obj = Object( |
| name=object_name, |
| size=size, |
| extra=extra, |
| hash=None, |
| meta_data=meta_data, |
| container=container, |
| driver=self, |
| ) |
| |
| self._containers[container.name]["objects"][object_name] = obj |
| return obj |
| |
| |
| if __name__ == "__main__": |
| import doctest |
| |
| doctest.testmod() |