| # -*- coding=utf-8 -*- |
| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| from __future__ import unicode_literals |
| |
| import os |
| import sys |
| import unittest |
| |
| try: |
| import mock |
| except ImportError: |
| from unittest import mock |
| |
| from libcloud.utils.py3 import b |
| from libcloud.utils.py3 import httplib |
| from libcloud.utils.py3 import urlparse |
| from libcloud.utils.py3 import parse_qs |
| from libcloud.common.types import InvalidCredsError |
| from libcloud.storage.base import Container, Object |
| from libcloud.storage.types import ContainerDoesNotExistError |
| from libcloud.storage.types import ContainerError |
| from libcloud.storage.types import ContainerIsNotEmptyError |
| from libcloud.storage.types import InvalidContainerNameError |
| from libcloud.storage.types import ObjectDoesNotExistError |
| from libcloud.storage.types import ObjectHashMismatchError |
| from libcloud.storage.drivers.oss import OSSConnection |
| from libcloud.storage.drivers.oss import OSSStorageDriver |
| from libcloud.storage.drivers.oss import CHUNK_SIZE |
| from libcloud.storage.drivers.dummy import DummyIterator |
| from libcloud.test import MockHttp, generate_random_data, make_response # pylint: disable-msg=E0611 |
| from libcloud.test.file_fixtures import StorageFileFixtures # pylint: disable-msg=E0611 |
| from libcloud.test.secrets import STORAGE_OSS_PARAMS |
| |
| |
| class OSSConnectionTestCase(unittest.TestCase): |
| def setUp(self): |
| self.conn = OSSConnection('44CF9590006BF252F707', |
| 'OtxrzxIsfpFjA7SwPzILwy8Bw21TLhquhboDYROV') |
| |
| def test_signature(self): |
| expected = b('26NBxoKdsyly4EDv6inkoDft/yA=') |
| headers = { |
| 'Content-MD5': 'ODBGOERFMDMzQTczRUY3NUE3NzA5QzdFNUYzMDQxNEM=', |
| 'Content-Type': 'text/html', |
| 'Expires': 'Thu, 17 Nov 2005 18:49:58 GMT', |
| 'X-OSS-Meta-Author': 'foo@bar.com', |
| 'X-OSS-Magic': 'abracadabra', |
| 'Host': 'oss-example.oss-cn-hangzhou.aliyuncs.com' |
| } |
| action = '/oss-example/nelson' |
| actual = OSSConnection._get_auth_signature('PUT', headers, {}, |
| headers['Expires'], |
| self.conn.key, |
| action, |
| 'x-oss-') |
| self.assertEqual(expected, actual) |
| |
| |
| class ObjectTestCase(unittest.TestCase): |
| def test_object_with_chinese_name(self): |
| driver = OSSStorageDriver(*STORAGE_OSS_PARAMS) |
| obj = Object(name='中文', size=0, hash=None, extra=None, |
| meta_data=None, container=None, driver=driver) |
| self.assertTrue(obj.__repr__() is not None) |
| |
| |
| class OSSMockHttp(MockHttp, unittest.TestCase): |
| |
| fixtures = StorageFileFixtures('oss') |
| base_headers = {} |
| |
| def _unauthorized(self, method, url, body, headers): |
| return (httplib.UNAUTHORIZED, |
| '', |
| self.base_headers, |
| httplib.responses[httplib.OK]) |
| |
| def _list_containers_empty(self, method, url, body, headers): |
| body = self.fixtures.load('list_containers_empty.xml') |
| return (httplib.OK, |
| body, |
| self.base_headers, |
| httplib.responses[httplib.OK]) |
| |
| def _list_containers(self, method, url, body, headers): |
| body = self.fixtures.load('list_containers.xml') |
| return (httplib.OK, |
| body, |
| self.base_headers, |
| httplib.responses[httplib.OK]) |
| |
| def _list_container_objects_empty(self, method, url, body, headers): |
| body = self.fixtures.load('list_container_objects_empty.xml') |
| return (httplib.OK, |
| body, |
| self.base_headers, |
| httplib.responses[httplib.OK]) |
| |
| def _list_container_objects(self, method, url, body, headers): |
| body = self.fixtures.load('list_container_objects.xml') |
| return (httplib.OK, |
| body, |
| self.base_headers, |
| httplib.responses[httplib.OK]) |
| |
| def _list_container_objects_chinese(self, method, url, body, headers): |
| body = self.fixtures.load('list_container_objects_chinese.xml') |
| return (httplib.OK, |
| body, |
| self.base_headers, |
| httplib.responses[httplib.OK]) |
| |
| def _list_container_objects_prefix(self, method, url, body, headers): |
| params = {'prefix': self.test.prefix} |
| self.assertUrlContainsQueryParams(url, params) |
| body = self.fixtures.load('list_container_objects_prefix.xml') |
| return (httplib.OK, |
| body, |
| self.base_headers, |
| httplib.responses[httplib.OK]) |
| |
| def _get_container(self, method, url, body, headers): |
| return self._list_containers(method, url, body, headers) |
| |
| def _get_object(self, method, url, body, headers): |
| return self._list_containers(method, url, body, headers) |
| |
| def _notexisted_get_object(self, method, url, body, headers): |
| return (httplib.NOT_FOUND, |
| body, |
| self.base_headers, |
| httplib.responses[httplib.NOT_FOUND]) |
| |
| def _test_get_object(self, method, url, body, headers): |
| self.base_headers.update( |
| {'accept-ranges': 'bytes', |
| 'connection': 'keep-alive', |
| 'content-length': '0', |
| 'content-type': 'application/octet-stream', |
| 'date': 'Sat, 16 Jan 2016 15:38:14 GMT', |
| 'etag': '"D41D8CD98F00B204E9800998ECF8427E"', |
| 'last-modified': 'Fri, 15 Jan 2016 14:43:15 GMT', |
| 'server': 'AliyunOSS', |
| 'x-oss-object-type': 'Normal', |
| 'x-oss-request-id': '569A63E6257784731E3D877F', |
| 'x-oss-meta-rabbits': 'monkeys'}) |
| |
| return (httplib.OK, |
| body, |
| self.base_headers, |
| httplib.responses[httplib.OK]) |
| |
| def _invalid_name(self, method, url, body, headers): |
| # test_create_container_bad_request |
| return (httplib.BAD_REQUEST, |
| body, |
| headers, |
| httplib.responses[httplib.OK]) |
| |
| def _already_exists(self, method, url, body, headers): |
| # test_create_container_already_existed |
| return (httplib.CONFLICT, |
| body, |
| headers, |
| httplib.responses[httplib.OK]) |
| |
| def _create_container(self, method, url, body, headers): |
| # test_create_container_success |
| self.assertEqual('PUT', method) |
| self.assertEqual('', body) |
| return (httplib.OK, |
| body, |
| headers, |
| httplib.responses[httplib.OK]) |
| |
| def _create_container_location(self, method, url, body, headers): |
| # test_create_container_success |
| self.assertEqual('PUT', method) |
| location_constraint = ('<CreateBucketConfiguration>' |
| '<LocationConstraint>%s</LocationConstraint>' |
| '</CreateBucketConfiguration>' % |
| self.test.ex_location) |
| self.assertEqual(location_constraint, body) |
| return (httplib.OK, |
| body, |
| headers, |
| httplib.responses[httplib.OK]) |
| |
| def _delete_container_doesnt_exist(self, method, url, body, headers): |
| # test_delete_container_doesnt_exist |
| return (httplib.NOT_FOUND, |
| body, |
| headers, |
| httplib.responses[httplib.OK]) |
| |
| def _delete_container_not_empty(self, method, url, body, headers): |
| # test_delete_container_not_empty |
| return (httplib.CONFLICT, |
| body, |
| headers, |
| httplib.responses[httplib.OK]) |
| |
| def _delete_container(self, method, url, body, headers): |
| return (httplib.NO_CONTENT, |
| body, |
| self.base_headers, |
| httplib.responses[httplib.NO_CONTENT]) |
| |
| def _foo_bar_object_not_found(self, method, url, body, headers): |
| # test_delete_object_not_found |
| return (httplib.NOT_FOUND, |
| body, |
| headers, |
| httplib.responses[httplib.OK]) |
| |
| def _foo_bar_object_delete(self, method, url, body, headers): |
| # test_delete_object |
| return (httplib.NO_CONTENT, |
| body, |
| headers, |
| httplib.responses[httplib.OK]) |
| |
| def _list_multipart(self, method, url, body, headers): |
| query_string = urlparse.urlsplit(url).query |
| query = parse_qs(query_string) |
| |
| if 'key-marker' not in query: |
| body = self.fixtures.load('ex_iterate_multipart_uploads_p1.xml') |
| else: |
| body = self.fixtures.load('ex_iterate_multipart_uploads_p2.xml') |
| |
| return (httplib.OK, |
| body, |
| headers, |
| httplib.responses[httplib.OK]) |
| |
| def _foo_bar_object(self, method, url, body, headers): |
| # test_download_object_success |
| body = generate_random_data(1000) |
| return (httplib.OK, |
| body, |
| headers, |
| httplib.responses[httplib.OK]) |
| |
| def _foo_bar_object_invalid_size(self, method, url, body, headers): |
| # test_upload_object_invalid_file_size |
| body = '' |
| return (httplib.OK, |
| body, |
| headers, |
| httplib.responses[httplib.OK]) |
| |
| def _foo_test_stream_data_multipart(self, method, url, body, headers): |
| headers = {} |
| body = '' |
| headers = {'etag': '"0cc175b9c0f1b6a831c399e269772661"'} |
| return (httplib.OK, |
| body, |
| headers, |
| httplib.responses[httplib.OK]) |
| |
| |
| class OSSStorageDriverTestCase(unittest.TestCase): |
| driver_type = OSSStorageDriver |
| driver_args = STORAGE_OSS_PARAMS |
| mock_response_klass = OSSMockHttp |
| |
| @classmethod |
| def create_driver(self): |
| return self.driver_type(*self.driver_args) |
| |
| def setUp(self): |
| self.driver_type.connectionCls.conn_class = self.mock_response_klass |
| self.mock_response_klass.type = None |
| self.mock_response_klass.test = self |
| self.driver = self.create_driver() |
| |
| def tearDown(self): |
| self._remove_test_file() |
| |
| def _remove_test_file(self): |
| file_path = os.path.abspath(__file__) + '.temp' |
| |
| try: |
| os.unlink(file_path) |
| except OSError: |
| pass |
| |
| def test_invalid_credentials(self): |
| self.mock_response_klass.type = 'unauthorized' |
| self.assertRaises(InvalidCredsError, self.driver.list_containers) |
| |
| def test_list_containers_empty(self): |
| self.mock_response_klass.type = 'list_containers_empty' |
| containers = self.driver.list_containers() |
| self.assertEqual(len(containers), 0) |
| |
| def test_list_containers_success(self): |
| self.mock_response_klass.type = 'list_containers' |
| containers = self.driver.list_containers() |
| self.assertEqual(len(containers), 2) |
| |
| container = containers[0] |
| self.assertEqual('xz02tphky6fjfiuc0', container.name) |
| self.assertTrue('creation_date' in container.extra) |
| self.assertEqual('2014-05-15T11:18:32.000Z', |
| container.extra['creation_date']) |
| self.assertTrue('location' in container.extra) |
| self.assertEqual('oss-cn-hangzhou-a', container.extra['location']) |
| self.assertEqual(self.driver, container.driver) |
| |
| def test_list_container_objects_empty(self): |
| self.mock_response_klass.type = 'list_container_objects_empty' |
| container = Container(name='test_container', extra={}, |
| driver=self.driver) |
| objects = self.driver.list_container_objects(container=container) |
| self.assertEqual(len(objects), 0) |
| |
| def test_list_container_objects_success(self): |
| self.mock_response_klass.type = 'list_container_objects' |
| container = Container(name='test_container', extra={}, |
| driver=self.driver) |
| objects = self.driver.list_container_objects(container=container) |
| self.assertEqual(len(objects), 2) |
| |
| obj = objects[0] |
| self.assertEqual(obj.name, 'en/') |
| self.assertEqual(obj.hash, 'D41D8CD98F00B204E9800998ECF8427E') |
| self.assertEqual(obj.size, 0) |
| self.assertEqual(obj.container.name, 'test_container') |
| self.assertEqual( |
| obj.extra['last_modified'], '2016-01-15T14:43:15.000Z') |
| self.assertTrue('owner' in obj.meta_data) |
| |
| def test_list_container_objects_with_chinese(self): |
| self.mock_response_klass.type = 'list_container_objects_chinese' |
| container = Container(name='test_container', extra={}, |
| driver=self.driver) |
| objects = self.driver.list_container_objects(container=container) |
| self.assertEqual(len(objects), 2) |
| |
| obj = [o for o in objects |
| if o.name == 'WEB控制台.odp'][0] |
| self.assertEqual(obj.hash, '281371EA1618CF0E645D6BB90A158276') |
| self.assertEqual(obj.size, 1234567) |
| self.assertEqual(obj.container.name, 'test_container') |
| self.assertEqual( |
| obj.extra['last_modified'], '2016-01-15T14:43:06.000Z') |
| self.assertTrue('owner' in obj.meta_data) |
| |
| def test_list_container_objects_with_prefix(self): |
| self.mock_response_klass.type = 'list_container_objects_prefix' |
| container = Container(name='test_container', extra={}, |
| driver=self.driver) |
| self.prefix = 'test_prefix' |
| objects = self.driver.list_container_objects(container=container, |
| prefix=self.prefix) |
| self.assertEqual(len(objects), 2) |
| |
| def test_get_container_doesnt_exist(self): |
| self.mock_response_klass.type = 'get_container' |
| self.assertRaises(ContainerDoesNotExistError, |
| self.driver.get_container, |
| container_name='not-existed') |
| |
| def test_get_container_success(self): |
| self.mock_response_klass.type = 'get_container' |
| container = self.driver.get_container( |
| container_name='xz02tphky6fjfiuc0') |
| self.assertTrue(container.name, 'xz02tphky6fjfiuc0') |
| |
| def test_get_object_container_doesnt_exist(self): |
| self.mock_response_klass.type = 'get_object' |
| self.assertRaises(ObjectDoesNotExistError, |
| self.driver.get_object, |
| container_name='xz02tphky6fjfiuc0', |
| object_name='notexisted') |
| |
| def test_get_object_success(self): |
| self.mock_response_klass.type = 'get_object' |
| obj = self.driver.get_object(container_name='xz02tphky6fjfiuc0', |
| object_name='test') |
| |
| self.assertEqual(obj.name, 'test') |
| self.assertEqual(obj.container.name, 'xz02tphky6fjfiuc0') |
| self.assertEqual(obj.size, 0) |
| self.assertEqual(obj.hash, 'D41D8CD98F00B204E9800998ECF8427E') |
| self.assertEqual(obj.extra['last_modified'], |
| 'Fri, 15 Jan 2016 14:43:15 GMT') |
| self.assertEqual(obj.extra['content_type'], 'application/octet-stream') |
| self.assertEqual(obj.meta_data['rabbits'], 'monkeys') |
| |
| def test_create_container_bad_request(self): |
| # invalid container name, returns a 400 bad request |
| self.mock_response_klass.type = 'invalid_name' |
| self.assertRaises(ContainerError, |
| self.driver.create_container, |
| container_name='invalid_name') |
| |
| def test_create_container_already_exists(self): |
| # container with this name already exists |
| self.mock_response_klass.type = 'already_exists' |
| self.assertRaises(InvalidContainerNameError, |
| self.driver.create_container, |
| container_name='new-container') |
| |
| def test_create_container_success(self): |
| # success |
| self.mock_response_klass.type = 'create_container' |
| name = 'new_container' |
| container = self.driver.create_container(container_name=name) |
| self.assertEqual(container.name, name) |
| |
| def test_create_container_with_ex_location(self): |
| self.mock_response_klass.type = 'create_container_location' |
| name = 'new_container' |
| self.ex_location = 'oss-cn-beijing' |
| container = self.driver.create_container(container_name=name, |
| ex_location=self.ex_location) |
| self.assertEqual(container.name, name) |
| self.assertTrue(container.extra['location'], self.ex_location) |
| |
| def test_delete_container_doesnt_exist(self): |
| container = Container(name='new_container', extra=None, |
| driver=self.driver) |
| self.mock_response_klass.type = 'delete_container_doesnt_exist' |
| self.assertRaises(ContainerDoesNotExistError, |
| self.driver.delete_container, |
| container=container) |
| |
| def test_delete_container_not_empty(self): |
| container = Container(name='new_container', extra=None, |
| driver=self.driver) |
| self.mock_response_klass.type = 'delete_container_not_empty' |
| self.assertRaises(ContainerIsNotEmptyError, |
| self.driver.delete_container, |
| container=container) |
| |
| def test_delete_container_success(self): |
| self.mock_response_klass.type = 'delete_container' |
| container = Container(name='new_container', extra=None, |
| driver=self.driver) |
| self.assertTrue(self.driver.delete_container(container=container)) |
| |
| def test_download_object_success(self): |
| container = Container(name='foo_bar_container', extra={}, |
| driver=self.driver) |
| obj = Object(name='foo_bar_object', size=1000, hash=None, extra={}, |
| container=container, meta_data=None, |
| driver=self.driver_type) |
| destination_path = os.path.abspath(__file__) + '.temp' |
| result = self.driver.download_object(obj=obj, |
| destination_path=destination_path, |
| overwrite_existing=False, |
| delete_on_failure=True) |
| self.assertTrue(result) |
| |
| def test_download_object_invalid_file_size(self): |
| self.mock_response_klass.type = 'invalid_size' |
| container = Container(name='foo_bar_container', extra={}, |
| driver=self.driver) |
| obj = Object(name='foo_bar_object', size=1000, hash=None, extra={}, |
| container=container, meta_data=None, |
| driver=self.driver_type) |
| destination_path = os.path.abspath(__file__) + '.temp' |
| result = self.driver.download_object(obj=obj, |
| destination_path=destination_path, |
| overwrite_existing=False, |
| delete_on_failure=True) |
| self.assertFalse(result) |
| |
| def test_download_object_not_found(self): |
| self.mock_response_klass.type = 'not_found' |
| container = Container(name='foo_bar_container', extra={}, |
| driver=self.driver) |
| obj = Object(name='foo_bar_object', size=1000, hash=None, extra={}, |
| container=container, meta_data=None, |
| driver=self.driver_type) |
| destination_path = os.path.abspath(__file__) + '.temp' |
| self.assertRaises(ObjectDoesNotExistError, |
| self.driver.download_object, |
| obj=obj, |
| destination_path=destination_path, |
| overwrite_existing=False, |
| delete_on_failure=True) |
| |
| def test_download_object_as_stream_success(self): |
| container = Container(name='foo_bar_container', extra={}, |
| driver=self.driver) |
| |
| obj = Object(name='foo_bar_object', size=1000, hash=None, extra={}, |
| container=container, meta_data=None, |
| driver=self.driver_type) |
| |
| stream = self.driver.download_object_as_stream(obj=obj, |
| chunk_size=None) |
| self.assertTrue(hasattr(stream, '__iter__')) |
| |
| def test_upload_object_invalid_hash1(self): |
| def upload_file(self, object_name=None, content_type=None, |
| request_path=None, request_method=None, |
| headers=None, file_path=None, stream=None): |
| return {'response': make_response(200, headers={'etag': '2345'}), |
| 'bytes_transferred': 1000, |
| 'data_hash': 'hash343hhash89h932439jsaa89'} |
| |
| self.mock_response_klass.type = 'INVALID_HASH1' |
| |
| old_func = self.driver_type._upload_object |
| self.driver_type._upload_object = upload_file |
| file_path = os.path.abspath(__file__) |
| container = Container(name='foo_bar_container', extra={}, |
| driver=self.driver) |
| object_name = 'foo_test_upload' |
| try: |
| self.driver.upload_object(file_path=file_path, container=container, |
| object_name=object_name, |
| verify_hash=True) |
| except ObjectHashMismatchError: |
| pass |
| else: |
| self.fail( |
| 'Invalid hash was returned but an exception was not thrown') |
| finally: |
| self.driver_type._upload_object = old_func |
| |
| def test_upload_object_success(self): |
| def upload_file(self, object_name=None, content_type=None, |
| request_path=None, request_method=None, |
| headers=None, file_path=None, stream=None): |
| return {'response': make_response(200, |
| headers={'etag': '0cc175b9c0f1b6a831c399e269772661'}), |
| 'bytes_transferred': 1000, |
| 'data_hash': '0cc175b9c0f1b6a831c399e269772661'} |
| self.mock_response_klass.type = None |
| old_func = self.driver_type._upload_object |
| self.driver_type._upload_object = upload_file |
| file_path = os.path.abspath(__file__) |
| container = Container(name='foo_bar_container', extra={}, |
| driver=self.driver) |
| object_name = 'foo_test_upload' |
| extra = {'meta_data': {'some-value': 'foobar'}} |
| obj = self.driver.upload_object(file_path=file_path, |
| container=container, |
| object_name=object_name, |
| extra=extra, |
| verify_hash=True) |
| self.assertEqual(obj.name, 'foo_test_upload') |
| self.assertEqual(obj.size, 1000) |
| self.assertTrue('some-value' in obj.meta_data) |
| self.driver_type._upload_object = old_func |
| |
| def test_upload_object_with_acl(self): |
| def upload_file(self, object_name=None, content_type=None, |
| request_path=None, request_method=None, |
| headers=None, file_path=None, stream=None): |
| return {'response': make_response(200, headers={'etag': '0cc175b9c0f1b6a831c399e269772661'}), |
| 'bytes_transferred': 1000, |
| 'data_hash': '0cc175b9c0f1b6a831c399e269772661'} |
| |
| self.mock_response_klass.type = None |
| old_func = self.driver_type._upload_object |
| self.driver_type._upload_object = upload_file |
| |
| file_path = os.path.abspath(__file__) |
| container = Container(name='foo_bar_container', extra={}, |
| driver=self.driver) |
| object_name = 'foo_test_upload' |
| extra = {'acl': 'public-read'} |
| obj = self.driver.upload_object(file_path=file_path, |
| container=container, |
| object_name=object_name, |
| extra=extra, |
| verify_hash=True) |
| self.assertEqual(obj.name, 'foo_test_upload') |
| self.assertEqual(obj.size, 1000) |
| self.assertEqual(obj.extra['acl'], 'public-read') |
| self.driver_type._upload_object = old_func |
| |
| def test_upload_object_with_invalid_acl(self): |
| file_path = os.path.abspath(__file__) |
| container = Container(name='foo_bar_container', extra={}, |
| driver=self.driver) |
| object_name = 'foo_test_upload' |
| extra = {'acl': 'invalid-acl'} |
| self.assertRaises(AttributeError, |
| self.driver.upload_object, |
| file_path=file_path, |
| container=container, |
| object_name=object_name, |
| extra=extra, |
| verify_hash=True) |
| |
| def test_upload_empty_object_via_stream(self): |
| if self.driver.supports_multipart_upload: |
| self.mock_response_klass.type = 'multipart' |
| else: |
| self.mock_response_klass.type = None |
| |
| container = Container(name='foo_bar_container', extra={}, |
| driver=self.driver) |
| object_name = 'foo_test_stream_data' |
| iterator = DummyIterator(data=['']) |
| extra = {'content_type': 'text/plain'} |
| obj = self.driver.upload_object_via_stream(container=container, |
| object_name=object_name, |
| iterator=iterator, |
| extra=extra) |
| |
| self.assertEqual(obj.name, object_name) |
| self.assertEqual(obj.size, 0) |
| |
| def test_upload_small_object_via_stream(self): |
| if self.driver.supports_multipart_upload: |
| self.mock_response_klass.type = 'multipart' |
| else: |
| self.mock_response_klass.type = None |
| |
| container = Container(name='foo_bar_container', extra={}, |
| driver=self.driver) |
| object_name = 'foo_test_stream_data' |
| iterator = DummyIterator(data=['2', '3', '5']) |
| extra = {'content_type': 'text/plain'} |
| obj = self.driver.upload_object_via_stream(container=container, |
| object_name=object_name, |
| iterator=iterator, |
| extra=extra) |
| |
| self.assertEqual(obj.name, object_name) |
| self.assertEqual(obj.size, 3) |
| |
| def test_upload_big_object_via_stream(self): |
| if self.driver.supports_multipart_upload: |
| self.mock_response_klass.type = 'multipart' |
| else: |
| self.mock_response_klass.type = None |
| |
| container = Container(name='foo_bar_container', extra={}, |
| driver=self.driver) |
| object_name = 'foo_test_stream_data' |
| iterator = DummyIterator( |
| data=['2' * CHUNK_SIZE, '3' * CHUNK_SIZE, '5']) |
| extra = {'content_type': 'text/plain'} |
| obj = self.driver.upload_object_via_stream(container=container, |
| object_name=object_name, |
| iterator=iterator, |
| extra=extra) |
| |
| self.assertEqual(obj.name, object_name) |
| self.assertEqual(obj.size, CHUNK_SIZE * 2 + 1) |
| |
| def test_upload_object_via_stream_abort(self): |
| if not self.driver.supports_multipart_upload: |
| return |
| |
| self.mock_response_klass.type = 'MULTIPART' |
| |
| def _faulty_iterator(): |
| for i in range(0, 5): |
| yield str(i) |
| raise RuntimeError('Error in fetching data') |
| |
| container = Container(name='foo_bar_container', extra={}, |
| driver=self.driver) |
| object_name = 'foo_test_stream_data' |
| iterator = _faulty_iterator() |
| extra = {'content_type': 'text/plain'} |
| |
| try: |
| self.driver.upload_object_via_stream(container=container, |
| object_name=object_name, |
| iterator=iterator, |
| extra=extra) |
| except Exception: |
| pass |
| |
| return |
| |
| def test_ex_iterate_multipart_uploads(self): |
| if not self.driver.supports_multipart_upload: |
| return |
| |
| self.mock_response_klass.type = 'list_multipart' |
| |
| container = Container(name='foo_bar_container', extra={}, |
| driver=self.driver) |
| |
| for upload in self.driver.ex_iterate_multipart_uploads(container, |
| max_uploads=2): |
| self.assertTrue(upload.key is not None) |
| self.assertTrue(upload.id is not None) |
| self.assertTrue(upload.initiated is not None) |
| |
| def test_ex_abort_all_multipart_uploads(self): |
| if not self.driver.supports_multipart_upload: |
| return |
| |
| self.mock_response_klass.type = 'list_multipart' |
| |
| container = Container(name='foo_bar_container', extra={}, |
| driver=self.driver) |
| |
| with mock.patch('libcloud.storage.drivers.oss.OSSStorageDriver' |
| '._abort_multipart', autospec=True) as mock_abort: |
| self.driver.ex_abort_all_multipart_uploads(container) |
| |
| self.assertEqual(3, mock_abort.call_count) |
| |
| def test_delete_object_not_found(self): |
| self.mock_response_klass.type = 'not_found' |
| container = Container(name='foo_bar_container', extra={}, |
| driver=self.driver) |
| obj = Object(name='foo_bar_object', size=1234, hash=None, extra=None, |
| meta_data=None, container=container, driver=self.driver) |
| self.assertRaises(ObjectDoesNotExistError, |
| self.driver.delete_object, |
| obj=obj) |
| |
| def test_delete_object_success(self): |
| self.mock_response_klass.type = 'delete' |
| container = Container(name='foo_bar_container', extra={}, |
| driver=self.driver) |
| obj = Object(name='foo_bar_object', size=1234, hash=None, extra=None, |
| meta_data=None, container=container, driver=self.driver) |
| |
| result = self.driver.delete_object(obj=obj) |
| self.assertTrue(result) |
| |
| |
| if __name__ == '__main__': |
| sys.exit(unittest.main()) |