blob: 19dfceefa6b6eef12aa6ff8b6b3061025485ffee [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import os
import sys
import tempfile
from io import BytesIO
from libcloud.utils.py3 import httplib
from libcloud.utils.py3 import urlparse
from libcloud.utils.py3 import parse_qs
from libcloud.utils.py3 import b
from libcloud.utils.py3 import basestring
from libcloud.common.types import InvalidCredsError
from libcloud.common.types import LibcloudError
from libcloud.storage.base import Container, Object
from libcloud.storage.types import ContainerDoesNotExistError
from libcloud.storage.types import ContainerIsNotEmptyError
from libcloud.storage.types import ContainerAlreadyExistsError
from libcloud.storage.types import InvalidContainerNameError
from libcloud.storage.types import ObjectDoesNotExistError
from libcloud.storage.types import ObjectHashMismatchError
from libcloud.storage.drivers.azure_blobs import AzureBlobsStorageDriver
from libcloud.storage.drivers.azure_blobs import AZURE_UPLOAD_CHUNK_SIZE
from libcloud.test import unittest
from libcloud.test import generate_random_data # pylint: disable-msg=E0611
from libcloud.test.file_fixtures import StorageFileFixtures # pylint: disable-msg=E0611
from libcloud.test.secrets import STORAGE_AZURE_BLOBS_PARAMS
from libcloud.test.secrets import STORAGE_AZURITE_BLOBS_PARAMS
from libcloud.test.storage.base import BaseRangeDownloadMockHttp
class AzureBlobsMockHttp(BaseRangeDownloadMockHttp, unittest.TestCase):
fixtures = StorageFileFixtures('azure_blobs')
base_headers = {}
def _UNAUTHORIZED(self, method, url, body, headers):
return (httplib.UNAUTHORIZED,
'',
self.base_headers,
httplib.responses[httplib.UNAUTHORIZED])
def _list_containers_EMPTY(self, method, url, body, headers):
body = self.fixtures.load('list_containers_empty.xml')
return (httplib.OK,
body,
self.base_headers,
httplib.responses[httplib.OK])
def _list_containers(self, method, url, body, headers):
query_string = urlparse.urlsplit(url).query
query = parse_qs(query_string)
if 'marker' not in query:
body = self.fixtures.load('list_containers_1.xml')
else:
body = self.fixtures.load('list_containers_2.xml')
return (httplib.OK,
body,
self.base_headers,
httplib.responses[httplib.OK])
def _test_container_EMPTY(self, method, url, body, headers):
if method == 'DELETE':
body = u''
return (httplib.ACCEPTED,
body,
self.base_headers,
httplib.responses[httplib.ACCEPTED])
else:
body = self.fixtures.load('list_objects_empty.xml')
return (httplib.OK,
body,
self.base_headers,
httplib.responses[httplib.OK])
def _new__container_INVALID_NAME(self, method, url, body, headers):
return (httplib.BAD_REQUEST,
body,
self.base_headers,
httplib.responses[httplib.BAD_REQUEST])
def _test_container(self, method, url, body, headers):
query_string = urlparse.urlsplit(url).query
query = parse_qs(query_string)
if 'marker' not in query:
body = self.fixtures.load('list_objects_1.xml')
else:
body = self.fixtures.load('list_objects_2.xml')
return (httplib.OK,
body,
self.base_headers,
httplib.responses[httplib.OK])
def _test_container100(self, method, url, body, headers):
body = ''
if method != 'HEAD':
return (httplib.BAD_REQUEST,
body,
self.base_headers,
httplib.responses[httplib.BAD_REQUEST])
return (httplib.NOT_FOUND,
body,
self.base_headers,
httplib.responses[httplib.NOT_FOUND])
def _test_container200(self, method, url, body, headers):
body = ''
if method != 'HEAD':
return (httplib.BAD_REQUEST,
body,
self.base_headers,
httplib.responses[httplib.BAD_REQUEST])
headers = {}
headers['etag'] = '0x8CFB877BB56A6FB'
headers['last-modified'] = 'Fri, 04 Jan 2013 09:48:06 GMT'
headers['x-ms-lease-status'] = 'unlocked'
headers['x-ms-lease-state'] = 'available'
headers['x-ms-meta-meta1'] = 'value1'
return (httplib.OK,
body,
headers,
httplib.responses[httplib.OK])
def _test_container200_test(self, method, url, body, headers):
body = ''
if method != 'HEAD':
return (httplib.BAD_REQUEST,
body,
self.base_headers,
httplib.responses[httplib.BAD_REQUEST])
headers = {}
headers['etag'] = '0x8CFB877BB56A6FB'
headers['last-modified'] = 'Fri, 04 Jan 2013 09:48:06 GMT'
headers['content-length'] = '12345'
headers['content-type'] = 'application/zip'
headers['x-ms-blob-type'] = 'Block'
headers['x-ms-lease-status'] = 'unlocked'
headers['x-ms-lease-state'] = 'available'
headers['x-ms-meta-rabbits'] = 'monkeys'
return (httplib.OK,
body,
headers,
httplib.responses[httplib.OK])
def _test2_test_list_containers(self, method, url, body, headers):
# test_get_object
body = self.fixtures.load('list_containers.xml')
headers = {'content-type': 'application/zip',
'etag': '"e31208wqsdoj329jd"',
'x-amz-meta-rabbits': 'monkeys',
'content-length': '12345',
'last-modified': 'Thu, 13 Sep 2012 07:13:22 GMT'
}
return (httplib.OK,
body,
headers,
httplib.responses[httplib.OK])
def _new_container_ALREADY_EXISTS(self, method, url, body, headers):
# test_create_container
return (httplib.CONFLICT,
body,
headers,
httplib.responses[httplib.CONFLICT])
def _new_container(self, method, url, body, headers):
# test_create_container, test_delete_container
headers = {}
if method == 'PUT':
status = httplib.CREATED
headers['etag'] = '0x8CFB877BB56A6FB'
headers['last-modified'] = 'Fri, 04 Jan 2013 09:48:06 GMT'
headers['x-ms-lease-status'] = 'unlocked'
headers['x-ms-lease-state'] = 'available'
headers['x-ms-meta-meta1'] = 'value1'
elif method == 'DELETE':
status = httplib.NO_CONTENT
return (status,
body,
headers,
httplib.responses[status])
def _new_container_DOESNT_EXIST(self, method, url, body, headers):
# test_delete_container
return (httplib.NOT_FOUND,
body,
headers,
httplib.responses[httplib.NOT_FOUND])
def _foo_bar_container_NOT_FOUND(self, method, url, body, headers):
# test_delete_container_not_found
return (httplib.NOT_FOUND,
body,
headers,
httplib.responses[httplib.NOT_FOUND])
def _foo_bar_container_foo_bar_object_NOT_FOUND(self, method, url, body,
headers):
# test_delete_object_not_found
return (httplib.NOT_FOUND,
body,
headers,
httplib.responses[httplib.NOT_FOUND])
def _foo_bar_container_foo_bar_object_DELETE(self, method, url, body, headers):
# test_delete_object
return (httplib.ACCEPTED,
body,
headers,
httplib.responses[httplib.ACCEPTED])
def _foo_bar_container_foo_test_upload(self, method, url, body, headers):
self._assert_content_length_header_is_string(headers=headers)
query_string = urlparse.urlsplit(url).query
query = parse_qs(query_string)
comp = query.get('comp', [])
headers = {}
body = ''
if 'blocklist' in comp or not comp:
headers['etag'] = '"0x8CFB877BB56A6FB"'
headers['content-md5'] = 'd4fe4c9829f7ca1cc89db7ad670d2bbd'
elif 'block' in comp:
headers['content-md5'] = 'lvcfx/bOJvndpRlrdKU1YQ=='
else:
raise NotImplementedError('Unknown request comp: {}'.format(comp))
return (httplib.CREATED,
body,
headers,
httplib.responses[httplib.CREATED])
def _foo_bar_container_foo_test_upload_block(self, method, url,
body, headers):
# test_upload_object_success
self._assert_content_length_header_is_string(headers=headers)
body = ''
headers = {}
headers['etag'] = '0x8CFB877BB56A6FB'
return (httplib.CREATED,
body,
headers,
httplib.responses[httplib.CREATED])
def _foo_bar_container_foo_test_upload_blocklist(self, method, url,
body, headers):
# test_upload_object_success
self._assert_content_length_header_is_string(headers=headers)
body = ''
headers = {}
headers['etag'] = '0x8CFB877BB56A6FB'
headers['content-md5'] = 'd4fe4c9829f7ca1cc89db7ad670d2bbd'
return (httplib.CREATED,
body,
headers,
httplib.responses[httplib.CREATED])
def _foo_bar_container_foo_test_upload_lease(self, method, url,
body, headers):
# test_upload_object_success
self._assert_content_length_header_is_string(headers=headers)
action = headers['x-ms-lease-action']
rheaders = {'x-ms-lease-id': 'someleaseid'}
body = ''
if action == 'acquire':
return (httplib.CREATED,
body,
rheaders,
httplib.responses[httplib.CREATED])
else:
if headers.get('x-ms-lease-id', None) != 'someleaseid':
return (httplib.BAD_REQUEST,
body,
rheaders,
httplib.responses[httplib.BAD_REQUEST])
return (httplib.OK,
body,
headers,
httplib.responses[httplib.CREATED])
def _foo_bar_container_foo_test_upload_INVALID_HASH(self, method, url,
body, headers):
# test_upload_object_invalid_hash1
self._assert_content_length_header_is_string(headers=headers)
body = ''
headers = {}
headers['etag'] = '0x8CFB877BB56A6FB'
headers['content-md5'] = 'd4fe4c9829f7ca1cc89db7ad670d2bbd'
return (httplib.CREATED,
body,
headers,
httplib.responses[httplib.CREATED])
def _foo_bar_container_foo_bar_object(self, method, url, body, headers):
# test_upload_object_invalid_file_size
self._assert_content_length_header_is_string(headers=headers)
body = generate_random_data(1000)
return (httplib.OK,
body,
headers,
httplib.responses[httplib.OK])
def _foo_bar_container_foo_bar_object_range(self, method, url, body, headers):
# test_download_object_range_success
body = '0123456789123456789'
self.assertTrue('x-ms-range' in headers)
self.assertEqual(headers['x-ms-range'], 'bytes=5-6')
start_bytes, end_bytes = self._get_start_and_end_bytes_from_range_str(headers['x-ms-range'], body)
return (httplib.PARTIAL_CONTENT,
body[start_bytes:end_bytes + 1],
headers,
httplib.responses[httplib.PARTIAL_CONTENT])
def _foo_bar_container_foo_bar_object_range_stream(self, method, url, body, headers):
# test_download_object_range_as_stream_success
body = '0123456789123456789'
self.assertTrue('x-ms-range' in headers)
self.assertEqual(headers['x-ms-range'], 'bytes=4-5')
start_bytes, end_bytes = self._get_start_and_end_bytes_from_range_str(headers['x-ms-range'], body)
return (httplib.PARTIAL_CONTENT,
body[start_bytes:end_bytes + 1],
headers,
httplib.responses[httplib.PARTIAL_CONTENT])
def _foo_bar_container_foo_bar_object_INVALID_SIZE(self, method, url,
body, headers):
# test_upload_object_invalid_file_size
self._assert_content_length_header_is_string(headers=headers)
body = ''
return (httplib.OK,
body,
headers,
httplib.responses[httplib.OK])
def _assert_content_length_header_is_string(self, headers):
if 'Content-Length' in headers:
self.assertTrue(isinstance(headers['Content-Length'], basestring))
class AzuriteBlobsMockHttp(AzureBlobsMockHttp):
fixtures = StorageFileFixtures('azurite_blobs')
def _get_method_name(self, *args, **kwargs):
method_name = super(AzuriteBlobsMockHttp, self).\
_get_method_name(*args, **kwargs)
if method_name.startswith('_account'):
method_name = method_name[8:]
return method_name
class AzureBlobsTests(unittest.TestCase):
driver_type = AzureBlobsStorageDriver
driver_args = STORAGE_AZURE_BLOBS_PARAMS
mock_response_klass = AzureBlobsMockHttp
@classmethod
def create_driver(self):
return self.driver_type(*self.driver_args)
def setUp(self):
self.driver_type.connectionCls.conn_class = self.mock_response_klass
self.mock_response_klass.type = None
self.driver = self.create_driver()
def tearDown(self):
self._remove_test_file()
def _remove_test_file(self):
file_path = os.path.abspath(__file__) + '.temp'
try:
os.unlink(file_path)
except OSError:
pass
def test_invalid_credentials(self):
self.mock_response_klass.type = 'UNAUTHORIZED'
try:
self.driver.list_containers()
except InvalidCredsError as e:
self.assertEqual(True, isinstance(e, InvalidCredsError))
else:
self.fail('Exception was not thrown')
def test_list_containers_empty(self):
self.mock_response_klass.type = 'list_containers_EMPTY'
containers = self.driver.list_containers()
self.assertEqual(len(containers), 0)
def test_list_containers_success(self):
self.mock_response_klass.type = 'list_containers'
AzureBlobsStorageDriver.RESPONSES_PER_REQUEST = 2
containers = self.driver.list_containers()
self.assertEqual(len(containers), 4)
self.assertTrue('last_modified' in containers[1].extra)
self.assertTrue('url' in containers[1].extra)
self.assertTrue('etag' in containers[1].extra)
self.assertTrue('lease' in containers[1].extra)
self.assertTrue('meta_data' in containers[1].extra)
def test_list_container_objects_empty(self):
self.mock_response_klass.type = 'EMPTY'
container = Container(name='test_container', extra={},
driver=self.driver)
objects = self.driver.list_container_objects(container=container)
self.assertEqual(len(objects), 0)
def test_list_container_objects_success(self):
self.mock_response_klass.type = None
AzureBlobsStorageDriver.RESPONSES_PER_REQUEST = 2
container = Container(name='test_container', extra={},
driver=self.driver)
objects = self.driver.list_container_objects(container=container)
self.assertEqual(len(objects), 4)
obj = objects[1]
self.assertEqual(obj.name, 'object2.txt')
self.assertEqual(obj.hash, '0x8CFB90F1BA8CD8F')
self.assertEqual(obj.size, 1048576)
self.assertEqual(obj.container.name, 'test_container')
self.assertTrue('meta1' in obj.meta_data)
self.assertTrue('meta2' in obj.meta_data)
self.assertTrue('last_modified' in obj.extra)
self.assertTrue('content_type' in obj.extra)
self.assertTrue('content_encoding' in obj.extra)
self.assertTrue('content_language' in obj.extra)
def test_list_container_objects_with_prefix(self):
self.mock_response_klass.type = None
AzureBlobsStorageDriver.RESPONSES_PER_REQUEST = 2
container = Container(name='test_container', extra={},
driver=self.driver)
objects = self.driver.list_container_objects(container=container,
prefix='test_prefix')
self.assertEqual(len(objects), 4)
obj = objects[1]
self.assertEqual(obj.name, 'object2.txt')
self.assertEqual(obj.hash, '0x8CFB90F1BA8CD8F')
self.assertEqual(obj.size, 1048576)
self.assertEqual(obj.container.name, 'test_container')
self.assertTrue('meta1' in obj.meta_data)
self.assertTrue('meta2' in obj.meta_data)
self.assertTrue('last_modified' in obj.extra)
self.assertTrue('content_type' in obj.extra)
self.assertTrue('content_encoding' in obj.extra)
self.assertTrue('content_language' in obj.extra)
def test_get_container_doesnt_exist(self):
self.mock_response_klass.type = None
try:
self.driver.get_container(container_name='test_container100')
except ContainerDoesNotExistError:
pass
else:
self.fail('Exception was not thrown')
def test_get_container_success(self):
self.mock_response_klass.type = None
container = self.driver.get_container(
container_name='test_container200')
self.assertTrue(container.name, 'test_container200')
self.assertTrue(container.extra['etag'], '0x8CFB877BB56A6FB')
self.assertTrue(container.extra['last_modified'],
'Fri, 04 Jan 2013 09:48:06 GMT')
self.assertTrue(container.extra['lease']['status'], 'unlocked')
self.assertTrue(container.extra['lease']['state'], 'available')
self.assertTrue(container.extra['meta_data']['meta1'], 'value1')
def test_get_object_cdn_url(self):
obj = self.driver.get_object(container_name='test_container200',
object_name='test')
url = urlparse.urlparse(self.driver.get_object_cdn_url(obj))
query = urlparse.parse_qs(url.query)
self.assertEqual(len(query['sig']), 1)
self.assertGreater(len(query['sig'][0]), 0)
def test_get_object_container_doesnt_exist(self):
# This method makes two requests which makes mocking the response a bit
# trickier
self.mock_response_klass.type = None
try:
self.driver.get_object(container_name='test_container100',
object_name='test')
except ContainerDoesNotExistError:
pass
else:
self.fail('Exception was not thrown')
def test_get_object_success(self):
# This method makes two requests which makes mocking the response a bit
# trickier
self.mock_response_klass.type = None
obj = self.driver.get_object(container_name='test_container200',
object_name='test')
self.assertEqual(obj.name, 'test')
self.assertEqual(obj.container.name, 'test_container200')
self.assertEqual(obj.size, 12345)
self.assertEqual(obj.hash, '0x8CFB877BB56A6FB')
self.assertEqual(obj.extra['last_modified'],
'Fri, 04 Jan 2013 09:48:06 GMT')
self.assertEqual(obj.extra['content_type'], 'application/zip')
self.assertEqual(obj.meta_data['rabbits'], 'monkeys')
def test_create_container_invalid_name(self):
# invalid container name
self.mock_response_klass.type = 'INVALID_NAME'
try:
self.driver.create_container(container_name='new--container')
except InvalidContainerNameError:
pass
else:
self.fail('Exception was not thrown')
def test_create_container_already_exists(self):
# container with this name already exists
self.mock_response_klass.type = 'ALREADY_EXISTS'
try:
self.driver.create_container(container_name='new-container')
except ContainerAlreadyExistsError:
pass
else:
self.fail('Exception was not thrown')
def test_create_container_success(self):
# success
self.mock_response_klass.type = None
name = 'new-container'
container = self.driver.create_container(container_name=name)
self.assertEqual(container.name, name)
def test_delete_container_doesnt_exist(self):
container = Container(name='new_container', extra=None,
driver=self.driver)
self.mock_response_klass.type = 'DOESNT_EXIST'
try:
self.driver.delete_container(container=container)
except ContainerDoesNotExistError:
pass
else:
self.fail('Exception was not thrown')
def test_delete_container_not_empty(self):
self.mock_response_klass.type = None
AzureBlobsStorageDriver.RESPONSES_PER_REQUEST = 2
container = Container(name='test_container', extra={},
driver=self.driver)
try:
self.driver.delete_container(container=container)
except ContainerIsNotEmptyError:
pass
else:
self.fail('Exception was not thrown')
def test_delete_container_success(self):
self.mock_response_klass.type = 'EMPTY'
AzureBlobsStorageDriver.RESPONSES_PER_REQUEST = 2
container = Container(name='test_container', extra={},
driver=self.driver)
self.assertTrue(self.driver.delete_container(container=container))
def test_delete_container_not_found(self):
self.mock_response_klass.type = 'NOT_FOUND'
container = Container(name='foo_bar_container', extra={},
driver=self.driver)
try:
self.driver.delete_container(container=container)
except ContainerDoesNotExistError:
pass
else:
self.fail('Container does not exist but an exception was not' +
'thrown')
def test_download_object_success(self):
container = Container(name='foo_bar_container', extra={},
driver=self.driver)
obj = Object(name='foo_bar_object', size=1000, hash=None, extra={},
container=container, meta_data=None,
driver=self.driver_type)
destination_path = os.path.abspath(__file__) + '.temp'
result = self.driver.download_object(obj=obj,
destination_path=destination_path,
overwrite_existing=False,
delete_on_failure=True)
self.assertTrue(result)
def test_download_object_invalid_file_size(self):
self.mock_response_klass.type = 'INVALID_SIZE'
container = Container(name='foo_bar_container', extra={},
driver=self.driver)
obj = Object(name='foo_bar_object', size=1000, hash=None, extra={},
container=container, meta_data=None,
driver=self.driver_type)
destination_path = os.path.abspath(__file__) + '.temp'
result = self.driver.download_object(obj=obj,
destination_path=destination_path,
overwrite_existing=False,
delete_on_failure=True)
self.assertFalse(result)
def test_download_object_invalid_file_already_exists(self):
self.mock_response_klass.type = 'INVALID_SIZE'
container = Container(name='foo_bar_container', extra={},
driver=self.driver)
obj = Object(name='foo_bar_object', size=1000, hash=None, extra={},
container=container, meta_data=None,
driver=self.driver_type)
destination_path = os.path.abspath(__file__)
try:
self.driver.download_object(obj=obj,
destination_path=destination_path,
overwrite_existing=False,
delete_on_failure=True)
except LibcloudError:
pass
else:
self.fail('Exception was not thrown')
def test_download_object_as_stream_success(self):
container = Container(name='foo_bar_container', extra={},
driver=self.driver)
obj = Object(name='foo_bar_object', size=1000, hash=None, extra={},
container=container, meta_data=None,
driver=self.driver_type)
stream = self.driver.download_object_as_stream(obj=obj,
chunk_size=None)
consumed_stream = ''.join(chunk.decode('utf-8') for chunk in stream)
self.assertEqual(len(consumed_stream), obj.size)
def test_download_object_range_success(self):
container = Container(name='foo_bar_container', extra={},
driver=self.driver)
obj = Object(name='foo_bar_object_range', size=1000, hash=None, extra={},
container=container, meta_data=None,
driver=self.driver_type)
destination_path = os.path.abspath(__file__) + '.temp'
result = self.driver.download_object_range(obj=obj,
start_bytes=5,
end_bytes=7,
destination_path=destination_path,
overwrite_existing=False,
delete_on_failure=True)
self.assertTrue(result)
with open(destination_path, 'r') as fp:
content = fp.read()
self.assertEqual(content, '56')
def test_download_object_range_as_stream_success(self):
container = Container(name='foo_bar_container', extra={},
driver=self.driver)
obj = Object(name='foo_bar_object_range_stream', size=2, hash=None, extra={},
container=container, meta_data=None,
driver=self.driver_type)
stream = self.driver.download_object_range_as_stream(obj=obj,
start_bytes=4,
end_bytes=6,
chunk_size=None)
consumed_stream = ''.join(chunk.decode('utf-8') for chunk in stream)
self.assertEqual(consumed_stream, '45')
self.assertEqual(len(consumed_stream), obj.size)
def test_upload_object_invalid_md5(self):
# Invalid md5 is returned by azure
self.mock_response_klass.type = 'INVALID_HASH'
container = Container(name='foo_bar_container', extra={},
driver=self.driver)
object_name = 'foo_test_upload'
file_path = os.path.abspath(__file__)
try:
self.driver.upload_object(file_path=file_path, container=container,
object_name=object_name,
verify_hash=True)
except ObjectHashMismatchError:
pass
else:
self.fail(
'Invalid hash was returned but an exception was not thrown')
def test_upload_small_block_object_success(self):
file_path = os.path.abspath(__file__)
file_size = os.stat(file_path).st_size
container = Container(name='foo_bar_container', extra={},
driver=self.driver)
object_name = 'foo_test_upload'
extra = {'meta_data': {'some-value': 'foobar'}}
obj = self.driver.upload_object(file_path=file_path,
container=container,
object_name=object_name,
extra=extra,
verify_hash=False)
self.assertEqual(obj.name, 'foo_test_upload')
self.assertEqual(obj.size, file_size)
self.assertTrue('some-value' in obj.meta_data)
def test_upload_big_block_object_success(self):
_, file_path = tempfile.mkstemp(suffix='.jpg')
file_size = AZURE_UPLOAD_CHUNK_SIZE + 1
with open(file_path, 'w') as file_hdl:
file_hdl.write('0' * file_size)
container = Container(name='foo_bar_container', extra={},
driver=self.driver)
object_name = 'foo_test_upload'
extra = {'meta_data': {'some-value': 'foobar'}}
obj = self.driver.upload_object(file_path=file_path,
container=container,
object_name=object_name,
extra=extra,
verify_hash=False)
self.assertEqual(obj.name, 'foo_test_upload')
self.assertEqual(obj.size, file_size)
self.assertTrue('some-value' in obj.meta_data)
os.remove(file_path)
def test_upload_small_block_object_success_with_lease(self):
self.mock_response_klass.use_param = 'comp'
file_path = os.path.abspath(__file__)
file_size = os.stat(file_path).st_size
container = Container(name='foo_bar_container', extra={},
driver=self.driver)
object_name = 'foo_test_upload'
extra = {'meta_data': {'some-value': 'foobar'}}
obj = self.driver.upload_object(file_path=file_path,
container=container,
object_name=object_name,
extra=extra,
verify_hash=False,
ex_use_lease=True)
self.assertEqual(obj.name, 'foo_test_upload')
self.assertEqual(obj.size, file_size)
self.assertTrue('some-value' in obj.meta_data)
self.mock_response_klass.use_param = None
def test_upload_big_block_object_success_with_lease(self):
self.mock_response_klass.use_param = 'comp'
_, file_path = tempfile.mkstemp(suffix='.jpg')
file_size = AZURE_UPLOAD_CHUNK_SIZE * 2
with open(file_path, 'w') as file_hdl:
file_hdl.write('0' * file_size)
container = Container(name='foo_bar_container', extra={},
driver=self.driver)
object_name = 'foo_test_upload'
extra = {'meta_data': {'some-value': 'foobar'}}
obj = self.driver.upload_object(file_path=file_path,
container=container,
object_name=object_name,
extra=extra,
verify_hash=False,
ex_use_lease=False)
self.assertEqual(obj.name, 'foo_test_upload')
self.assertEqual(obj.size, file_size)
self.assertTrue('some-value' in obj.meta_data)
os.remove(file_path)
self.mock_response_klass.use_param = None
def test_upload_blob_object_via_stream(self):
self.mock_response_klass.use_param = 'comp'
container = Container(name='foo_bar_container', extra={},
driver=self.driver)
object_name = 'foo_test_upload'
iterator = BytesIO(b('345'))
extra = {'content_type': 'text/plain'}
obj = self.driver.upload_object_via_stream(container=container,
object_name=object_name,
iterator=iterator,
extra=extra)
self.assertEqual(obj.name, object_name)
self.assertEqual(obj.size, 3)
self.mock_response_klass.use_param = None
def test_upload_blob_object_via_stream_from_iterable(self):
self.mock_response_klass.use_param = 'comp'
container = Container(name='foo_bar_container', extra={},
driver=self.driver)
object_name = 'foo_test_upload'
iterator = iter([b('34'), b('5')])
extra = {'content_type': 'text/plain'}
obj = self.driver.upload_object_via_stream(container=container,
object_name=object_name,
iterator=iterator,
extra=extra)
self.assertEqual(obj.name, object_name)
self.assertEqual(obj.size, 3)
self.mock_response_klass.use_param = None
def test_upload_blob_object_via_stream_with_lease(self):
self.mock_response_klass.use_param = 'comp'
container = Container(name='foo_bar_container', extra={},
driver=self.driver)
object_name = 'foo_test_upload'
iterator = BytesIO(b('345'))
extra = {'content_type': 'text/plain'}
obj = self.driver.upload_object_via_stream(container=container,
object_name=object_name,
iterator=iterator,
extra=extra,
ex_use_lease=True)
self.assertEqual(obj.name, object_name)
self.assertEqual(obj.size, 3)
self.mock_response_klass.use_param = None
def test_delete_object_not_found(self):
self.mock_response_klass.type = 'NOT_FOUND'
container = Container(name='foo_bar_container', extra={},
driver=self.driver)
obj = Object(name='foo_bar_object', size=1234, hash=None, extra=None,
meta_data=None, container=container, driver=self.driver)
try:
self.driver.delete_object(obj=obj)
except ObjectDoesNotExistError:
pass
else:
self.fail('Exception was not thrown')
def test_delete_object_success(self):
self.mock_response_klass.type = 'DELETE'
container = Container(name='foo_bar_container', extra={},
driver=self.driver)
obj = Object(name='foo_bar_object', size=1234, hash=None, extra=None,
meta_data=None, container=container, driver=self.driver)
result = self.driver.delete_object(obj=obj)
self.assertTrue(result)
def test_storage_driver_host(self):
# Non regression tests for issue LIBCLOUD-399 dealing with the bad
# management of the connectionCls.host class attribute
driver1 = self.driver_type('fakeaccount1', 'deadbeafcafebabe==')
driver2 = self.driver_type('fakeaccount2', 'deadbeafcafebabe==')
driver3 = self.driver_type('fakeaccount3', 'deadbeafcafebabe==',
host='test.foo.bar.com')
host1 = driver1.connection.host
host2 = driver2.connection.host
host3 = driver3.connection.host
self.assertEqual(host1, 'fakeaccount1.blob.core.windows.net')
self.assertEqual(host2, 'fakeaccount2.blob.core.windows.net')
self.assertEqual(host3, 'test.foo.bar.com')
def test_storage_driver_host_govcloud(self):
driver1 = self.driver_type(
'fakeaccount1', 'deadbeafcafebabe==',
host='blob.core.usgovcloudapi.net')
driver2 = self.driver_type(
'fakeaccount2', 'deadbeafcafebabe==',
host='fakeaccount2.blob.core.usgovcloudapi.net')
host1 = driver1.connection.host
host2 = driver2.connection.host
account_prefix_1 = driver1.connection.account_prefix
account_prefix_2 = driver2.connection.account_prefix
self.assertEqual(host1, 'fakeaccount1.blob.core.usgovcloudapi.net')
self.assertEqual(host2, 'fakeaccount2.blob.core.usgovcloudapi.net')
self.assertIsNone(account_prefix_1)
self.assertIsNone(account_prefix_2)
def test_storage_driver_host_azurite(self):
driver = self.driver_type(
'fakeaccount1', 'deadbeafcafebabe==',
host='localhost', port=10000, secure=False)
host = driver.connection.host
account_prefix = driver.connection.account_prefix
self.assertEqual(host, 'localhost')
self.assertEqual(account_prefix, 'fakeaccount1')
class AzuriteBlobsTests(AzureBlobsTests):
driver_args = STORAGE_AZURITE_BLOBS_PARAMS
mock_response_klass = AzuriteBlobsMockHttp
if __name__ == '__main__':
sys.exit(unittest.main())