# Source blob: 41db0c41ff7c5f4e3b4ed6a8cbce46166d2d2203
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
import errno
import hashlib
from io import BytesIO
from unittest import mock
from unittest.mock import Mock
from libcloud.test import MockHttp, BodyStream, unittest
from libcloud.utils.py3 import PY2, StringIO, b, httplib, assertRaisesRegex
from libcloud.storage.base import DEFAULT_CONTENT_TYPE, StorageDriver
from libcloud.common.exceptions import RateLimitReachedError
from libcloud.test.storage.base import BaseRangeDownloadMockHttp
class BaseMockRawResponse(MockHttp):
    """Mock HTTP backend whose handlers always answer ``200 OK`` with the body ``"ab"``."""

    def _(self, method, url, body, headers):
        """Return a fixed ``200 OK`` response; the incoming request ``body`` is ignored."""
        status = httplib.OK
        return (status, "ab", {}, httplib.responses[status])

    def root(self, method, url, body, headers):
        """Return the same fixed ``200 OK`` response as ``_``; the request ``body`` is ignored."""
        status = httplib.OK
        return (status, "ab", {}, httplib.responses[status])
class BaseStorageTests(unittest.TestCase):
    """Tests for the upload helpers and range utilities of the base ``StorageDriver``."""

    def setUp(self):
        """Create two drivers wired to ``BaseMockRawResponse`` with strict mode disabled."""
        self.send_called = 0
        # NOTE: this rebinds a class-level attribute and it is not restored afterwards.
        StorageDriver.connectionCls.conn_class = BaseMockRawResponse

        self.driver1 = StorageDriver("username", "key", host="localhost")
        self.driver1.supports_chunked_encoding = True

        self.driver2 = StorageDriver("username", "key", host="localhost")
        self.driver2.supports_chunked_encoding = False

        # Both drivers start with strict mode disabled.
        self.driver1.strict_mode = False
        self.driver2.strict_mode = False

    def test__upload_object_iterator_must_have_next_method(self):
        """File-like streams are accepted; non-iterator values raise ``AttributeError``."""
        valid_iterators = [BytesIO(b("134")), StringIO("bar")]
        invalid_iterators = ["foobar", "", False, True, 1, object()]

        kwargs = {
            "object_name": "foo",
            "content_type": "foo/bar",
            "request_path": "/",
            "headers": {},
        }

        for value in valid_iterators:
            kwargs["stream"] = value
            self.driver1._upload_object(**kwargs)

        for value in invalid_iterators:
            kwargs["stream"] = value
            with self.assertRaises(AttributeError):
                self.driver1._upload_object(**kwargs)

    def test__upload_object_does_not_stream_response(self):
        """The request recorded on the mock response must have ``stream`` set to ``False``."""
        resp = self.driver1._upload_object(
            object_name="foo",
            content_type="foo/bar",
            request_path="/",
            stream=iter(b"foo"),
        )
        mock_response = resp["response"].response._response
        response_streamed = mock_response.request.stream
        # Use a TestCase assertion so the check is not stripped under ``python -O``.
        self.assertIs(response_streamed, False)

    def test__get_hash_function(self):
        """``md5`` and ``sha1`` yield a hash function; an unknown type raises ``RuntimeError``."""
        self.driver1.hash_type = "md5"
        func = self.driver1._get_hash_function()
        self.assertTrue(func)

        self.driver1.hash_type = "sha1"
        func = self.driver1._get_hash_function()
        self.assertTrue(func)

        self.driver1.hash_type = "invalid-hash-function"
        with self.assertRaises(RuntimeError):
            self.driver1._get_hash_function()

    def test_upload_no_content_type_supplied_or_detected(self):
        """Missing content type falls back to the default unless strict mode is enabled."""
        iterator = StringIO()

        # strict_mode is disabled, default content type should be used
        self.driver1.connection = Mock()

        self.driver1._upload_object(
            object_name="test", content_type=None, request_path="/", stream=iterator
        )

        headers = self.driver1.connection.request.call_args[-1]["headers"]
        self.assertEqual(headers["Content-Type"], DEFAULT_CONTENT_TYPE)

        # strict_mode is enabled, exception should be thrown
        self.driver1.strict_mode = True
        expected_msg = (
            'File content-type could not be guessed for "test" '
            "and no content_type value is provided"
        )
        assertRaisesRegex(
            self,
            AttributeError,
            expected_msg,
            self.driver1._upload_object,
            object_name="test",
            content_type=None,
            request_path="/",
            stream=iterator,
        )

    @mock.patch("libcloud.utils.files.exhaust_iterator")
    @mock.patch("libcloud.utils.files.read_in_chunks")
    def test_upload_object_hash_calculation_is_efficient(
        self, mock_read_in_chunks, mock_exhaust_iterator
    ):
        """Uploading calls ``read_in_chunks`` once per upload and never ``exhaust_iterator``."""
        # Verify that we don't buffer the whole file in memory when calculating
        # the object hash when the iterator has a __next__ method, but instead
        # read and calculate the hash in chunks
        size = 100

        self.driver1.connection = Mock()

        # stream has __next__ method and next() method
        mock_read_in_chunks.return_value = "a" * size

        iterator = BodyStream("a" * size)
        self.assertTrue(hasattr(iterator, "__next__"))
        self.assertTrue(hasattr(iterator, "next"))

        self.assertEqual(mock_read_in_chunks.call_count, 0)
        self.assertEqual(mock_exhaust_iterator.call_count, 0)

        result = self.driver1._upload_object(
            object_name="test1", content_type=None, request_path="/", stream=iterator
        )

        hasher = hashlib.md5()
        hasher.update(b("a") * size)
        expected_hash = hasher.hexdigest()

        self.assertEqual(result["data_hash"], expected_hash)
        self.assertEqual(result["bytes_transferred"], size)

        headers = self.driver1.connection.request.call_args[-1]["headers"]
        self.assertEqual(headers["Content-Type"], DEFAULT_CONTENT_TYPE)

        self.assertEqual(mock_read_in_chunks.call_count, 1)
        self.assertEqual(mock_exhaust_iterator.call_count, 0)

        # stream only has one of the next() / __next__ methods, depending on
        # the interpreter major version
        mock_read_in_chunks.return_value = "b" * size

        iterator = iter(["b" * size])

        if PY2:
            self.assertFalse(hasattr(iterator, "__next__"))
            self.assertTrue(hasattr(iterator, "next"))
        else:
            self.assertTrue(hasattr(iterator, "__next__"))
            self.assertFalse(hasattr(iterator, "next"))

        # Call counts are unchanged until the second upload is issued.
        self.assertEqual(mock_read_in_chunks.call_count, 1)
        self.assertEqual(mock_exhaust_iterator.call_count, 0)

        result = self.driver1._upload_object(
            object_name="test2", content_type=None, request_path="/", stream=iterator
        )

        hasher = hashlib.md5()
        hasher.update(b("b") * size)
        expected_hash = hasher.hexdigest()

        self.assertEqual(result["data_hash"], expected_hash)
        self.assertEqual(result["bytes_transferred"], size)

        headers = self.driver1.connection.request.call_args[-1]["headers"]
        self.assertEqual(headers["Content-Type"], DEFAULT_CONTENT_TYPE)

        self.assertEqual(mock_read_in_chunks.call_count, 2)
        self.assertEqual(mock_exhaust_iterator.call_count, 0)

    def test_upload_object_via_stream_illegal_seek_errors_are_ignored(self):
        """An ``ESPIPE`` error from ``seek`` is tolerated; any other ``OSError`` propagates."""
        # Illegal seek errors should be ignored
        size = 100

        self.driver1.connection = Mock()

        seek_error = OSError("Illegal seek")
        # Use the symbolic constant rather than a hard-coded platform value.
        seek_error.errno = errno.ESPIPE

        iterator = BodyStream("a" * size)
        iterator.seek = mock.Mock(side_effect=seek_error)

        result = self.driver1._upload_object(
            object_name="test1", content_type=None, request_path="/", stream=iterator
        )

        hasher = hashlib.md5()
        hasher.update(b("a") * size)
        expected_hash = hasher.hexdigest()

        self.assertEqual(result["data_hash"], expected_hash)
        self.assertEqual(result["bytes_transferred"], size)

        # But others shouldn't
        self.driver1.connection = Mock()

        seek_error = OSError("Other error")
        seek_error.errno = errno.EISDIR

        iterator = BodyStream("b" * size)
        iterator.seek = mock.Mock(side_effect=seek_error)

        self.assertRaisesRegex(
            OSError,
            "Other error",
            self.driver1._upload_object,
            object_name="test1",
            content_type=None,
            request_path="/",
            stream=iterator,
        )

    def test_get_standard_range_str(self):
        """``_get_standard_range_str`` treats the end byte as exclusive unless told otherwise."""
        result = self.driver1._get_standard_range_str(0, 5)
        self.assertEqual(result, "bytes=0-4")

        result = self.driver1._get_standard_range_str(0)
        self.assertEqual(result, "bytes=0-")

        result = self.driver1._get_standard_range_str(0, 1)
        self.assertEqual(result, "bytes=0-0")

        result = self.driver1._get_standard_range_str(200)
        self.assertEqual(result, "bytes=200-")

        result = self.driver1._get_standard_range_str(10, 200)
        self.assertEqual(result, "bytes=10-199")

        result = self.driver1._get_standard_range_str(10, 11)
        self.assertEqual(result, "bytes=10-10")

        # With the third positional argument set to True the end byte is kept as given.
        result = self.driver1._get_standard_range_str(10, 11, True)
        self.assertEqual(result, "bytes=10-11")

    @mock.patch("os.environ", {"LIBCLOUD_RETRY_FAILED_HTTP_REQUESTS": True})
    def test_should_retry_rate_limited_errors(self):
        """After a rate-limit error the send is attempted again and its error surfaces."""

        class SecondException(Exception):
            pass

        count = 0

        def raise_on_second(*_, **__):
            # First call raises the rate-limit error, every later call raises
            # SecondException; seeing the latter proves a second attempt happened.
            nonlocal count
            count += 1
            if count > 1:
                raise SecondException()
            raise RateLimitReachedError()

        send_mock = Mock()
        self.driver1.connection.connection.session.send = send_mock
        send_mock.side_effect = raise_on_second

        with self.assertRaises(SecondException):
            self.driver1._upload_object(
                object_name="some name",
                content_type="something",
                request_path="/",
                stream=iter([]),
            )

    @mock.patch("os.environ", {"LIBCLOUD_RETRY_FAILED_HTTP_REQUESTS": True})
    def test_should_retry_rate_limited_errors_until_success(self):
        """After a rate-limit error the send is attempted again and may succeed."""
        count = 0

        def succeed_on_second(*_, **__) -> mock.MagicMock:
            # First call raises the rate-limit error, later calls return a
            # mocked response carrying status code 200.
            nonlocal count
            count += 1
            if count > 1:
                successful_response = mock.MagicMock()
                successful_response.status_code = 200
                return successful_response
            raise RateLimitReachedError()

        self.driver1.connection.connection.session.send = Mock(side_effect=succeed_on_second)

        uploaded_object = self.driver1._upload_object(
            object_name="some name",
            content_type="something",
            request_path="/",
            stream=iter([]),
        )

        self.assertEqual(
            True,
            uploaded_object["response"].success(),
            "Expected to have successful response after retry",
        )
class BaseRangeDownloadMockHttpTestCase(unittest.TestCase):
    """Tests for the range-string parsing helper of ``BaseRangeDownloadMockHttp``."""

    def test_get_start_and_end_bytes_from_range_str(self):
        """Each range string is parsed into the expected ``(start, end)`` tuple."""
        mock_http = BaseRangeDownloadMockHttp(None, None)
        body = "0123456789"

        # (range header value, expected (start, end) result); an open-ended
        # range is expected to end at len(body).
        expectations = (
            ("bytes=1-", (1, len(body))),
            ("bytes=1-5", (1, 5)),
            ("bytes=3-5", (3, 5)),
        )

        for range_str, expected in expectations:
            actual = mock_http._get_start_and_end_bytes_from_range_str(range_str, body)
            self.assertEqual(actual, expected)
if __name__ == "__main__":
    # Run this module's tests when executed as a script and pass the result to sys.exit.
    exit_status = unittest.main()
    sys.exit(exit_status)