# blob: da1d99c06dc8bc110e50b22cd509e455dbacbee8 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import unittest
from libcloud.utils.py3 import httplib
from libcloud.common.types import InvalidCredsError
from libcloud.common.types import LibcloudError
from libcloud.storage.base import Container, Object
from libcloud.storage.types import ContainerDoesNotExistError
from libcloud.storage.types import ContainerIsNotEmptyError
from libcloud.storage.types import InvalidContainerNameError
from libcloud.storage.types import ObjectDoesNotExistError
from libcloud.storage.types import ObjectHashMismatchError
from libcloud.storage.drivers.s3 import S3StorageDriver, S3USWestStorageDriver
from libcloud.storage.drivers.s3 import S3EUWestStorageDriver
from libcloud.storage.drivers.s3 import S3APSEStorageDriver
from libcloud.storage.drivers.s3 import S3APNEStorageDriver
from libcloud.storage.drivers.dummy import DummyIterator
from test import StorageMockHttp, MockRawResponse # pylint: disable-msg=E0611
from test.file_fixtures import StorageFileFixtures # pylint: disable-msg=E0611
from test.secrets import STORAGE_S3_PARAMS
class S3MockHttp(StorageMockHttp):
    """Canned S3 HTTP responses used by the storage driver tests.

    Every handler returns a ``(status, body, headers, reason)`` tuple.
    Handler names appear to be derived from the request path plus the
    optional ``type`` suffix that each test sets on this class -- confirm
    against ``StorageMockHttp``.

    NOTE(review): the reason phrase is always the one for ``httplib.OK``,
    even for error statuses. This is kept as-is on purpose so the mocked
    responses stay unchanged.
    """

    fixtures = StorageFileFixtures('s3')
    base_headers = {}

    def _UNAUTHORIZED(self, method, url, body, headers):
        # test_invalid_credentials
        return (httplib.UNAUTHORIZED,
                '',
                self.base_headers,
                httplib.responses[httplib.OK])

    def _DIFFERENT_REGION(self, method, url, body, headers):
        # test_bucket_is_located_in_different_region
        return (httplib.MOVED_PERMANENTLY,
                '',
                self.base_headers,
                httplib.responses[httplib.OK])

    def _list_containers_EMPTY(self, method, url, body, headers):
        # test_list_containers_empty
        body = self.fixtures.load('list_containers_empty.xml')
        return (httplib.OK,
                body,
                self.base_headers,
                httplib.responses[httplib.OK])

    def _list_containers(self, method, url, body, headers):
        # test_list_containers_success, test_get_container_*
        body = self.fixtures.load('list_containers.xml')
        return (httplib.OK,
                body,
                self.base_headers,
                httplib.responses[httplib.OK])

    def _test_container_EMPTY(self, method, url, body, headers):
        # test_list_container_objects_empty
        body = self.fixtures.load('list_container_objects_empty.xml')
        return (httplib.OK,
                body,
                self.base_headers,
                httplib.responses[httplib.OK])

    def _test_container(self, method, url, body, headers):
        # test_list_container_objects_success
        body = self.fixtures.load('list_container_objects.xml')
        return (httplib.OK,
                body,
                self.base_headers,
                httplib.responses[httplib.OK])

    def _test_container_ITERATOR(self, method, url, body, headers):
        # test_list_container_objects_iterator_has_more
        if url.find('3.zip') == -1:
            # First part of the response (first 3 objects)
            file_name = 'list_container_objects_not_exhausted1.xml'
        else:
            # The URL mentions '3.zip', so serve the second fixture.
            file_name = 'list_container_objects_not_exhausted2.xml'

        body = self.fixtures.load(file_name)
        return (httplib.OK,
                body,
                self.base_headers,
                httplib.responses[httplib.OK])

    def _test2_test_list_containers(self, method, url, body, headers):
        # test_get_object
        body = self.fixtures.load('list_containers.xml')
        headers = {'content-type': 'application/zip',
                   'etag': '"e31208wqsdoj329jd"',
                   'content-length': 12345,
                   }

        return (httplib.OK,
                body,
                headers,
                httplib.responses[httplib.OK])

    def _new_container_INVALID_NAME(self, method, url, body, headers):
        # test_create_container
        return (httplib.BAD_REQUEST,
                body,
                headers,
                httplib.responses[httplib.OK])

    def _new_container_ALREADY_EXISTS(self, method, url, body, headers):
        # test_create_container
        return (httplib.CONFLICT,
                body,
                headers,
                httplib.responses[httplib.OK])

    def _new_container(self, method, url, body, headers):
        # test_create_container, test_delete_container
        if method == 'PUT':
            status = httplib.OK
        elif method == 'DELETE':
            status = httplib.NO_CONTENT
        else:
            # Fail loudly instead of hitting an UnboundLocalError on
            # ``status`` when a test issues a method this mock does not
            # know how to answer.
            raise ValueError('Unexpected HTTP method: %s' % (method,))

        return (status,
                body,
                headers,
                httplib.responses[httplib.OK])

    def _new_container_DOESNT_EXIST(self, method, url, body, headers):
        # test_delete_container
        return (httplib.NOT_FOUND,
                body,
                headers,
                httplib.responses[httplib.OK])

    def _new_container_NOT_EMPTY(self, method, url, body, headers):
        # test_delete_container
        return (httplib.CONFLICT,
                body,
                headers,
                httplib.responses[httplib.OK])

    def _foo_bar_container(self, method, url, body, headers):
        # test_delete_container
        return (httplib.NO_CONTENT,
                body,
                headers,
                httplib.responses[httplib.OK])

    def _foo_bar_container_NOT_FOUND(self, method, url, body, headers):
        # test_delete_container_not_found
        return (httplib.NOT_FOUND,
                body,
                headers,
                httplib.responses[httplib.OK])

    def _foo_bar_container_foo_bar_object_NOT_FOUND(self, method, url, body,
                                                    headers):
        # test_delete_object_not_found
        return (httplib.NOT_FOUND,
                body,
                headers,
                httplib.responses[httplib.OK])

    def _foo_bar_container_foo_bar_object(self, method, url, body, headers):
        # test_delete_object
        return (httplib.NO_CONTENT,
                body,
                headers,
                httplib.responses[httplib.OK])
class S3MockRawResponse(MockRawResponse):
    """Mocked raw responses for the S3 download and upload tests.

    Each handler returns a ``(status, body, headers, reason)`` tuple with
    an empty body and an ``httplib.OK`` status.
    """

    fixtures = StorageFileFixtures('s3')

    def _foo_bar_container_foo_bar_object(self, method, url, body, headers):
        # Serves test_download_object_success: stash 1000 units of generated
        # data on the response and echo the request headers back.
        self._data = self._generate_random_data(1000)
        reason = httplib.responses[httplib.OK]
        return (httplib.OK, '', headers, reason)

    def _foo_bar_container_foo_test_upload_INVALID_HASH1(self, method, url,
                                                         body, headers):
        # Serves test_upload_object_invalid_hash1.
        response_headers = {'etag': '"foobar"'}
        reason = httplib.responses[httplib.OK]
        return (httplib.OK, '', response_headers, reason)

    def _foo_bar_container_foo_test_upload_INVALID_HASH2(self, method, url,
                                                         body, headers):
        # Serves test_upload_object_invalid_hash2.
        response_headers = {'etag': '"hash343hhash89h932439jsaa89"'}
        reason = httplib.responses[httplib.OK]
        return (httplib.OK, '', response_headers, reason)

    def _foo_bar_container_foo_test_upload(self, method, url, body, headers):
        # Serves test_upload_object_success.
        response_headers = {'etag': '"0cc175b9c0f1b6a831c399e269772661"'}
        reason = httplib.responses[httplib.OK]
        return (httplib.OK, '', response_headers, reason)

    def _foo_bar_container_foo_bar_object_INVALID_SIZE(self, method, url,
                                                       body, headers):
        # Serves test_upload_object_invalid_file_size; unlike the plain
        # object handler, no data is attached to the response.
        reason = httplib.responses[httplib.OK]
        return (httplib.OK, '', headers, reason)

    def _foo_bar_container_foo_test_stream_data(self, method, url, body,
                                                headers):
        # Serves test_upload_object_via_stream.
        response_headers = {'etag': '"0cc175b9c0f1b6a831c399e269772661"'}
        reason = httplib.responses[httplib.OK]
        return (httplib.OK, '', response_headers, reason)
class S3Tests(unittest.TestCase):
    """Exercise the S3 storage driver against mocked HTTP responses.

    Subclasses override ``driver_type`` to run the same tests against a
    different driver class. Test methods use ``#`` comments rather than
    docstrings so the names shown by the test runner do not change.
    """

    driver_type = S3StorageDriver
    driver_args = STORAGE_S3_PARAMS
    mock_response_klass = S3MockHttp
    mock_raw_response_klass = S3MockRawResponse

    @classmethod
    def create_driver(cls):
        """Return a ``driver_type`` instance built from ``driver_args``."""
        return cls.driver_type(*cls.driver_args)

    def setUp(self):
        # Point the driver's connection class at the mocks and reset the
        # per-test ``type`` suffix before building a fresh driver.
        connection_cls = self.driver_type.connectionCls
        connection_cls.conn_classes = (None, self.mock_response_klass)
        connection_cls.rawResponseCls = self.mock_raw_response_klass
        self.mock_response_klass.type = None
        self.mock_raw_response_klass.type = None
        self.driver = self.create_driver()

    def tearDown(self):
        self._remove_test_file()

    def _remove_test_file(self):
        """Delete the ``<this file>.temp`` download target, if present."""
        file_path = os.path.abspath(__file__) + '.temp'

        try:
            os.unlink(file_path)
        except OSError:
            # Best effort: most tests never create the file.
            pass

    def test_invalid_credentials(self):
        self.mock_response_klass.type = 'UNAUTHORIZED'
        self.assertRaises(InvalidCredsError, self.driver.list_containers)

    def test_bucket_is_located_in_different_region(self):
        self.mock_response_klass.type = 'DIFFERENT_REGION'
        self.assertRaises(LibcloudError, self.driver.list_containers)

    def test_list_containers_empty(self):
        self.mock_response_klass.type = 'list_containers_EMPTY'
        containers = self.driver.list_containers()
        self.assertEqual(len(containers), 0)

    def test_list_containers_success(self):
        self.mock_response_klass.type = 'list_containers'
        containers = self.driver.list_containers()
        self.assertEqual(len(containers), 2)

        self.assertTrue('creation_date' in containers[1].extra)

    def test_list_container_objects_empty(self):
        self.mock_response_klass.type = 'EMPTY'
        container = Container(name='test_container', extra={},
                              driver=self.driver)
        objects = self.driver.list_container_objects(container=container)
        self.assertEqual(len(objects), 0)

    def test_list_container_objects_success(self):
        self.mock_response_klass.type = None
        container = Container(name='test_container', extra={},
                              driver=self.driver)
        objects = self.driver.list_container_objects(container=container)
        self.assertEqual(len(objects), 1)

        obj = [o for o in objects if o.name == '1.zip'][0]
        self.assertEqual(obj.hash, '4397da7a7649e8085de9916c240e8166')
        self.assertEqual(obj.size, 1234567)
        self.assertEqual(obj.container.name, 'test_container')
        self.assertTrue('owner' in obj.meta_data)

    def test_list_container_objects_iterator_has_more(self):
        self.mock_response_klass.type = 'ITERATOR'
        container = Container(name='test_container', extra={},
                              driver=self.driver)
        objects = self.driver.list_container_objects(container=container)

        obj = [o for o in objects if o.name == '1.zip'][0]
        self.assertEqual(obj.hash, '4397da7a7649e8085de9916c240e8166')
        self.assertEqual(obj.size, 1234567)
        self.assertEqual(obj.container.name, 'test_container')

        self.assertTrue(obj in objects)
        self.assertEqual(len(objects), 5)

    def test_get_container_doesnt_exist(self):
        self.mock_response_klass.type = 'list_containers'
        self.assertRaises(ContainerDoesNotExistError,
                          self.driver.get_container,
                          container_name='container1')

    def test_get_container_success(self):
        self.mock_response_klass.type = 'list_containers'
        container = self.driver.get_container(container_name='test1')
        # assertEqual, not assertTrue: assertTrue would treat 'test1' as the
        # failure message and pass for any non-empty container name.
        self.assertEqual(container.name, 'test1')

    def test_get_object_container_doesnt_exist(self):
        # This method makes two requests which makes mocking the response a
        # bit trickier
        self.mock_response_klass.type = 'list_containers'
        self.assertRaises(ContainerDoesNotExistError,
                          self.driver.get_object,
                          container_name='test-inexistent',
                          object_name='test')

    def test_get_object_success(self):
        # This method makes two requests which makes mocking the response a
        # bit trickier
        self.mock_response_klass.type = 'list_containers'
        obj = self.driver.get_object(container_name='test2',
                                     object_name='test')

        self.assertEqual(obj.name, 'test')
        self.assertEqual(obj.container.name, 'test2')
        self.assertEqual(obj.size, 12345)
        self.assertEqual(obj.hash, 'e31208wqsdoj329jd')

    def test_create_container_invalid_name(self):
        # invalid container name
        self.mock_response_klass.type = 'INVALID_NAME'
        self.assertRaises(InvalidContainerNameError,
                          self.driver.create_container,
                          container_name='new_container')

    def test_create_container_already_exists(self):
        # container with this name already exists
        self.mock_response_klass.type = 'ALREADY_EXISTS'
        self.assertRaises(InvalidContainerNameError,
                          self.driver.create_container,
                          container_name='new-container')

    def test_create_container_success(self):
        # success
        self.mock_response_klass.type = None
        name = 'new_container'
        container = self.driver.create_container(container_name=name)
        self.assertEqual(container.name, name)

    def test_delete_container_doesnt_exist(self):
        container = Container(name='new_container', extra=None,
                              driver=self.driver)
        self.mock_response_klass.type = 'DOESNT_EXIST'
        self.assertRaises(ContainerDoesNotExistError,
                          self.driver.delete_container,
                          container=container)

    def test_delete_container_not_empty(self):
        container = Container(name='new_container', extra=None,
                              driver=self.driver)
        self.mock_response_klass.type = 'NOT_EMPTY'
        self.assertRaises(ContainerIsNotEmptyError,
                          self.driver.delete_container,
                          container=container)

        # success
        self.mock_response_klass.type = None
        self.assertTrue(self.driver.delete_container(container=container))

    def test_delete_container_not_found(self):
        self.mock_response_klass.type = 'NOT_FOUND'
        container = Container(name='foo_bar_container', extra={},
                              driver=self.driver)
        try:
            self.driver.delete_container(container=container)
        except ContainerDoesNotExistError:
            pass
        else:
            self.fail('Container does not exist but an exception was not ' +
                      'thrown')

    def test_delete_container_success(self):
        self.mock_response_klass.type = None
        container = Container(name='new_container', extra=None,
                              driver=self.driver)
        self.assertTrue(self.driver.delete_container(container=container))

    def test_download_object_success(self):
        container = Container(name='foo_bar_container', extra={},
                              driver=self.driver)
        obj = Object(name='foo_bar_object', size=1000, hash=None, extra={},
                     container=container, meta_data=None,
                     driver=S3StorageDriver)
        destination_path = os.path.abspath(__file__) + '.temp'
        result = self.driver.download_object(obj=obj,
                                             destination_path=destination_path,
                                             overwrite_existing=False,
                                             delete_on_failure=True)
        self.assertTrue(result)

    def test_download_object_invalid_file_size(self):
        self.mock_raw_response_klass.type = 'INVALID_SIZE'
        container = Container(name='foo_bar_container', extra={},
                              driver=self.driver)
        obj = Object(name='foo_bar_object', size=1000, hash=None, extra={},
                     container=container, meta_data=None,
                     driver=S3StorageDriver)
        destination_path = os.path.abspath(__file__) + '.temp'
        result = self.driver.download_object(obj=obj,
                                             destination_path=destination_path,
                                             overwrite_existing=False,
                                             delete_on_failure=True)
        self.assertFalse(result)

    def test_download_object_invalid_file_already_exists(self):
        self.mock_raw_response_klass.type = 'INVALID_SIZE'
        container = Container(name='foo_bar_container', extra={},
                              driver=self.driver)
        obj = Object(name='foo_bar_object', size=1000, hash=None, extra={},
                     container=container, meta_data=None,
                     driver=S3StorageDriver)
        # This very file already exists, and overwriting is disabled.
        destination_path = os.path.abspath(__file__)
        self.assertRaises(LibcloudError,
                          self.driver.download_object,
                          obj=obj,
                          destination_path=destination_path,
                          overwrite_existing=False,
                          delete_on_failure=True)

    def test_download_object_as_stream_success(self):
        container = Container(name='foo_bar_container', extra={},
                              driver=self.driver)

        obj = Object(name='foo_bar_object', size=1000, hash=None, extra={},
                     container=container, meta_data=None,
                     driver=S3StorageDriver)

        stream = self.driver.download_object_as_stream(obj=obj,
                                                       chunk_size=None)
        self.assertTrue(hasattr(stream, '__iter__'))

    def test_upload_object_invalid_ex_storage_class(self):
        # An unsupported ex_storage_class value is expected to raise a
        # ValueError whose message mentions "invalid storage class".
        file_path = os.path.abspath(__file__)
        container = Container(name='foo_bar_container', extra={},
                              driver=self.driver)
        object_name = 'foo_test_upload'
        try:
            self.driver.upload_object(file_path=file_path, container=container,
                                      object_name=object_name,
                                      verify_hash=True,
                                      ex_storage_class='invalid-class')
        except ValueError:
            e = sys.exc_info()[1]
            self.assertTrue(str(e).lower().find('invalid storage class') != -1)
        else:
            self.fail('Exception was not thrown')

    def test_upload_object_invalid_hash1(self):
        # The mocked response carries the ETag "foobar", which differs from
        # the hash reported by the patched _upload_file below.
        def upload_file(self, response, file_path, chunked=False,
                        calculate_hash=True):
            return True, 'hash343hhash89h932439jsaa89', 1000

        self.mock_raw_response_klass.type = 'INVALID_HASH1'

        old_func = S3StorageDriver._upload_file
        S3StorageDriver._upload_file = upload_file
        file_path = os.path.abspath(__file__)
        container = Container(name='foo_bar_container', extra={},
                              driver=self.driver)
        object_name = 'foo_test_upload'
        try:
            self.driver.upload_object(file_path=file_path, container=container,
                                      object_name=object_name,
                                      verify_hash=True)
        except ObjectHashMismatchError:
            pass
        else:
            self.fail(
                'Invalid hash was returned but an exception was not thrown')
        finally:
            S3StorageDriver._upload_file = old_func

    def test_upload_object_invalid_hash2(self):
        # Invalid hash is detected when comparing hash provided in the
        # response ETag header
        def upload_file(self, response, file_path, chunked=False,
                        calculate_hash=True):
            return True, '0cc175b9c0f1b6a831c399e269772661', 1000

        self.mock_raw_response_klass.type = 'INVALID_HASH2'

        old_func = S3StorageDriver._upload_file
        S3StorageDriver._upload_file = upload_file
        file_path = os.path.abspath(__file__)
        container = Container(name='foo_bar_container', extra={},
                              driver=self.driver)
        object_name = 'foo_test_upload'
        try:
            self.driver.upload_object(file_path=file_path, container=container,
                                      object_name=object_name,
                                      verify_hash=True)
        except ObjectHashMismatchError:
            pass
        else:
            self.fail(
                'Invalid hash was returned but an exception was not thrown')
        finally:
            S3StorageDriver._upload_file = old_func

    def test_upload_object_success(self):
        def upload_file(self, response, file_path, chunked=False,
                        calculate_hash=True):
            return True, '0cc175b9c0f1b6a831c399e269772661', 1000

        old_func = S3StorageDriver._upload_file
        S3StorageDriver._upload_file = upload_file
        try:
            file_path = os.path.abspath(__file__)
            container = Container(name='foo_bar_container', extra={},
                                  driver=self.driver)
            object_name = 'foo_test_upload'
            extra = {'meta_data': {'some-value': 'foobar'}}
            obj = self.driver.upload_object(file_path=file_path,
                                            container=container,
                                            object_name=object_name,
                                            extra=extra,
                                            verify_hash=True)
            self.assertEqual(obj.name, 'foo_test_upload')
            self.assertEqual(obj.size, 1000)
            self.assertTrue('some-value' in obj.meta_data)
        finally:
            # Always undo the monkey patch, even when an assertion fails,
            # so it cannot leak into the tests that run afterwards.
            S3StorageDriver._upload_file = old_func

    def test_upload_object_via_stream(self):
        container = Container(name='foo_bar_container', extra={},
                              driver=self.driver)
        object_name = 'foo_test_stream_data'
        iterator = DummyIterator(data=['2', '3', '5'])
        extra = {'content_type': 'text/plain'}
        obj = self.driver.upload_object_via_stream(container=container,
                                                   object_name=object_name,
                                                   iterator=iterator,
                                                   extra=extra)

        self.assertEqual(obj.name, object_name)
        self.assertEqual(obj.size, 3)

    def test_delete_object_not_found(self):
        self.mock_response_klass.type = 'NOT_FOUND'
        container = Container(name='foo_bar_container', extra={},
                              driver=self.driver)
        obj = Object(name='foo_bar_object', size=1234, hash=None, extra=None,
                     meta_data=None, container=container, driver=self.driver)
        self.assertRaises(ObjectDoesNotExistError,
                          self.driver.delete_object,
                          obj=obj)

    def test_delete_object_success(self):
        container = Container(name='foo_bar_container', extra={},
                              driver=self.driver)
        obj = Object(name='foo_bar_object', size=1234, hash=None, extra=None,
                     meta_data=None, container=container, driver=self.driver)

        result = self.driver.delete_object(obj=obj)
        self.assertTrue(result)
class S3USWestTests(S3Tests):
    """Run the inherited S3Tests cases with S3USWestStorageDriver."""

    driver_type = S3USWestStorageDriver
class S3EUWestTests(S3Tests):
    """Run the inherited S3Tests cases with S3EUWestStorageDriver."""

    driver_type = S3EUWestStorageDriver
class S3APSETests(S3Tests):
    """Run the inherited S3Tests cases with S3APSEStorageDriver."""

    driver_type = S3APSEStorageDriver
class S3APNETests(S3Tests):
    """Run the inherited S3Tests cases with S3APNEStorageDriver."""

    # The attribute must be spelled ``driver_type``: a misspelled name is
    # ignored by S3Tests, which would then silently fall back to the
    # inherited S3StorageDriver and never exercise this driver.
    driver_type = S3APNEStorageDriver
if __name__ == '__main__':
    # Run the suite when executed as a script and pass whatever
    # unittest.main() returns on to sys.exit.
    exit_status = unittest.main()
    sys.exit(exit_status)