blob: 24c741af8b730c654c7f39ddcdf0a8d51ebf3ee8 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import errno
import hashlib
from libcloud.utils.py3 import httplib
from io import BytesIO
import mock
from mock import Mock
from libcloud.utils.py3 import StringIO
from libcloud.utils.py3 import b
from libcloud.utils.py3 import PY2
from libcloud.utils.py3 import assertRaisesRegex
from libcloud.storage.base import StorageDriver
from libcloud.storage.base import DEFAULT_CONTENT_TYPE
from libcloud.test import unittest
from libcloud.test import MockHttp
from libcloud.test import BodyStream
class BaseMockRawResponse(MockHttp):
def _(self, method, url, body, headers):
body = 'ab'
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
def root(self, method, url, body, headers):
body = 'ab'
return (httplib.OK, body, {}, httplib.responses[httplib.OK])
class BaseStorageTests(unittest.TestCase):
def setUp(self):
self.send_called = 0
StorageDriver.connectionCls.conn_class = BaseMockRawResponse
self.driver1 = StorageDriver('username', 'key', host='localhost')
self.driver1.supports_chunked_encoding = True
self.driver2 = StorageDriver('username', 'key', host='localhost')
self.driver2.supports_chunked_encoding = False
self.driver1.strict_mode = False
self.driver1.strict_mode = False
def test__upload_object_iterator_must_have_next_method(self):
valid_iterators = [BytesIO(b('134')), StringIO('bar')]
invalid_iterators = ['foobar', '', False, True, 1, object()]
def upload_func(*args, **kwargs):
return True, 'barfoo', 100
kwargs = {'object_name': 'foo', 'content_type': 'foo/bar',
'upload_func': upload_func, 'upload_func_kwargs': {},
'request_path': '/', 'headers': {}}
for value in valid_iterators:
kwargs['stream'] = value
self.driver1._upload_object(**kwargs)
for value in invalid_iterators:
kwargs['stream'] = value
try:
self.driver1._upload_object(**kwargs)
except AttributeError:
pass
else:
self.fail('Exception was not thrown')
def test__upload_object_does_not_stream_response(self):
resp = self.driver1._upload_object(
object_name='foo',
content_type='foo/bar',
request_path='/',
stream=iter(b'foo'))
mock_response = resp["response"].response._response
response_streamed = mock_response.request.stream
assert response_streamed is False
def test__get_hash_function(self):
self.driver1.hash_type = 'md5'
func = self.driver1._get_hash_function()
self.assertTrue(func)
self.driver1.hash_type = 'sha1'
func = self.driver1._get_hash_function()
self.assertTrue(func)
try:
self.driver1.hash_type = 'invalid-hash-function'
func = self.driver1._get_hash_function()
except RuntimeError:
pass
else:
self.fail('Invalid hash type but exception was not thrown')
def test_upload_no_content_type_supplied_or_detected(self):
iterator = StringIO()
upload_func = Mock()
upload_func.return_value = True, '', 0
# strict_mode is disabled, default content type should be used
self.driver1.connection = Mock()
self.driver1._upload_object(object_name='test',
content_type=None,
upload_func=upload_func,
upload_func_kwargs={},
request_path='/',
stream=iterator)
headers = self.driver1.connection.request.call_args[-1]['headers']
self.assertEqual(headers['Content-Type'], DEFAULT_CONTENT_TYPE)
# strict_mode is enabled, exception should be thrown
self.driver1.strict_mode = True
expected_msg = ('File content-type could not be guessed and no'
' content_type value is provided')
assertRaisesRegex(self, AttributeError, expected_msg,
self.driver1._upload_object,
object_name='test',
content_type=None,
upload_func=upload_func,
upload_func_kwargs={},
request_path='/',
stream=iterator)
@mock.patch('libcloud.utils.files.exhaust_iterator')
@mock.patch('libcloud.utils.files.read_in_chunks')
def test_upload_object_hash_calculation_is_efficient(self, mock_read_in_chunks,
mock_exhaust_iterator):
# Verify that we don't buffer whole file in memory when calculating
# object has when iterator has __next__ method, but instead read and
# calculate hash in chunks
size = 100
self.driver1.connection = Mock()
# stream has __next__ method and next() method
mock_read_in_chunks.return_value = 'a' * size
iterator = BodyStream('a' * size)
self.assertTrue(hasattr(iterator, '__next__'))
self.assertTrue(hasattr(iterator, 'next'))
upload_func = Mock()
upload_func.return_value = True, '', size
self.assertEqual(mock_read_in_chunks.call_count, 0)
self.assertEqual(mock_exhaust_iterator.call_count, 0)
result = self.driver1._upload_object(object_name='test1',
content_type=None,
upload_func=upload_func,
upload_func_kwargs={},
request_path='/',
stream=iterator)
hasher = hashlib.md5()
hasher.update(b('a') * size)
expected_hash = hasher.hexdigest()
self.assertEqual(result['data_hash'], expected_hash)
self.assertEqual(result['bytes_transferred'], size)
headers = self.driver1.connection.request.call_args[-1]['headers']
self.assertEqual(headers['Content-Type'], DEFAULT_CONTENT_TYPE)
self.assertEqual(mock_read_in_chunks.call_count, 1)
self.assertEqual(mock_exhaust_iterator.call_count, 0)
# stream has only has next() method
mock_read_in_chunks.return_value = 'b' * size
iterator = iter([str(v) for v in ['b' * size]])
if PY2:
self.assertFalse(hasattr(iterator, '__next__'))
self.assertTrue(hasattr(iterator, 'next'))
else:
self.assertTrue(hasattr(iterator, '__next__'))
self.assertFalse(hasattr(iterator, 'next'))
self.assertEqual(mock_read_in_chunks.call_count, 1)
self.assertEqual(mock_exhaust_iterator.call_count, 0)
self.assertEqual(mock_read_in_chunks.call_count, 1)
self.assertEqual(mock_exhaust_iterator.call_count, 0)
result = self.driver1._upload_object(object_name='test2',
content_type=None,
upload_func=upload_func,
upload_func_kwargs={},
request_path='/',
stream=iterator)
hasher = hashlib.md5()
hasher.update(b('b') * size)
expected_hash = hasher.hexdigest()
self.assertEqual(result['data_hash'], expected_hash)
self.assertEqual(result['bytes_transferred'], size)
headers = self.driver1.connection.request.call_args[-1]['headers']
self.assertEqual(headers['Content-Type'], DEFAULT_CONTENT_TYPE)
self.assertEqual(mock_read_in_chunks.call_count, 2)
self.assertEqual(mock_exhaust_iterator.call_count, 0)
def test_upload_object_via_stream_illegal_seek_errors_are_ignored(self):
# Illegal seek errors should be ignored
size = 100
self.driver1.connection = Mock()
seek_error = OSError('Illegal seek')
seek_error.errno = 29
assert errno.ESPIPE == 29
iterator = BodyStream('a' * size)
iterator.seek = mock.Mock(side_effect=seek_error)
result = self.driver1._upload_object(object_name='test1',
content_type=None,
request_path='/',
stream=iterator)
hasher = hashlib.md5()
hasher.update(b('a') * size)
expected_hash = hasher.hexdigest()
self.assertEqual(result['data_hash'], expected_hash)
self.assertEqual(result['bytes_transferred'], size)
# But others shouldn't
self.driver1.connection = Mock()
seek_error = OSError('Other error')
seek_error.errno = 21
iterator = BodyStream('b' * size)
iterator.seek = mock.Mock(side_effect=seek_error)
assertRaisesRegex(self, OSError, 'Other error',
self.driver1._upload_object,
object_name='test1',
content_type=None,
request_path='/',
stream=iterator)
if __name__ == '__main__':
sys.exit(unittest.main())