| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import os |
| import sys |
| import copy |
| import hmac |
| import math |
| import hashlib |
| import os.path # pylint: disable-msg=W0404 |
| from io import BytesIO |
| from hashlib import sha1 |
| from unittest import mock |
| from unittest.mock import Mock, PropertyMock |
| |
| import libcloud.utils.files |
| from libcloud.test import MockHttp # pylint: disable-msg=E0611 |
| from libcloud.test import unittest, make_response, generate_random_data |
| from libcloud.utils.py3 import StringIO, b, httplib, urlquote |
| from libcloud.utils.files import exhaust_iterator |
| from libcloud.common.types import MalformedResponseError |
| from libcloud.storage.base import CHUNK_SIZE, Object, Container |
| from libcloud.storage.types import ( |
| ObjectDoesNotExistError, |
| ObjectHashMismatchError, |
| ContainerIsNotEmptyError, |
| InvalidContainerNameError, |
| ContainerDoesNotExistError, |
| ContainerAlreadyExistsError, |
| ) |
| from libcloud.test.storage.base import BaseRangeDownloadMockHttp |
| from libcloud.test.file_fixtures import StorageFileFixtures # pylint: disable-msg=E0611 |
| from libcloud.storage.drivers.cloudfiles import CloudFilesStorageDriver |
| |
| |
| class CloudFilesTests(unittest.TestCase): |
| driver_klass = CloudFilesStorageDriver |
| driver_args = ("dummy", "dummy") |
| driver_kwargs = {} |
| region = "ord" |
| |
| def setUp(self): |
| self.driver_klass.connectionCls.conn_class = CloudFilesMockHttp |
| CloudFilesMockHttp.type = None |
| |
| driver_kwargs = self.driver_kwargs.copy() |
| driver_kwargs["region"] = self.region |
| self.driver = self.driver_klass(*self.driver_args, **driver_kwargs) |
| |
| # normally authentication happens lazily, but we force it here |
| self.driver.connection._populate_hosts_and_request_paths() |
| self._remove_test_file() |
| |
| def tearDown(self): |
| self._remove_test_file() |
| |
| def test_invalid_ex_force_service_region(self): |
| driver = CloudFilesStorageDriver("driver", "dummy", ex_force_service_region="invalid") |
| |
| try: |
| driver.list_containers() |
| except Exception as e: |
| self.assertEqual(e.value, "Could not find specified endpoint") |
| else: |
| self.fail("Exception was not thrown") |
| |
| def test_ex_force_service_region(self): |
| driver = CloudFilesStorageDriver("driver", "dummy", ex_force_service_region="ORD") |
| driver.list_containers() |
| |
| def test_force_auth_token_kwargs(self): |
| base_url = "https://cdn2.clouddrive.com/v1/MossoCloudFS" |
| kwargs = { |
| "ex_force_auth_token": "some-auth-token", |
| "ex_force_base_url": base_url, |
| } |
| driver = CloudFilesStorageDriver("driver", "dummy", **kwargs) |
| driver.list_containers() |
| |
| self.assertEqual(kwargs["ex_force_auth_token"], driver.connection.auth_token) |
| self.assertEqual("cdn2.clouddrive.com", driver.connection.host) |
| self.assertEqual("/v1/MossoCloudFS", driver.connection.request_path) |
| |
| def test_force_auth_url_kwargs(self): |
| kwargs = { |
| "ex_force_auth_version": "2.0", |
| "ex_force_auth_url": "https://identity.api.rackspace.com", |
| } |
| driver = CloudFilesStorageDriver("driver", "dummy", **kwargs) |
| |
| self.assertEqual(kwargs["ex_force_auth_url"], driver.connection._ex_force_auth_url) |
| self.assertEqual(kwargs["ex_force_auth_version"], driver.connection._auth_version) |
| |
| def test_invalid_json_throws_exception(self): |
| CloudFilesMockHttp.type = "MALFORMED_JSON" |
| try: |
| self.driver.list_containers() |
| except MalformedResponseError: |
| pass |
| else: |
| self.fail("Exception was not thrown") |
| |
| def test_service_catalog(self): |
| url = "https://storage4.%s1.clouddrive.com/v1/MossoCloudFS" % (self.region) |
| self.assertEqual(url, self.driver.connection.get_endpoint()) |
| |
| self.driver.connection.cdn_request = True |
| self.assertEqual( |
| "https://cdn.clouddrive.com/v1/MossoCloudFS", |
| self.driver.connection.get_endpoint(), |
| ) |
| self.driver.connection.cdn_request = False |
| |
| def test_list_containers(self): |
| CloudFilesMockHttp.type = "EMPTY" |
| containers = self.driver.list_containers() |
| self.assertEqual(len(containers), 0) |
| |
| CloudFilesMockHttp.type = None |
| containers = self.driver.list_containers() |
| self.assertEqual(len(containers), 3) |
| |
| container = [c for c in containers if c.name == "container2"][0] |
| self.assertEqual(container.extra["object_count"], 120) |
| self.assertEqual(container.extra["size"], 340084450) |
| |
| def test_list_container_objects(self): |
| CloudFilesMockHttp.type = "EMPTY" |
| container = Container(name="test_container", extra={}, driver=self.driver) |
| objects = self.driver.list_container_objects(container=container) |
| self.assertEqual(len(objects), 0) |
| |
| CloudFilesMockHttp.type = None |
| objects = self.driver.list_container_objects(container=container) |
| self.assertEqual(len(objects), 4) |
| |
| obj = [o for o in objects if o.name == "foo test 1"][0] |
| self.assertEqual(obj.hash, "16265549b5bda64ecdaa5156de4c97cc") |
| self.assertEqual(obj.size, 1160520) |
| self.assertEqual(obj.container.name, "test_container") |
| |
| def test_list_container_object_name_encoding(self): |
| CloudFilesMockHttp.type = "EMPTY" |
| container = Container(name="test container 1", extra={}, driver=self.driver) |
| objects = self.driver.list_container_objects(container=container) |
| self.assertEqual(len(objects), 0) |
| |
| def test_list_container_objects_with_prefix(self): |
| CloudFilesMockHttp.type = "EMPTY" |
| container = Container(name="test_container", extra={}, driver=self.driver) |
| objects = self.driver.list_container_objects(container=container, prefix="test_prefix1") |
| self.assertEqual(len(objects), 0) |
| |
| CloudFilesMockHttp.type = None |
| objects = self.driver.list_container_objects(container=container, prefix="test_prefix2") |
| self.assertEqual(len(objects), 4) |
| |
| obj = [o for o in objects if o.name == "foo test 1"][0] |
| self.assertEqual(obj.hash, "16265549b5bda64ecdaa5156de4c97cc") |
| self.assertEqual(obj.size, 1160520) |
| self.assertEqual(obj.container.name, "test_container") |
| |
| def test_list_container_objects_iterator(self): |
| CloudFilesMockHttp.type = "ITERATOR" |
| container = Container(name="test_container", extra={}, driver=self.driver) |
| objects = self.driver.list_container_objects(container=container) |
| self.assertEqual(len(objects), 5) |
| |
| obj = [o for o in objects if o.name == "foo-test-1"][0] |
| self.assertEqual(obj.hash, "16265549b5bda64ecdaa5156de4c97cc") |
| self.assertEqual(obj.size, 1160520) |
| self.assertEqual(obj.container.name, "test_container") |
| |
| def test_get_container(self): |
| container = self.driver.get_container(container_name="test_container") |
| self.assertEqual(container.name, "test_container") |
| self.assertEqual(container.extra["object_count"], 800) |
| self.assertEqual(container.extra["size"], 1234568) |
| |
| def test_get_container_not_found(self): |
| try: |
| self.driver.get_container(container_name="not_found") |
| except ContainerDoesNotExistError: |
| pass |
| else: |
| self.fail("Exception was not thrown") |
| |
| def test_get_object_success(self): |
| obj = self.driver.get_object(container_name="test_container", object_name="test_object") |
| self.assertEqual(obj.container.name, "test_container") |
| self.assertEqual(obj.size, 555) |
| self.assertEqual(obj.hash, "6b21c4a111ac178feacf9ec9d0c71f17") |
| self.assertEqual(obj.extra["content_type"], "application/zip") |
| self.assertEqual(obj.extra["last_modified"], "Tue, 25 Jan 2011 22:01:49 GMT") |
| self.assertEqual(obj.meta_data["foo-bar"], "test 1") |
| self.assertEqual(obj.meta_data["bar-foo"], "test 2") |
| |
| def test_get_object_object_name_encoding(self): |
| obj = self.driver.get_object(container_name="test_container", object_name="~/test_object/") |
| self.assertEqual(obj.name, "~/test_object/") |
| |
| def test_get_object_not_found(self): |
| try: |
| self.driver.get_object(container_name="test_container", object_name="not_found") |
| except ObjectDoesNotExistError: |
| pass |
| else: |
| self.fail("Exception was not thrown") |
| |
| def test_create_container_success(self): |
| container = self.driver.create_container(container_name="test_create_container") |
| self.assertTrue(isinstance(container, Container)) |
| self.assertEqual(container.name, "test_create_container") |
| self.assertEqual(container.extra["object_count"], 0) |
| |
| def test_create_container_already_exists(self): |
| CloudFilesMockHttp.type = "ALREADY_EXISTS" |
| |
| try: |
| self.driver.create_container(container_name="test_create_container") |
| except ContainerAlreadyExistsError: |
| pass |
| else: |
| self.fail("Container already exists but an exception was not thrown") |
| |
| def test_create_container_invalid_name_too_long(self): |
| name = "".join(["x" for x in range(0, 257)]) |
| try: |
| self.driver.create_container(container_name=name) |
| except InvalidContainerNameError: |
| pass |
| else: |
| self.fail( |
| "Invalid name was provided (name is too long)" ", but exception was not thrown" |
| ) |
| |
| def test_create_container_invalid_name_slashes_in_name(self): |
| try: |
| self.driver.create_container(container_name="test/slashes/") |
| except InvalidContainerNameError: |
| pass |
| else: |
| self.fail( |
| "Invalid name was provided (name contains slashes)" ", but exception was not thrown" |
| ) |
| |
| def test_delete_container_success(self): |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| result = self.driver.delete_container(container=container) |
| self.assertTrue(result) |
| |
| def test_delete_container_not_found(self): |
| CloudFilesMockHttp.type = "NOT_FOUND" |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| try: |
| self.driver.delete_container(container=container) |
| except ContainerDoesNotExistError: |
| pass |
| else: |
| self.fail("Container does not exist but an exception was not thrown") |
| |
| def test_delete_container_not_empty(self): |
| CloudFilesMockHttp.type = "NOT_EMPTY" |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| try: |
| self.driver.delete_container(container=container) |
| except ContainerIsNotEmptyError: |
| pass |
| else: |
| self.fail("Container is not empty but an exception was not thrown") |
| |
| def test_download_object_success(self): |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| obj = Object( |
| name="foo_bar_object", |
| size=1000, |
| hash=None, |
| extra={}, |
| container=container, |
| meta_data=None, |
| driver=CloudFilesStorageDriver, |
| ) |
| destination_path = os.path.abspath(__file__) + ".temp" |
| result = self.driver.download_object( |
| obj=obj, |
| destination_path=destination_path, |
| overwrite_existing=False, |
| delete_on_failure=True, |
| ) |
| self.assertTrue(result) |
| |
| def test_download_object_invalid_file_size(self): |
| CloudFilesMockHttp.type = "INVALID_SIZE" |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| obj = Object( |
| name="foo_bar_object", |
| size=1000, |
| hash=None, |
| extra={}, |
| container=container, |
| meta_data=None, |
| driver=CloudFilesStorageDriver, |
| ) |
| destination_path = os.path.abspath(__file__) + ".temp" |
| result = self.driver.download_object( |
| obj=obj, |
| destination_path=destination_path, |
| overwrite_existing=False, |
| delete_on_failure=True, |
| ) |
| self.assertFalse(result) |
| |
| def test_download_object_success_not_found(self): |
| CloudFilesMockHttp.type = "NOT_FOUND" |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| |
| obj = Object( |
| name="foo_bar_object", |
| size=1000, |
| hash=None, |
| extra={}, |
| container=container, |
| meta_data=None, |
| driver=CloudFilesStorageDriver, |
| ) |
| destination_path = os.path.abspath(__file__) + ".temp" |
| try: |
| self.driver.download_object( |
| obj=obj, |
| destination_path=destination_path, |
| overwrite_existing=False, |
| delete_on_failure=True, |
| ) |
| except ObjectDoesNotExistError: |
| pass |
| else: |
| self.fail("Object does not exist but an exception was not thrown") |
| |
| def test_download_object_range_success(self): |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| obj = Object( |
| name="foo_bar_object_range", |
| size=10, |
| hash=None, |
| extra={}, |
| container=container, |
| meta_data=None, |
| driver=CloudFilesStorageDriver, |
| ) |
| destination_path = os.path.abspath(__file__) + ".temp" |
| result = self.driver.download_object_range( |
| obj=obj, |
| destination_path=destination_path, |
| start_bytes=5, |
| end_bytes=7, |
| overwrite_existing=False, |
| delete_on_failure=True, |
| ) |
| self.assertTrue(result) |
| |
| with open(destination_path) as fp: |
| content = fp.read() |
| |
| self.assertEqual(content, "56") |
| |
| def test_download_object_as_stream(self): |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| obj = Object( |
| name="foo_bar_object", |
| size=1000, |
| hash=None, |
| extra={}, |
| container=container, |
| meta_data=None, |
| driver=CloudFilesStorageDriver, |
| ) |
| |
| stream = self.driver.download_object_as_stream(obj=obj, chunk_size=None) |
| self.assertTrue(hasattr(stream, "__iter__")) |
| |
| def test_download_object_as_stream_data_is_not_buffered_in_memory(self): |
| # Test case which verifies that response.response attribute is not accessed |
| # and as such, whole body response is not buffered into RAM |
| |
| # If content is consumed and response.content attribute accessed exception |
| # will be thrown and test will fail |
| mock_response = Mock(name="mock response") |
| mock_response.headers = {} |
| mock_response.status = 200 |
| msg1 = '"response" attribute was accessed but it shouldn\'t have been' |
| msg2 = '"content" attribute was accessed but it shouldn\'t have been' |
| type(mock_response).response = PropertyMock( |
| name="mock response attribute", side_effect=Exception(msg1) |
| ) |
| type(mock_response).content = PropertyMock( |
| name="mock content attribute", side_effect=Exception(msg2) |
| ) |
| mock_response.iter_content.return_value = StringIO("a" * 1000) |
| |
| self.driver.connection.request = Mock() |
| self.driver.connection.request.return_value = mock_response |
| |
| container = Container(name="foo_bar_container", extra={}, driver=self.driver) |
| obj = Object( |
| name="foo_bar_object_NO_BUFFER", |
| size=1000, |
| hash=None, |
| extra={}, |
| container=container, |
| meta_data=None, |
| driver=self.driver, |
| ) |
| result = self.driver.download_object_as_stream(obj=obj) |
| result = exhaust_iterator(result) |
| result = result.decode("utf-8") |
| |
| self.assertEqual(result, "a" * 1000) |
| |
| def test_download_object_range_as_stream_success(self): |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| obj = Object( |
| name="foo_bar_object_range", |
| size=2, |
| hash=None, |
| extra={}, |
| container=container, |
| meta_data=None, |
| driver=CloudFilesStorageDriver, |
| ) |
| |
| stream = self.driver.download_object_range_as_stream( |
| start_bytes=5, end_bytes=7, obj=obj, chunk_size=None |
| ) |
| self.assertTrue(hasattr(stream, "__iter__")) |
| consumed_stream = "".join(chunk.decode("utf-8") for chunk in stream) |
| self.assertEqual(consumed_stream, "56") |
| self.assertEqual(len(consumed_stream), obj.size) |
| |
| def test_upload_object_success(self): |
| def upload_file( |
| self, |
| object_name=None, |
| content_type=None, |
| request_path=None, |
| request_method=None, |
| headers=None, |
| file_path=None, |
| stream=None, |
| ): |
| return { |
| "response": make_response( |
| 201, headers={"etag": "0cc175b9c0f1b6a831c399e269772661"} |
| ), |
| "bytes_transferred": 1000, |
| "data_hash": "0cc175b9c0f1b6a831c399e269772661", |
| } |
| |
| old_func = CloudFilesStorageDriver._upload_object |
| CloudFilesStorageDriver._upload_object = upload_file |
| file_path = os.path.abspath(__file__) |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| object_name = "foo_test_upload" |
| extra = {"meta_data": {"some-value": "foobar"}} |
| obj = self.driver.upload_object( |
| file_path=file_path, |
| container=container, |
| extra=extra, |
| object_name=object_name, |
| ) |
| self.assertEqual(obj.name, "foo_test_upload") |
| self.assertEqual(obj.size, 1000) |
| self.assertTrue("some-value" in obj.meta_data) |
| CloudFilesStorageDriver._upload_object = old_func |
| |
| def test_upload_object_zero_size_object(self): |
| def upload_file( |
| self, |
| object_name=None, |
| content_type=None, |
| request_path=None, |
| request_method=None, |
| headers=None, |
| file_path=None, |
| stream=None, |
| ): |
| return { |
| "response": make_response( |
| 201, headers={"etag": "0cc175b9c0f1b6a831c399e269772661"} |
| ), |
| "bytes_transferred": 0, |
| "data_hash": "0cc175b9c0f1b6a831c399e269772661", |
| } |
| |
| old_func = CloudFilesStorageDriver._upload_object |
| CloudFilesStorageDriver._upload_object = upload_file |
| |
| old_request = self.driver.connection.request |
| |
| file_path = os.path.join(os.path.dirname(__file__), "__init__.py") |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| object_name = "empty" |
| extra = {} |
| |
| def func(*args, **kwargs): |
| self.assertEqual(kwargs["headers"]["Content-Length"], 0) |
| func.called = True |
| return old_request(*args, **kwargs) |
| |
| self.driver.connection.request = func |
| func.called = False |
| obj = self.driver.upload_object( |
| file_path=file_path, |
| container=container, |
| extra=extra, |
| object_name=object_name, |
| ) |
| self.assertEqual(obj.name, "empty") |
| self.assertEqual(obj.size, 0) |
| CloudFilesStorageDriver._upload_object = old_func |
| self.driver.connection.request = old_request |
| |
| def test_upload_object_invalid_hash(self): |
| CloudFilesMockHttp.type = "INVALID_HASH" |
| |
| def upload_file( |
| self, |
| object_name=None, |
| content_type=None, |
| request_path=None, |
| request_method=None, |
| headers=None, |
| file_path=None, |
| stream=None, |
| ): |
| return { |
| "response": make_response( |
| 201, headers={"etag": "0cc175b9c0f1b6a831c399e269772661"} |
| ), |
| "bytes_transferred": 1000, |
| "data_hash": "blah blah", |
| } |
| |
| old_func = CloudFilesStorageDriver._upload_object |
| CloudFilesStorageDriver._upload_object = upload_file |
| file_path = os.path.abspath(__file__) |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| object_name = "foo_test_upload" |
| try: |
| self.driver.upload_object( |
| file_path=file_path, |
| container=container, |
| object_name=object_name, |
| verify_hash=True, |
| ) |
| except ObjectHashMismatchError: |
| pass |
| else: |
| self.fail("Invalid hash was returned but an exception was not thrown") |
| finally: |
| CloudFilesStorageDriver._upload_object = old_func |
| |
| def test_upload_object_no_content_type(self): |
| def no_content_type(name): |
| return None, None |
| |
| old_func = libcloud.utils.files.guess_file_mime_type |
| libcloud.utils.files.guess_file_mime_type = no_content_type |
| file_path = os.path.abspath(__file__) |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| object_name = "foo_test_upload" |
| |
| obj = self.driver.upload_object( |
| file_path=file_path, |
| verify_hash=False, |
| container=container, |
| object_name=object_name, |
| ) |
| |
| self.assertEqual(obj.name, object_name) |
| libcloud.utils.files.guess_file_mime_type = old_func |
| |
| def test_upload_object_inexistent_file(self): |
| def dummy_content_type(name): |
| return "application/zip", None |
| |
| old_func = libcloud.utils.files.guess_file_mime_type |
| libcloud.utils.files.guess_file_mime_type = dummy_content_type |
| |
| file_path = os.path.abspath(__file__ + ".inexistent") |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| object_name = "foo_test_upload" |
| try: |
| self.driver.upload_object( |
| file_path=file_path, container=container, object_name=object_name |
| ) |
| except OSError: |
| pass |
| else: |
| self.fail("Inexistent but an exception was not thrown") |
| finally: |
| libcloud.utils.files.guess_file_mime_type = old_func |
| |
| def test_upload_object_via_stream(self): |
| def dummy_content_type(name): |
| return "application/zip", None |
| |
| old_func = libcloud.utils.files.guess_file_mime_type |
| libcloud.utils.files.guess_file_mime_type = dummy_content_type |
| |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| object_name = "foo_test_stream_data" |
| iterator = BytesIO(b("235")) |
| try: |
| self.driver.upload_object_via_stream( |
| container=container, object_name=object_name, iterator=iterator |
| ) |
| finally: |
| libcloud.utils.files.guess_file_mime_type = old_func |
| |
| def test_upload_object_via_stream_stream_seek_at_end(self): |
| def dummy_content_type(name): |
| return "application/zip", None |
| |
| old_func = libcloud.utils.files.guess_file_mime_type |
| libcloud.utils.files.guess_file_mime_type = dummy_content_type |
| |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| object_name = "foo_test_stream_data_seek" |
| iterator = BytesIO(b("123456789")) |
| iterator.seek(10) |
| |
| self.assertEqual(iterator.tell(), 10) |
| |
| try: |
| self.driver.upload_object_via_stream( |
| container=container, object_name=object_name, iterator=iterator |
| ) |
| finally: |
| libcloud.utils.files.guess_file_mime_type = old_func |
| |
| def test_delete_object_success(self): |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| obj = Object( |
| name="foo_bar_object", |
| size=1000, |
| hash=None, |
| extra={}, |
| container=container, |
| meta_data=None, |
| driver=CloudFilesStorageDriver, |
| ) |
| status = self.driver.delete_object(obj=obj) |
| self.assertTrue(status) |
| |
| def test_delete_object_not_found(self): |
| CloudFilesMockHttp.type = "NOT_FOUND" |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| obj = Object( |
| name="foo_bar_object", |
| size=1000, |
| hash=None, |
| extra={}, |
| container=container, |
| meta_data=None, |
| driver=CloudFilesStorageDriver, |
| ) |
| try: |
| self.driver.delete_object(obj=obj) |
| except ObjectDoesNotExistError: |
| pass |
| else: |
| self.fail("Object does not exist but an exception was not thrown") |
| |
| def test_ex_get_meta_data(self): |
| meta_data = self.driver.ex_get_meta_data() |
| self.assertTrue(isinstance(meta_data, dict)) |
| self.assertTrue("object_count" in meta_data) |
| self.assertTrue("container_count" in meta_data) |
| self.assertTrue("bytes_used" in meta_data) |
| self.assertTrue("temp_url_key" in meta_data) |
| |
| def test_ex_purge_object_from_cdn(self): |
| CloudFilesMockHttp.type = "PURGE_SUCCESS" |
| container = Container(name="foo_bar_container", extra={}, driver=self.driver) |
| obj = Object( |
| name="object", |
| size=1000, |
| hash=None, |
| extra={}, |
| container=container, |
| meta_data=None, |
| driver=self, |
| ) |
| |
| self.assertTrue(self.driver.ex_purge_object_from_cdn(obj=obj)) |
| |
| def test_ex_purge_object_from_cdn_with_email(self): |
| CloudFilesMockHttp.type = "PURGE_SUCCESS_EMAIL" |
| container = Container(name="foo_bar_container", extra={}, driver=self.driver) |
| obj = Object( |
| name="object", |
| size=1000, |
| hash=None, |
| extra={}, |
| container=container, |
| meta_data=None, |
| driver=self, |
| ) |
| self.assertTrue(self.driver.ex_purge_object_from_cdn(obj=obj, email="test@test.com")) |
| |
| @mock.patch("os.path.getsize") |
| def test_ex_multipart_upload_object_for_small_files(self, getsize_mock): |
| getsize_mock.return_value = 0 |
| |
| old_func = CloudFilesStorageDriver.upload_object |
| mocked_upload_object = mock.Mock(return_value="test") |
| CloudFilesStorageDriver.upload_object = mocked_upload_object |
| |
| file_path = os.path.abspath(__file__) |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| object_name = "foo_test_upload" |
| obj = self.driver.ex_multipart_upload_object( |
| file_path=file_path, container=container, object_name=object_name |
| ) |
| CloudFilesStorageDriver.upload_object = old_func |
| |
| self.assertTrue(mocked_upload_object.called) |
| self.assertEqual(obj, "test") |
| |
| def test_ex_multipart_upload_object_success(self): |
| _upload_object_part = CloudFilesStorageDriver._upload_object_part |
| _upload_object_manifest = CloudFilesStorageDriver._upload_object_manifest |
| |
| mocked__upload_object_part = mock.Mock(return_value="test_part") |
| mocked__upload_object_manifest = mock.Mock(return_value="test_manifest") |
| |
| CloudFilesStorageDriver._upload_object_part = mocked__upload_object_part |
| CloudFilesStorageDriver._upload_object_manifest = mocked__upload_object_manifest |
| |
| parts = 5 |
| file_path = os.path.abspath(__file__) |
| chunk_size = int(math.ceil(float(os.path.getsize(file_path)) / parts)) |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| object_name = "foo_test_upload" |
| self.driver.ex_multipart_upload_object( |
| file_path=file_path, |
| container=container, |
| object_name=object_name, |
| chunk_size=chunk_size, |
| ) |
| |
| CloudFilesStorageDriver._upload_object_part = _upload_object_part |
| CloudFilesStorageDriver._upload_object_manifest = _upload_object_manifest |
| |
| self.assertEqual(mocked__upload_object_part.call_count, parts) |
| self.assertTrue(mocked__upload_object_manifest.call_count, 1) |
| |
| def test__upload_object_part(self): |
| _put_object = CloudFilesStorageDriver._put_object |
| mocked__put_object = mock.Mock(return_value="test") |
| CloudFilesStorageDriver._put_object = mocked__put_object |
| |
| part_number = 7 |
| object_name = "test_object" |
| expected_name = object_name + "/%08d" % part_number |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| |
| self.driver._upload_object_part(container, object_name, part_number, None) |
| |
| CloudFilesStorageDriver._put_object = _put_object |
| |
| func_kwargs = tuple(mocked__put_object.call_args)[1] |
| self.assertEqual(func_kwargs["object_name"], expected_name) |
| self.assertEqual(func_kwargs["container"], container) |
| |
| def test_upload_object_via_stream_with_cors_headers(self): |
| """ |
| Test we can add some ``Cross-origin resource sharing`` headers |
| to the request about to be sent. |
| """ |
| cors_headers = { |
| "Access-Control-Allow-Origin": "http://mozilla.com", |
| "Origin": "http://storage.clouddrive.com", |
| } |
| expected_headers = { |
| # Automatically added headers |
| "Content-Type": "application/octet-stream" |
| } |
| expected_headers.update(cors_headers) |
| |
| def intercept_request(request_path, method=None, data=None, headers=None, raw=True): |
| |
| # What we're actually testing |
| self.assertDictEqual(expected_headers, headers) |
| |
| raise NotImplementedError("oops") |
| |
| self.driver.connection.request = intercept_request |
| |
| container = Container(name="CORS", extra={}, driver=self.driver) |
| |
| try: |
| self.driver.upload_object_via_stream( |
| iterator=iter(b"blob data like an image or video"), |
| container=container, |
| object_name="test_object", |
| headers=cors_headers, |
| ) |
| except NotImplementedError: |
| # Don't care about the response we'd have to mock anyway |
| # as long as we intercepted the request and checked its headers |
| pass |
| else: |
| self.fail( |
| "Expected NotImplementedError to be thrown to " |
| "verify we actually checked the expected headers" |
| ) |
| |
| def test_upload_object_via_stream_python3_bytes_error(self): |
| container = Container(name="py3", extra={}, driver=self.driver) |
| bytes_blob = b"blob data like an image or video" |
| |
| # This is mostly to check we didn't discover other errors along the way |
| mocked_response = container.upload_object_via_stream( |
| iterator=iter(bytes_blob), |
| object_name="img_or_vid", |
| ) |
| self.assertEqual(len(bytes_blob), mocked_response.size) |
| |
| @unittest.skip("Skipping as chunking is disabled in 2.0rc1") |
| def test_upload_object_via_stream_chunked_encoding(self): |
| |
| # Create enough bytes it should get split into two chunks |
| bytes_blob = "".join(["\0" for _ in range(CHUNK_SIZE + 1)]) |
| hex_chunk_size = ("%X" % CHUNK_SIZE).encode("utf8") |
| expected = [ |
| # Chunk 1 |
| hex_chunk_size + b"\r\n", |
| bytes(bytes_blob[:CHUNK_SIZE].encode("utf8")), |
| b"\r\n", |
| # Chunk 2 |
| b"1\r\n", |
| bytes(bytes_blob[CHUNK_SIZE:].encode("utf8")), |
| b"\r\n", |
| # If chunked, also send a final message |
| b"0\r\n\r\n", |
| ] |
| logged_data = [] |
| |
| class InterceptResponse(MockHttp): |
| def __init__(self, connection, response=None): |
| super().__init__(connection=connection, response=response) |
| old_send = self.connection.connection.send |
| |
| def intercept_send(data): |
| old_send(data) |
| logged_data.append(data) |
| |
| self.connection.connection.send = intercept_send |
| |
| def _v1_MossoCloudFS_py3_img_or_vid2(self, method, url, body, headers): |
| headers = {"etag": "d79fb00c27b50494a463e680d459c90c"} |
| headers.update(self.base_headers) |
| _201 = httplib.CREATED |
| return _201, "", headers, httplib.responses[_201] |
| |
| self.driver_klass.connectionCls.rawResponseCls = InterceptResponse |
| |
| container = Container(name="py3", extra={}, driver=self.driver) |
| container.upload_object_via_stream( |
| iterator=iter(bytes_blob), |
| object_name="img_or_vid2", |
| ) |
| self.assertListEqual(expected, logged_data) |
| |
| def test__upload_object_manifest(self): |
| hash_function = self.driver._get_hash_function() |
| hash_function.update(b("")) |
| data_hash = hash_function.hexdigest() |
| |
| fake_response = type("CloudFilesResponse", (), {"headers": {"etag": data_hash}}) |
| |
| _request = self.driver.connection.request |
| mocked_request = mock.Mock(return_value=fake_response) |
| self.driver.connection.request = mocked_request |
| |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| object_name = "test_object" |
| |
| self.driver._upload_object_manifest(container, object_name) |
| |
| func_args, func_kwargs = tuple(mocked_request.call_args) |
| |
| self.driver.connection.request = _request |
| |
| self.assertEqual(func_args[0], "/" + container.name + "/" + object_name) |
| self.assertEqual( |
| func_kwargs["headers"]["X-Object-Manifest"], |
| container.name + "/" + object_name + "/", |
| ) |
| self.assertEqual(func_kwargs["method"], "PUT") |
| |
| def test__upload_object_manifest_wrong_hash(self): |
| fake_response = type("CloudFilesResponse", (), {"headers": {"etag": "0000000"}}) |
| |
| _request = self.driver.connection.request |
| mocked_request = mock.Mock(return_value=fake_response) |
| self.driver.connection.request = mocked_request |
| |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| object_name = "test_object" |
| |
| try: |
| self.driver._upload_object_manifest(container, object_name) |
| except ObjectHashMismatchError: |
| pass |
| else: |
| self.fail("Exception was not thrown") |
| finally: |
| self.driver.connection.request = _request |
| |
| def test_create_container_put_object_name_encoding(self): |
| def upload_file( |
| self, |
| object_name=None, |
| content_type=None, |
| request_path=None, |
| request_method=None, |
| headers=None, |
| file_path=None, |
| stream=None, |
| ): |
| return { |
| "response": make_response( |
| 201, headers={"etag": "0cc175b9c0f1b6a831c399e269772661"} |
| ), |
| "bytes_transferred": 1000, |
| "data_hash": "0cc175b9c0f1b6a831c399e269772661", |
| } |
| |
| old_func = CloudFilesStorageDriver._upload_object |
| CloudFilesStorageDriver._upload_object = upload_file |
| |
| container_name = "speci@l_name" |
| object_name = "m@obj€ct" |
| file_path = os.path.abspath(__file__) |
| |
| container = self.driver.create_container(container_name=container_name) |
| self.assertEqual(container.name, container_name) |
| |
| obj = self.driver.upload_object( |
| file_path=file_path, container=container, object_name=object_name |
| ) |
| self.assertEqual(obj.name, object_name) |
| CloudFilesStorageDriver._upload_object = old_func |
| |
| def test_ex_enable_static_website(self): |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| result = self.driver.ex_enable_static_website(container=container, index_file="index.html") |
| self.assertTrue(result) |
| |
| def test_ex_set_error_page(self): |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| result = self.driver.ex_set_error_page(container=container, file_name="error.html") |
| self.assertTrue(result) |
| |
| def test_ex_set_account_metadata_temp_url_key(self): |
| result = self.driver.ex_set_account_metadata_temp_url_key("a key") |
| self.assertTrue(result) |
| |
| @mock.patch("libcloud.storage.drivers.cloudfiles.time") |
| def test_ex_get_object_temp_url(self, time): |
| time.return_value = 0 |
| self.driver.ex_get_meta_data = mock.Mock() |
| self.driver.ex_get_meta_data.return_value = { |
| "container_count": 1, |
| "object_count": 1, |
| "bytes_used": 1, |
| "temp_url_key": "foo", |
| } |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| obj = Object( |
| name="foo_bar_object", |
| size=1000, |
| hash=None, |
| extra={}, |
| container=container, |
| meta_data=None, |
| driver=self, |
| ) |
| hmac_body = "{}\n{}\n{}".format( |
| "GET", |
| 60, |
| "/v1/MossoCloudFS/foo_bar_container/foo_bar_object", |
| ) |
| sig = hmac.new(b("foo"), b(hmac_body), sha1).hexdigest() |
| ret = self.driver.ex_get_object_temp_url(obj, "GET") |
| temp_url = ( |
| "https://storage4.%s1.clouddrive.com/v1/MossoCloudFS/" |
| "foo_bar_container/foo_bar_object?temp_url_expires=60&temp_url_sig=%s" |
| % (self.region, sig) |
| ) |
| |
| self.assertEqual("".join(sorted(ret)), "".join(sorted(temp_url))) |
| |
| def test_ex_get_object_temp_url_no_key_raises_key_error(self): |
| self.driver.ex_get_meta_data = mock.Mock() |
| self.driver.ex_get_meta_data.return_value = { |
| "container_count": 1, |
| "object_count": 1, |
| "bytes_used": 1, |
| "temp_url_key": None, |
| } |
| container = Container(name="foo_bar_container", extra={}, driver=self) |
| obj = Object( |
| name="foo_bar_object", |
| size=1000, |
| hash=None, |
| extra={}, |
| container=container, |
| meta_data=None, |
| driver=self, |
| ) |
| self.assertRaises(KeyError, self.driver.ex_get_object_temp_url, obj, "GET") |
| |
| def _remove_test_file(self): |
| file_path = os.path.abspath(__file__) + ".temp" |
| |
| try: |
| os.unlink(file_path) |
| except OSError: |
| pass |
| |
| |
| class CloudFilesDeprecatedUSTests(CloudFilesTests): |
| driver_klass = CloudFilesStorageDriver |
| region = "ord" |
| |
| |
| class CloudFilesDeprecatedUKTests(CloudFilesTests): |
| driver_klass = CloudFilesStorageDriver |
| region = "lon" |
| |
| |
| class CloudFilesMockHttp(BaseRangeDownloadMockHttp, unittest.TestCase): |
| |
| fixtures = StorageFileFixtures("cloudfiles") |
| base_headers = {"content-type": "application/json; charset=UTF-8"} |
| |
| # fake auth token response |
| def _v2_0_tokens(self, method, url, body, headers): |
| headers = copy.deepcopy(self.base_headers) |
| body = self.fixtures.load("_v2_0__auth.json") |
| return (httplib.OK, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_MALFORMED_JSON(self, method, url, body, headers): |
| # test_invalid_json_throws_exception |
| body = 'broken: json /*"' |
| return ( |
| httplib.NO_CONTENT, |
| body, |
| self.base_headers, |
| httplib.responses[httplib.OK], |
| ) |
| |
| def _v1_MossoCloudFS_EMPTY(self, method, url, body, headers): |
| return ( |
| httplib.NO_CONTENT, |
| body, |
| self.base_headers, |
| httplib.responses[httplib.OK], |
| ) |
| |
| def _v1_MossoCloudFS(self, method, url, body, headers): |
| headers = copy.deepcopy(self.base_headers) |
| if method == "GET": |
| # list_containers |
| body = self.fixtures.load("list_containers.json") |
| status_code = httplib.OK |
| elif method == "HEAD": |
| # get_meta_data |
| body = self.fixtures.load("meta_data.json") |
| status_code = httplib.NO_CONTENT |
| headers.update( |
| { |
| "x-account-container-count": "10", |
| "x-account-object-count": "400", |
| "x-account-bytes-used": "1234567", |
| } |
| ) |
| elif method == "POST": |
| body = "" |
| status_code = httplib.NO_CONTENT |
| return (status_code, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_not_found(self, method, url, body, headers): |
| # test_get_object_not_found |
| if method == "HEAD": |
| body = "" |
| else: |
| raise ValueError("Invalid method") |
| |
| return ( |
| httplib.NOT_FOUND, |
| body, |
| self.base_headers, |
| httplib.responses[httplib.OK], |
| ) |
| |
| def _v1_MossoCloudFS_test_container_EMPTY(self, method, url, body, headers): |
| body = self.fixtures.load("list_container_objects_empty.json") |
| return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_test_20container_201_EMPTY(self, method, url, body, headers): |
| body = self.fixtures.load("list_container_objects_empty.json") |
| return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_test_container(self, method, url, body, headers): |
| headers = copy.deepcopy(self.base_headers) |
| if method == "GET": |
| # list_container_objects |
| if url.find("marker") == -1: |
| body = self.fixtures.load("list_container_objects.json") |
| status_code = httplib.OK |
| else: |
| body = "" |
| status_code = httplib.NO_CONTENT |
| elif method == "HEAD": |
| # get_container |
| body = self.fixtures.load("list_container_objects_empty.json") |
| status_code = httplib.NO_CONTENT |
| headers.update({"x-container-object-count": "800", "x-container-bytes-used": "1234568"}) |
| return (status_code, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_test_container_ITERATOR(self, method, url, body, headers): |
| headers = copy.deepcopy(self.base_headers) |
| # list_container_objects |
| if url.find("foo-test-3") != -1: |
| body = self.fixtures.load("list_container_objects_not_exhausted2.json") |
| status_code = httplib.OK |
| elif url.find("foo-test-5") != -1: |
| body = "" |
| status_code = httplib.NO_CONTENT |
| else: |
| # First request |
| body = self.fixtures.load("list_container_objects_not_exhausted1.json") |
| status_code = httplib.OK |
| |
| return (status_code, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_test_container_not_found(self, method, url, body, headers): |
| # test_get_container_not_found |
| if method == "HEAD": |
| body = "" |
| else: |
| raise ValueError("Invalid method") |
| |
| return ( |
| httplib.NOT_FOUND, |
| body, |
| self.base_headers, |
| httplib.responses[httplib.OK], |
| ) |
| |
| def _v1_MossoCloudFS_test_container_test_object(self, method, url, body, headers): |
| headers = copy.deepcopy(self.base_headers) |
| if method == "HEAD": |
| # get_object |
| body = self.fixtures.load("list_container_objects_empty.json") |
| status_code = httplib.NO_CONTENT |
| headers.update( |
| { |
| "content-length": "555", |
| "last-modified": "Tue, 25 Jan 2011 22:01:49 GMT", |
| "etag": "6b21c4a111ac178feacf9ec9d0c71f17", |
| "x-object-meta-foo-bar": "test 1", |
| "x-object-meta-bar-foo": "test 2", |
| "content-type": "application/zip", |
| } |
| ) |
| return (status_code, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_test_container__7E_test_object(self, method, url, body, headers): |
| headers = copy.deepcopy(self.base_headers) |
| if method == "HEAD": |
| # get_object_name_encoding |
| body = self.fixtures.load("list_container_objects_empty.json") |
| status_code = httplib.NO_CONTENT |
| headers.update( |
| { |
| "content-length": "555", |
| "last-modified": "Tue, 25 Jan 2011 22:01:49 GMT", |
| "etag": "6b21c4a111ac178feacf9ec9d0c71f17", |
| "x-object-meta-foo-bar": "test 1", |
| "x-object-meta-bar-foo": "test 2", |
| "content-type": "application/zip", |
| } |
| ) |
| return (status_code, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_test_create_container(self, method, url, body, headers): |
| # test_create_container_success |
| headers = copy.deepcopy(self.base_headers) |
| body = self.fixtures.load("list_container_objects_empty.json") |
| headers = copy.deepcopy(self.base_headers) |
| headers.update({"content-length": "18", "date": "Mon, 28 Feb 2011 07:52:57 GMT"}) |
| status_code = httplib.CREATED |
| return (status_code, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_speci_40l_name(self, method, url, body, headers): |
| # test_create_container_put_object_name_encoding |
| # Verify that the name is properly url encoded |
| container_name = "speci@l_name" |
| encoded_container_name = urlquote(container_name) |
| self.assertTrue(encoded_container_name in url) |
| |
| headers = copy.deepcopy(self.base_headers) |
| body = self.fixtures.load("list_container_objects_empty.json") |
| headers = copy.deepcopy(self.base_headers) |
| headers.update({"content-length": "18", "date": "Mon, 28 Feb 2011 07:52:57 GMT"}) |
| status_code = httplib.CREATED |
| return (status_code, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_test_create_container_ALREADY_EXISTS(self, method, url, body, headers): |
| # test_create_container_already_exists |
| headers = copy.deepcopy(self.base_headers) |
| body = self.fixtures.load("list_container_objects_empty.json") |
| headers.update({"content-type": "text/plain"}) |
| status_code = httplib.ACCEPTED |
| return (status_code, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_foo_bar_container(self, method, url, body, headers): |
| if method == "DELETE": |
| # test_delete_container_success |
| body = self.fixtures.load("list_container_objects_empty.json") |
| headers = self.base_headers |
| status_code = httplib.NO_CONTENT |
| elif method == "POST": |
| # test_ex_enable_static_website |
| body = "" |
| headers = self.base_headers |
| status_code = httplib.ACCEPTED |
| return (status_code, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_foo_bar_container_object_PURGE_SUCCESS(self, method, url, body, headers): |
| |
| if method == "DELETE": |
| # test_ex_purge_from_cdn |
| headers = self.base_headers |
| status_code = httplib.NO_CONTENT |
| return (status_code, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_foo_bar_container_object_PURGE_SUCCESS_EMAIL( |
| self, method, url, body, headers |
| ): |
| |
| if method == "DELETE": |
| # test_ex_purge_from_cdn_with_email |
| self.assertEqual(headers["X-Purge-Email"], "test@test.com") |
| headers = self.base_headers |
| status_code = httplib.NO_CONTENT |
| return (status_code, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_foo_bar_container_NOT_FOUND(self, method, url, body, headers): |
| |
| if method == "DELETE": |
| # test_delete_container_not_found |
| body = self.fixtures.load("list_container_objects_empty.json") |
| headers = self.base_headers |
| status_code = httplib.NOT_FOUND |
| return (status_code, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_foo_bar_container_NOT_EMPTY(self, method, url, body, headers): |
| |
| if method == "DELETE": |
| # test_delete_container_not_empty |
| body = self.fixtures.load("list_container_objects_empty.json") |
| headers = self.base_headers |
| status_code = httplib.CONFLICT |
| return (status_code, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_foo_bar_container_foo_bar_object(self, method, url, body, headers): |
| |
| if method == "DELETE": |
| # test_delete_object_success |
| body = self.fixtures.load("list_container_objects_empty.json") |
| headers = self.base_headers |
| status_code = httplib.NO_CONTENT |
| return (status_code, body, headers, httplib.responses[httplib.OK]) |
| elif method == "GET": |
| body = generate_random_data(1000) |
| return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_foo_bar_container_foo_bar_object_range(self, method, url, body, headers): |
| if method == "GET": |
| # test_download_object_range_success |
| body = "0123456789123456789" |
| |
| self.assertTrue("Range" in headers) |
| self.assertEqual(headers["Range"], "bytes=5-6") |
| |
| start_bytes, end_bytes = self._get_start_and_end_bytes_from_range_str( |
| headers["Range"], body |
| ) |
| |
| return ( |
| httplib.PARTIAL_CONTENT, |
| body[start_bytes : end_bytes + 1], |
| self.base_headers, |
| httplib.responses[httplib.PARTIAL_CONTENT], |
| ) |
| |
| def _v1_MossoCloudFS_py3_img_or_vid(self, method, url, body, headers): |
| headers = {"etag": "e2378cace8712661ce7beec3d9362ef6"} |
| headers.update(self.base_headers) |
| return httplib.CREATED, "", headers, httplib.responses[httplib.CREATED] |
| |
| def _v1_MossoCloudFS_foo_bar_container_foo_test_upload(self, method, url, body, headers): |
| # test_object_upload_success |
| |
| body = "" |
| headers = {} |
| headers.update(self.base_headers) |
| headers["etag"] = "hash343hhash89h932439jsaa89" |
| return (httplib.CREATED, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_speci_40l_name_m_40obj_E2_82_ACct(self, method, url, body, headers): |
| # test_create_container_put_object_name_encoding |
| # Verify that the name is properly url encoded |
| object_name = "m@obj€ct" |
| urlquote(object_name) |
| |
| headers = copy.deepcopy(self.base_headers) |
| body = "" |
| headers["etag"] = "hash343hhash89h932439jsaa89" |
| return (httplib.CREATED, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_foo_bar_container_empty(self, method, url, body, headers): |
| # test_upload_object_zero_size_object |
| body = "" |
| headers = {} |
| headers.update(self.base_headers) |
| headers["etag"] = "hash343hhash89h932439jsaa89" |
| return (httplib.CREATED, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_foo_bar_container_foo_test_upload_INVALID_HASH( |
| self, method, url, body, headers |
| ): |
| # test_object_upload_invalid_hash |
| body = "" |
| headers = {} |
| headers.update(self.base_headers) |
| headers["etag"] = "foobar" |
| return (httplib.CREATED, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_foo_bar_container_foo_bar_object_INVALID_SIZE( |
| self, method, url, body, headers |
| ): |
| # test_download_object_invalid_file_size |
| body = generate_random_data(100) |
| return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_foo_bar_container_foo_bar_object_NOT_FOUND( |
| self, method, url, body, headers |
| ): |
| body = "" |
| return ( |
| httplib.NOT_FOUND, |
| body, |
| self.base_headers, |
| httplib.responses[httplib.OK], |
| ) |
| |
| def _v1_MossoCloudFS_foo_bar_container_foo_test_stream_data(self, method, url, body, headers): |
| # test_upload_object_via_stream_success |
| hasher = hashlib.md5() |
| hasher.update(b"235") |
| hash_value = hasher.hexdigest() |
| |
| headers = {} |
| headers.update(self.base_headers) |
| headers["etag"] = hash_value |
| body = "test" |
| return (httplib.CREATED, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_foo_bar_container_foo_test_stream_data_seek( |
| self, method, url, body, headers |
| ): |
| |
| # test_upload_object_via_stream_stream_seek_at_end |
| hasher = hashlib.md5() |
| hasher.update(b"123456789") |
| hash_value = hasher.hexdigest() |
| |
| headers = {} |
| headers.update(self.base_headers) |
| headers["etag"] = hash_value |
| body = "test" |
| return (httplib.CREATED, body, headers, httplib.responses[httplib.OK]) |
| |
| def _v1_MossoCloudFS_foo_bar_container_foo_bar_object_NO_BUFFER( |
| self, method, url, body, headers |
| ): |
| # test_download_object_data_is_not_buffered_in_memory |
| headers = {} |
| headers.update(self.base_headers) |
| headers["etag"] = "577ef1154f3240ad5b9b413aa7346a1e" |
| body = generate_random_data(1000) |
| return (httplib.OK, body, headers, httplib.responses[httplib.OK]) |
| |
| |
| if __name__ == "__main__": |
| sys.exit(unittest.main()) |