blob: e5ea383e0c8c36cc56891baa6438876309bcd910 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import hmac
import base64
import tempfile
from io import BytesIO
from hashlib import sha1
from unittest import mock
from unittest.mock import Mock, PropertyMock
import libcloud.utils.files # NOQA: F401
from libcloud.test import MockHttp # pylint: disable-msg=E0611 # noqa
from libcloud.test import unittest, make_response, generate_random_data
from libcloud.utils.py3 import ET, StringIO, b, httplib, parse_qs, urlparse
from libcloud.utils.files import exhaust_iterator
from libcloud.common.types import LibcloudError, InvalidCredsError, MalformedResponseError
from libcloud.storage.base import Object, Container
from libcloud.test.secrets import STORAGE_S3_PARAMS
from libcloud.storage.types import (
ContainerError,
ObjectDoesNotExistError,
ObjectHashMismatchError,
ContainerIsNotEmptyError,
InvalidContainerNameError,
ContainerDoesNotExistError,
)
from libcloud.test.storage.base import BaseRangeDownloadMockHttp
from libcloud.storage.drivers.s3 import (
CHUNK_SIZE,
S3StorageDriver,
BaseS3Connection,
S3USWestStorageDriver,
S3SignatureV4Connection,
)
from libcloud.test.file_fixtures import StorageFileFixtures # pylint: disable-msg=E0611
class S3MockHttp(BaseRangeDownloadMockHttp, unittest.TestCase):
    """
    Canned HTTP responses used by the S3 storage driver tests.

    Every handler takes ``(method, url, body, headers)`` and returns a
    ``(status, body, headers, reason)`` tuple.

    NOTE(review): handler names look like they mirror the request path plus
    the ``type`` suffix the tests set on this class (for example ``EMPTY`` or
    ``MULTIPART``); the dispatch itself lives in a base class that is not part
    of this file - confirm there before renaming anything.

    Several error handlers pair a non-OK status code with the reason phrase
    for ``OK``; that is intentionally left untouched here.
    """

    fixtures = StorageFileFixtures("s3")
    # Class-level headers returned by handlers that do not build their own.
    base_headers = {}

    def _UNAUTHORIZED(self, method, url, body, headers):
        # 401 with an empty body.
        reason = httplib.responses[httplib.OK]
        return (httplib.UNAUTHORIZED, "", self.base_headers, reason)

    def _DIFFERENT_REGION(self, method, url, body, headers):
        # 301 with an empty body.
        reason = httplib.responses[httplib.OK]
        return (httplib.MOVED_PERMANENTLY, "", self.base_headers, reason)

    def _list_containers_EMPTY(self, method, url, body, headers):
        payload = self.fixtures.load("list_containers_empty.xml")
        return (httplib.OK, payload, self.base_headers, httplib.responses[httplib.OK])

    def _list_containers_TOKEN(self, method, url, body, headers):
        # The token value is only checked when the header is present at all.
        if "x-amz-security-token" in headers:
            assert headers["x-amz-security-token"] == "asdf"
        payload = self.fixtures.load("list_containers_empty.xml")
        return (httplib.OK, payload, self.base_headers, httplib.responses[httplib.OK])

    def _list_containers(self, method, url, body, headers):
        payload = self.fixtures.load("list_containers.xml")
        return (httplib.OK, payload, self.base_headers, httplib.responses[httplib.OK])

    def _test_container_EMPTY(self, method, url, body, headers):
        payload = self.fixtures.load("list_container_objects_empty.xml")
        return (httplib.OK, payload, self.base_headers, httplib.responses[httplib.OK])

    def _test_container(self, method, url, body, headers):
        payload = self.fixtures.load("list_container_objects.xml")
        return (httplib.OK, payload, self.base_headers, httplib.responses[httplib.OK])

    def _test_container_ITERATOR(self, method, url, body, headers):
        if "3.zip" not in url:
            # First part of the response (first 3 objects)
            file_name = "list_container_objects_not_exhausted1.xml"
        else:
            # Any URL mentioning "3.zip" gets the second fixture.
            file_name = "list_container_objects_not_exhausted2.xml"
        payload = self.fixtures.load(file_name)
        return (httplib.OK, payload, self.base_headers, httplib.responses[httplib.OK])

    def _test2_get_object(self, method, url, body, headers):
        payload = self.fixtures.load("list_container_objects.xml")
        return (httplib.OK, payload, self.base_headers, httplib.responses[httplib.OK])

    def _test2_test_get_object_no_content_type(self, method, url, body, headers):
        # Response headers deliberately carry neither a content type nor an etag.
        response_headers = {
            "content-length": "12345",
            "last-modified": "Thu, 13 Sep 2012 07:13:22 GMT",
        }
        return (httplib.OK, body, response_headers, httplib.responses[httplib.OK])

    def _test2_get_object_no_content_type(self, method, url, body, headers):
        # Same response as _test2_test_get_object_no_content_type.
        response_headers = {
            "content-length": "12345",
            "last-modified": "Thu, 13 Sep 2012 07:13:22 GMT",
        }
        return (httplib.OK, body, response_headers, httplib.responses[httplib.OK])

    def _test2_test_get_object(self, method, url, body, headers):
        # test_get_object_success
        payload = self.fixtures.load("list_containers.xml")
        response_headers = {
            "content-type": "application/zip",
            "etag": '"e31208wqsdoj329jd"',
            "x-amz-meta-rabbits": "monkeys",
            "content-length": "12345",
            "last-modified": "Thu, 13 Sep 2012 07:13:22 GMT",
        }
        return (httplib.OK, payload, response_headers, httplib.responses[httplib.OK])

    def _test2_get_object_no_content_length(self, method, url, body, headers):
        # test_get_object_unable_to_determine_object_size
        # (no "content-length" header in the response)
        payload = self.fixtures.load("list_containers.xml")
        response_headers = {
            "content-type": "application/zip",
            "etag": '"e31208wqsdoj329jd"',
            "x-amz-meta-rabbits": "monkeys",
            "last-modified": "Thu, 13 Sep 2012 07:13:22 GMT",
        }
        return (httplib.OK, payload, response_headers, httplib.responses[httplib.OK])

    def _test2_test_get_object_no_content_length(self, method, url, body, headers):
        # test_get_object_unable_to_determine_object_size
        # (no "content-length" header in the response)
        payload = self.fixtures.load("list_containers.xml")
        response_headers = {
            "content-type": "application/zip",
            "etag": '"e31208wqsdoj329jd"',
            "x-amz-meta-rabbits": "monkeys",
            "last-modified": "Thu, 13 Sep 2012 07:13:22 GMT",
        }
        return (httplib.OK, payload, response_headers, httplib.responses[httplib.OK])

    def _new_container_INVALID_NAME(self, method, url, body, headers):
        # test_create_container
        return (httplib.BAD_REQUEST, body, headers, httplib.responses[httplib.OK])

    def _new_container_ALREADY_EXISTS(self, method, url, body, headers):
        # test_create_container
        return (httplib.CONFLICT, body, headers, httplib.responses[httplib.OK])

    def _new_container(self, method, url, body, headers):
        # test_create_container, test_delete_container
        if method == "PUT":
            status = httplib.OK
        elif method == "DELETE":
            status = httplib.NO_CONTENT
        else:
            # Fail with a readable message instead of an UnboundLocalError
            # on ``status`` when an unexpected HTTP method reaches this mock.
            self.fail("Unexpected HTTP method for _new_container: %r" % (method,))
        return (status, body, headers, httplib.responses[httplib.OK])

    def _new_container_DOESNT_EXIST(self, method, url, body, headers):
        # test_delete_container
        return (httplib.NOT_FOUND, body, headers, httplib.responses[httplib.OK])

    def _new_container_NOT_EMPTY(self, method, url, body, headers):
        # test_delete_container
        return (httplib.CONFLICT, body, headers, httplib.responses[httplib.OK])

    def _test1_get_container(self, method, url, body, headers):
        payload = self.fixtures.load("list_container_objects.xml")
        return (httplib.OK, payload, self.base_headers, httplib.responses[httplib.OK])

    def _container1_get_container(self, method, url, body, headers):
        # 404 with the matching reason phrase.
        reason = httplib.responses[httplib.NOT_FOUND]
        return (httplib.NOT_FOUND, "", self.base_headers, reason)

    def _test_inexistent_get_object(self, method, url, body, headers):
        # 404 with the matching reason phrase.
        reason = httplib.responses[httplib.NOT_FOUND]
        return (httplib.NOT_FOUND, "", self.base_headers, reason)

    def _foo_bar_container(self, method, url, body, headers):
        # test_delete_container
        return (httplib.NO_CONTENT, body, headers, httplib.responses[httplib.OK])

    def _foo_bar_container_NOT_FOUND(self, method, url, body, headers):
        # test_delete_container_not_found
        return (httplib.NOT_FOUND, body, headers, httplib.responses[httplib.OK])

    def _foo_bar_container_foo_bar_object_NOT_FOUND(self, method, url, body, headers):
        # test_delete_object_not_found
        return (httplib.NOT_FOUND, body, headers, httplib.responses[httplib.OK])

    def _foo_bar_container_foo_bar_object_DELETE(self, method, url, body, headers):
        # test_delete_object
        return (httplib.NO_CONTENT, body, headers, httplib.responses[httplib.OK])

    def _foo_bar_container_foo_test_stream_data(self, method, url, body, headers):
        # test_upload_object_via_stream
        response_headers = {"etag": '"0cc175b9c0f1b6a831c399e269772661"'}
        return (httplib.OK, "", response_headers, httplib.responses[httplib.OK])

    def _foo_bar_container_foo_test_stream_data_MULTIPART(self, method, url, body, headers):
        ok_reason = httplib.responses[httplib.OK]

        if method == "POST":
            if "uploadId" in url:
                # Complete multipart request
                fixture_name = "complete_multipart.xml"
            else:
                # Initiate multipart request
                fixture_name = "initiate_multipart.xml"
            payload = self.fixtures.load(fixture_name)
            return (httplib.OK, payload, headers, ok_reason)

        if method == "DELETE":
            # Abort multipart request
            return (
                httplib.NO_CONTENT,
                "",
                headers,
                httplib.responses[httplib.NO_CONTENT],
            )

        # Upload chunk multipart request
        chunk_headers = {"etag": '"0cc175b9c0f1b6a831c399e269772661"'}
        return (httplib.OK, "", chunk_headers, ok_reason)

    def _foo_bar_container_LIST_MULTIPART(self, method, url, body, headers):
        # The first page is served until the request carries a "key-marker"
        # query parameter; after that the second page is served.
        query = parse_qs(urlparse.urlsplit(url).query)
        if "key-marker" in query:
            fixture_name = "list_multipart_2.xml"
        else:
            fixture_name = "list_multipart_1.xml"
        payload = self.fixtures.load(fixture_name)
        return (httplib.OK, payload, headers, httplib.responses[httplib.OK])

    def _foo_bar_container_my_divisor_LIST_MULTIPART(self, method, url, body, headers):
        # 204 with an empty body.
        reason = httplib.responses[httplib.NO_CONTENT]
        return (httplib.NO_CONTENT, "", headers, reason)

    def _foo_bar_container_my_movie_m2ts_LIST_MULTIPART(self, method, url, body, headers):
        # 204 with an empty body.
        reason = httplib.responses[httplib.NO_CONTENT]
        return (httplib.NO_CONTENT, "", headers, reason)

    def parse_body(self):
        """
        Parse ``self.body`` as XML and return the parsed element.

        An empty body is returned unparsed unless ``parse_zero_length_body``
        is set. Any parse failure is re-raised as MalformedResponseError.
        """
        if len(self.body) == 0 and not self.parse_zero_length_body:
            return self.body

        try:
            try:
                parsed = ET.XML(self.body)
            except ValueError:
                # lxml wants a bytes and tests are basically hard-coded to str
                parsed = ET.XML(self.body.encode("utf-8"))
        except Exception:
            raise MalformedResponseError(
                "Failed to parse XML", body=self.body, driver=self.connection.driver
            )
        return parsed

    def _foo_bar_container_foo_bar_object(self, method, url, body, headers):
        # test_download_object_success
        payload = generate_random_data(1000)
        return (httplib.OK, payload, headers, httplib.responses[httplib.OK])

    def _foo_bar_container_foo_bar_object_range(self, method, url, body, headers):
        # test_download_object_range_success
        content = "0123456789123456789"
        self.assertTrue("Range" in headers)
        self.assertEqual(headers["Range"], "bytes=5-6")

        start_bytes, end_bytes = self._get_start_and_end_bytes_from_range_str(
            headers["Range"], content
        )
        # The slice end is "end_bytes + 1", i.e. end_bytes itself is included.
        partial = content[start_bytes : end_bytes + 1]
        return (
            httplib.PARTIAL_CONTENT,
            partial,
            headers,
            httplib.responses[httplib.PARTIAL_CONTENT],
        )

    def _foo_bar_container_foo_bar_object_range_stream(self, method, url, body, headers):
        # test_download_object_range_as_stream_success
        content = "0123456789123456789"
        self.assertTrue("Range" in headers)
        self.assertEqual(headers["Range"], "bytes=4-6")

        start_bytes, end_bytes = self._get_start_and_end_bytes_from_range_str(
            headers["Range"], content
        )
        # The slice end is "end_bytes + 1", i.e. end_bytes itself is included.
        partial = content[start_bytes : end_bytes + 1]
        return (
            httplib.PARTIAL_CONTENT,
            partial,
            headers,
            httplib.responses[httplib.PARTIAL_CONTENT],
        )

    def _foo_bar_container_foo_bar_object_NO_BUFFER(self, method, url, body, headers):
        # test_download_object_data_is_not_buffered_in_memory
        payload = generate_random_data(1000)
        return (httplib.OK, payload, headers, httplib.responses[httplib.OK])

    def _foo_bar_container_foo_test_upload(self, method, url, body, headers):
        # test_upload_object_success
        response_headers = {"etag": '"0cc175b9c0f1b6a831c399e269772661"'}
        return (httplib.OK, "", response_headers, httplib.responses[httplib.OK])

    def _foo_bar_container_foo_bar_object_INVALID_SIZE(self, method, url, body, headers):
        # test_upload_object_invalid_file_size
        return (httplib.OK, "", headers, httplib.responses[httplib.OK])
class S3Tests(unittest.TestCase):
driver_type = S3StorageDriver
driver_args = STORAGE_S3_PARAMS
mock_response_klass = S3MockHttp
@classmethod
def create_driver(cls):
    """Instantiate ``driver_type`` with ``driver_args`` as positional arguments."""
    # The first parameter of a classmethod is the class, so it is named
    # ``cls`` rather than ``self``.
    return cls.driver_type(*cls.driver_args)
def setUp(self):
    # Point the driver's connection class at the mock HTTP class and reset
    # the mock "type" so a value set by a previous test cannot leak in.
    self.driver_type.connectionCls.conn_class = self.mock_response_klass
    self.mock_response_klass.type = None
    self.driver = self.create_driver()

    # mkstemp() returns an already-open OS-level file descriptor together
    # with the path. Close the descriptor straight away so that every test
    # does not leak one open file handle.
    fd, self._file_path = tempfile.mkstemp()
    os.close(fd)
    # Only the unique path is kept; the file itself is removed again.
    self._remove_test_file()
def tearDown(self):
    # Drop whatever the test left behind at the temporary path.
    cleanup = self._remove_test_file
    cleanup()
def _remove_test_file(self):
    """Delete the file at ``self._file_path``, ignoring any OSError."""
    path = self._file_path
    try:
        os.unlink(path)
    except OSError:
        # Best effort: an OSError from unlink (e.g. nothing to delete) is ignored.
        return
def test_clean_object_name(self):
    # Ensure ~ is not URL encoded
    # See https://github.com/apache/libcloud/issues/1452 for details
    cases = (
        ("valid", "valid"),
        ("valid/~", "valid/~"),
        # "%" and the trailing space are expected to be percent-encoded.
        ("valid/~%foo ", "valid/~%25foo%20"),
    )
    for raw_name, expected in cases:
        cleaned = self.driver._clean_object_name(name=raw_name)
        self.assertEqual(cleaned, expected)
def test_invalid_credentials(self):
    # With the "UNAUTHORIZED" mock type, listing containers must raise
    # InvalidCredsError. assertRaises already checks the exception type, so
    # no extra isinstance() assertion is needed.
    self.mock_response_klass.type = "UNAUTHORIZED"
    with self.assertRaises(InvalidCredsError):
        self.driver.list_containers()
def test_token(self):
    # Build a driver with token="asdf"; the "list_containers_TOKEN" mock
    # type is selected before the request is issued.
    self.mock_response_klass.type = "list_containers_TOKEN"
    token_driver = self.driver_type(*self.driver_args, token="asdf")
    self.driver = token_driver
    token_driver.list_containers()
def test_signature(self):
    secret_key = "ssssh!"

    # Expected value: base64 of the HMAC-SHA1 digest of the string to sign.
    # Note that the string contains the content type and the "x-aws-test"
    # header, but neither the "foo" header nor the "hello" parameter.
    string_to_sign = "GET\n\nTYPE!\n\nx-aws-test:test_value\n/"
    digest = hmac.new(b(secret_key), b(string_to_sign), digestmod=sha1).digest()
    expected_sig = base64.b64encode(digest).decode("utf-8")

    request_headers = {"foo": "bar", "content-type": "TYPE!", "x-aws-test": "test_value"}
    sig = BaseS3Connection.get_auth_signature(
        method="GET",
        headers=request_headers,
        params={"hello": "world"},
        expires=None,
        secret_key=secret_key,
        path="/",
        vendor_prefix="x-aws",
    )

    self.assertEqual(sig, expected_sig)
def test_bucket_is_located_in_different_region(self):
    # With the "DIFFERENT_REGION" mock type, listing containers must raise
    # LibcloudError.
    self.mock_response_klass.type = "DIFFERENT_REGION"
    with self.assertRaises(LibcloudError):
        self.driver.list_containers()
def test_list_containers_empty(self):
    # The "list_containers_EMPTY" mock type must yield no containers.
    self.mock_response_klass.type = "list_containers_EMPTY"
    found = self.driver.list_containers()
    self.assertEqual(len(found), 0)
def test_list_containers_success(self):
    # Two containers are expected; the second must expose "creation_date"
    # in its extra dict.
    self.mock_response_klass.type = "list_containers"
    found = self.driver.list_containers()
    self.assertEqual(len(found), 2)

    second = found[1]
    self.assertTrue("creation_date" in second.extra)
def test_list_container_objects_empty(self):
    # The "EMPTY" mock type must yield no objects for the container.
    self.mock_response_klass.type = "EMPTY"
    bucket = Container(name="test_container", extra={}, driver=self.driver)
    listing = self.driver.list_container_objects(container=bucket)
    self.assertEqual(len(listing), 0)
def test_list_container_objects_success(self):
    # Default mock type: exactly one object is expected in the listing.
    self.mock_response_klass.type = None
    bucket = Container(name="test_container", extra={}, driver=self.driver)
    listing = self.driver.list_container_objects(container=bucket)
    self.assertEqual(len(listing), 1)

    matches = [item for item in listing if item.name == "1.zip"]
    obj = matches[0]
    self.assertEqual(obj.hash, "4397da7a7649e8085de9916c240e8166")
    self.assertEqual(obj.size, 1234567)
    self.assertEqual(obj.container.name, "test_container")
    self.assertEqual(obj.extra["last_modified"], "2011-04-09T19:05:18.000Z")
    self.assertTrue("owner" in obj.meta_data)
def test_list_container_objects_iterator_has_more(self):
    # The "ITERATOR" mock type serves the listing from two fixtures; five
    # objects are expected in total.
    self.mock_response_klass.type = "ITERATOR"
    bucket = Container(name="test_container", extra={}, driver=self.driver)
    listing = self.driver.list_container_objects(container=bucket)

    matches = [item for item in listing if item.name == "1.zip"]
    obj = matches[0]
    self.assertEqual(obj.hash, "4397da7a7649e8085de9916c240e8166")
    self.assertEqual(obj.size, 1234567)
    self.assertEqual(obj.container.name, "test_container")

    self.assertTrue(obj in listing)
    self.assertEqual(len(listing), 5)
def test_list_container_objects_with_prefix(self):
    # Same expectations as the plain listing, but a prefix is passed along.
    self.mock_response_klass.type = None
    bucket = Container(name="test_container", extra={}, driver=self.driver)
    listing = self.driver.list_container_objects(container=bucket, prefix="test_prefix")
    self.assertEqual(len(listing), 1)

    matches = [item for item in listing if item.name == "1.zip"]
    obj = matches[0]
    self.assertEqual(obj.hash, "4397da7a7649e8085de9916c240e8166")
    self.assertEqual(obj.size, 1234567)
    self.assertEqual(obj.container.name, "test_container")
    self.assertTrue("owner" in obj.meta_data)
def test_get_container_doesnt_exist(self):
    # Looking up "container1" with the "get_container" mock type must raise
    # ContainerDoesNotExistError.
    self.mock_response_klass.type = "get_container"
    with self.assertRaises(ContainerDoesNotExistError):
        self.driver.get_container(container_name="container1")
def test_get_container_success(self):
    self.mock_response_klass.type = "get_container"
    container = self.driver.get_container(container_name="test1")
    # assertTrue(value, "test1") would treat "test1" as the failure message
    # and pass for any truthy name; compare the name explicitly instead.
    self.assertEqual(container.name, "test1")
def test_get_object_no_content_type_and_etag_in_response_headers(self):
    # When the response has neither a content type nor an etag, the object
    # must have no hash and no matching keys in "extra".
    self.mock_response_klass.type = "get_object_no_content_type"
    fetched = self.driver.get_object(container_name="test2", object_name="test")

    self.assertEqual(fetched.name, "test")
    self.assertEqual(fetched.container.name, "test2")
    self.assertEqual(fetched.size, 12345)
    self.assertIsNone(fetched.hash)

    extra = fetched.extra
    self.assertEqual(extra["last_modified"], "Thu, 13 Sep 2012 07:13:22 GMT")
    self.assertTrue("etag" not in extra)
    self.assertTrue("content_type" not in extra)
def test_get_object_cdn_url(self):
    self.mock_response_klass.type = "get_object"
    obj = self.driver.get_object(container_name="test2", object_name="test")

    # cdn urls can only be generated using a V4 connection
    if not issubclass(self.driver.connectionCls, S3SignatureV4Connection):
        with self.assertRaises(NotImplementedError):
            self.driver.get_object_cdn_url(obj)
        return

    cdn_url = self.driver.get_object_cdn_url(obj, ex_expiry=12)
    parsed_url = urlparse.urlparse(cdn_url)
    query = urlparse.parse_qs(parsed_url.query)

    signatures = query["X-Amz-Signature"]
    self.assertEqual(len(signatures), 1)
    self.assertGreater(len(signatures[0]), 0)
    # NOTE(review): ex_expiry=12 maps to 43200 here (12 * 3600), so the
    # argument is presumably in hours - confirm against the driver.
    self.assertEqual(query["X-Amz-Expires"], ["43200"])
def test_get_object_container_doesnt_exist(self):
    # This method makes two requests which makes mocking the response a bit
    # trickier
    self.mock_response_klass.type = "get_object"
    with self.assertRaises(ContainerDoesNotExistError):
        self.driver.get_object(container_name="test-inexistent", object_name="test")
def test_get_object_success(self):
    # This method makes two requests which makes mocking the response a bit
    # trickier
    self.mock_response_klass.type = "get_object"
    fetched = self.driver.get_object(container_name="test2", object_name="test")

    self.assertEqual(fetched.name, "test")
    self.assertEqual(fetched.container.name, "test2")
    self.assertEqual(fetched.size, 12345)
    # The hash is expected without the surrounding double quotes.
    self.assertEqual(fetched.hash, "e31208wqsdoj329jd")

    extra = fetched.extra
    self.assertEqual(extra["last_modified"], "Thu, 13 Sep 2012 07:13:22 GMT")
    self.assertEqual(extra["content_type"], "application/zip")
    self.assertEqual(fetched.meta_data["rabbits"], "monkeys")
def test_get_object_unable_to_determine_object_size(self):
    # Without a content length the driver is expected to raise a KeyError
    # whose message matches the text below.
    self.mock_response_klass.type = "get_object_no_content_length"
    expected_msg = "Can not deduce object size from headers"
    with self.assertRaisesRegex(KeyError, expected_msg):
        self.driver.get_object(container_name="test2", object_name="test")
def test_create_container_bad_request(self):
    # invalid container name, returns a 400 bad request
    self.mock_response_klass.type = "INVALID_NAME"
    with self.assertRaises(ContainerError):
        self.driver.create_container(container_name="new_container")
def test_create_container_already_exists(self):
    # container with this name already exists
    self.mock_response_klass.type = "ALREADY_EXISTS"
    with self.assertRaises(InvalidContainerNameError):
        self.driver.create_container(container_name="new-container")
def test_create_container_success(self):
    # success
    self.mock_response_klass.type = None
    requested_name = "new_container"
    created = self.driver.create_container(container_name=requested_name)
    self.assertEqual(created.name, requested_name)
def test_delete_container_doesnt_exist(self):
    # With the "DOESNT_EXIST" mock type, deleting must raise
    # ContainerDoesNotExistError.
    container = Container(name="new_container", extra=None, driver=self.driver)
    self.mock_response_klass.type = "DOESNT_EXIST"
    with self.assertRaises(ContainerDoesNotExistError):
        self.driver.delete_container(container=container)
def test_delete_container_not_empty(self):
    container = Container(name="new_container", extra=None, driver=self.driver)

    # With the "NOT_EMPTY" mock type, deleting must raise
    # ContainerIsNotEmptyError.
    self.mock_response_klass.type = "NOT_EMPTY"
    with self.assertRaises(ContainerIsNotEmptyError):
        self.driver.delete_container(container=container)

    # success
    self.mock_response_klass.type = None
    self.assertTrue(self.driver.delete_container(container=container))
def test_delete_container_not_found(self):
    self.mock_response_klass.type = "NOT_FOUND"
    container = Container(name="foo_bar_container", extra={}, driver=self.driver)
    # The failure message previously concatenated to "... was notthrown";
    # it is now a single, correctly spaced string.
    with self.assertRaises(
        ContainerDoesNotExistError,
        msg="Container does not exist but an exception was not thrown",
    ):
        self.driver.delete_container(container=container)
def test_delete_container_success(self):
    # Default mock type: deleting the container must return a truthy value.
    self.mock_response_klass.type = None
    target = Container(name="new_container", extra=None, driver=self.driver)
    deleted = self.driver.delete_container(container=target)
    self.assertTrue(deleted)
def test_download_object_success(self):
    # Download "foo_bar_object" to the temporary path; a truthy result is
    # expected.
    bucket = Container(name="foo_bar_container", extra={}, driver=self.driver)
    remote = Object(
        name="foo_bar_object",
        size=1000,
        hash=None,
        extra={},
        container=bucket,
        meta_data=None,
        driver=self.driver_type,
    )
    outcome = self.driver.download_object(
        obj=remote,
        destination_path=self._file_path,
        overwrite_existing=True,
        delete_on_failure=True,
    )
    self.assertTrue(outcome)
def test_download_object_range_success(self):
    # Requesting start_bytes=5 / end_bytes=7 is expected to write the two
    # characters "56" to the destination file.
    bucket = Container(name="foo_bar_container", extra={}, driver=self.driver)
    remote = Object(
        name="foo_bar_object_range",
        size=19,
        hash=None,
        extra={},
        container=bucket,
        meta_data=None,
        driver=self.driver_type,
    )
    outcome = self.driver.download_object_range(
        obj=remote,
        destination_path=self._file_path,
        start_bytes=5,
        end_bytes=7,
        overwrite_existing=True,
        delete_on_failure=True,
    )
    self.assertTrue(outcome)

    with open(self._file_path) as downloaded:
        written = downloaded.read()
    self.assertEqual(written, "56")
def test_download_object_range_as_stream_success(self):
    # Requesting start_bytes=4 / end_bytes=7 as a stream is expected to
    # yield the bytes b"456".
    bucket = Container(name="foo_bar_container", extra={}, driver=self.driver)
    remote = Object(
        name="foo_bar_object_range_stream",
        size=19,
        hash=None,
        extra={},
        container=bucket,
        meta_data=None,
        driver=self.driver_type,
    )
    chunks = self.driver.download_object_range_as_stream(obj=remote, start_bytes=4, end_bytes=7)
    received = exhaust_iterator(chunks)
    self.assertEqual(received, b"456")
def test_download_object_data_is_not_buffered_in_memory(self):
    # Test case which verifies that response.body attribute is not accessed
    # and as such, whole body response is not buffered into RAM
    # If content is consumed and response.content attribute accessed, an
    # exception will be thrown and the test will fail
    msg = '"content" attribute was accessed but it shouldn\'t have been'

    mock_response = Mock(name="mock response")
    mock_response.headers = {}
    mock_response.status_code = 200
    # A PropertyMock has to be attached to the mock's type to act as a property.
    response_type = type(mock_response)
    response_type.content = PropertyMock(
        name="mock content attribute", side_effect=Exception(msg)
    )
    mock_response.iter_content.return_value = StringIO("a" * 1000)

    self.driver.connection.connection.getresponse = Mock(return_value=mock_response)

    bucket = Container(name="foo_bar_container", extra={}, driver=self.driver)
    remote = Object(
        name="foo_bar_object_NO_BUFFER",
        size=1000,
        hash=None,
        extra={},
        container=bucket,
        meta_data=None,
        driver=self.driver_type,
    )
    outcome = self.driver.download_object(
        obj=remote,
        destination_path=self._file_path,
        overwrite_existing=False,
        delete_on_failure=True,
    )
    self.assertTrue(outcome)
def test_download_object_as_stream_data_is_not_buffered_in_memory(self):
    # Test case which verifies that response.response attribute is not accessed
    # and as such, whole body response is not buffered into RAM
    # If content is consumed and response.content attribute accessed exception
    # will be thrown and test will fail
    msg1 = '"response" attribute was accessed but it shouldn\'t have been'
    msg2 = '"content" attribute was accessed but it shouldn\'t have been'

    mock_response = Mock(name="mock response")
    mock_response.headers = {}
    mock_response.status = 200
    # PropertyMocks have to be attached to the mock's type to act as properties.
    response_type = type(mock_response)
    response_type.response = PropertyMock(
        name="mock response attribute", side_effect=Exception(msg1)
    )
    response_type.content = PropertyMock(
        name="mock content attribute", side_effect=Exception(msg2)
    )
    mock_response.iter_content.return_value = StringIO("a" * 1000)

    self.driver.connection.request = Mock(return_value=mock_response)

    bucket = Container(name="foo_bar_container", extra={}, driver=self.driver)
    remote = Object(
        name="foo_bar_object_NO_BUFFER",
        size=1000,
        hash=None,
        extra={},
        container=bucket,
        meta_data=None,
        driver=self.driver_type,
    )
    stream = self.driver.download_object_as_stream(obj=remote)
    raw = exhaust_iterator(stream)
    text = raw.decode("utf-8")
    self.assertEqual(text, "a" * 1000)
def test_download_object_invalid_file_size(self):
    # With the "INVALID_SIZE" mock type the download is expected to report
    # failure by returning a falsy value.
    self.mock_response_klass.type = "INVALID_SIZE"
    bucket = Container(name="foo_bar_container", extra={}, driver=self.driver)
    remote = Object(
        name="foo_bar_object",
        size=1000,
        hash=None,
        extra={},
        container=bucket,
        meta_data=None,
        driver=self.driver_type,
    )
    outcome = self.driver.download_object(
        obj=remote,
        destination_path=self._file_path,
        overwrite_existing=False,
        delete_on_failure=True,
    )
    self.assertFalse(outcome)
def test_download_object_invalid_file_already_exists(self):
    self.mock_response_klass.type = "INVALID_SIZE"
    container = Container(name="foo_bar_container", extra={}, driver=self.driver)
    obj = Object(
        name="foo_bar_object",
        size=1000,
        hash=None,
        extra={},
        container=container,
        meta_data=None,
        driver=self.driver_type,
    )
    # This test module itself is used as a destination that already exists;
    # with overwrite_existing=False the download must raise LibcloudError.
    destination_path = os.path.abspath(__file__)
    with self.assertRaises(LibcloudError):
        self.driver.download_object(
            obj=obj,
            destination_path=destination_path,
            overwrite_existing=False,
            delete_on_failure=True,
        )
@unittest.skip("The MockHttp classes cannot support this test at present")
def test_download_object_as_stream_success(self):
    bucket = Container(name="foo_bar_container", extra={}, driver=self.driver)
    remote = Object(
        name="foo_bar_object",
        size=1000,
        hash=None,
        extra={},
        container=bucket,
        meta_data=None,
        driver=self.driver_type,
    )

    def mock_get_object(
        self, obj, callback, callback_kwargs, response, success_status_code=None
    ):
        # Stand-in for the driver's _get_object: hand back the raw chunk iterator.
        return response._response.iter_content(1024)

    # Swap the class-level _get_object for the stub and always restore it.
    original_get_object = self.driver_type._get_object
    self.driver_type._get_object = mock_get_object
    try:
        stream = self.driver.download_object_as_stream(obj=remote, chunk_size=1024)
        self.assertTrue(hasattr(stream, "__iter__"))
    finally:
        self.driver_type._get_object = original_get_object
def test_upload_object_invalid_ex_storage_class(self):
    # An unknown ex_storage_class must be rejected with a ValueError whose
    # message mentions "invalid storage class" (case-insensitive).
    file_path = os.path.abspath(__file__)
    container = Container(name="foo_bar_container", extra={}, driver=self.driver)
    object_name = "foo_test_upload"

    with self.assertRaises(ValueError) as ctx:
        self.driver.upload_object(
            file_path=file_path,
            container=container,
            object_name=object_name,
            verify_hash=True,
            ex_storage_class="invalid-class",
        )
    self.assertIn("invalid storage class", str(ctx.exception).lower())
def test_upload_object_invalid_hash1(self):
    # The stubbed upload reports a "data_hash" that differs from the etag
    # in the response headers; upload_object must raise
    # ObjectHashMismatchError.
    def upload_file(
        self,
        object_name=None,
        content_type=None,
        request_path=None,
        request_method=None,
        headers=None,
        file_path=None,
        stream=None,
    ):
        headers = {"etag": '"foobar"'}
        return {
            "response": make_response(200, headers=headers),
            "bytes_transferred": 1000,
            "data_hash": "hash343hhash89h932439jsaa89",
        }

    file_path = os.path.abspath(__file__)
    container = Container(name="foo_bar_container", extra={}, driver=self.driver)
    object_name = "foo_test_upload"

    # Patch the class-level _upload_object and always restore it afterwards.
    old_func = self.driver_type._upload_object
    self.driver_type._upload_object = upload_file
    try:
        with self.assertRaises(
            ObjectHashMismatchError,
            msg="Invalid hash was returned but an exception was not thrown",
        ):
            self.driver.upload_object(
                file_path=file_path,
                container=container,
                object_name=object_name,
                verify_hash=True,
            )
    finally:
        self.driver_type._upload_object = old_func
def test_upload_object_invalid_hash2(self):
    # Invalid hash is detected when comparing hash provided in the response
    # ETag header
    def upload_file(
        self,
        object_name=None,
        content_type=None,
        request_path=None,
        request_method=None,
        headers=None,
        file_path=None,
        stream=None,
    ):
        headers = {"etag": '"hash343hhash89h932439jsaa89"'}
        return {
            "response": make_response(200, headers=headers),
            "bytes_transferred": 1000,
            "data_hash": "0cc175b9c0f1b6a831c399e269772661",
        }

    file_path = os.path.abspath(__file__)
    container = Container(name="foo_bar_container", extra={}, driver=self.driver)
    object_name = "foo_test_upload"

    # Patch the class-level _upload_object and always restore it afterwards.
    old_func = self.driver_type._upload_object
    self.driver_type._upload_object = upload_file
    try:
        with self.assertRaises(
            ObjectHashMismatchError,
            msg="Invalid hash was returned but an exception was not thrown",
        ):
            self.driver.upload_object(
                file_path=file_path,
                container=container,
                object_name=object_name,
                verify_hash=True,
            )
    finally:
        self.driver_type._upload_object = old_func
def test_upload_object_invalid_hash_kms_encryption(self):
    # Hash check should be skipped when AWS KMS server side encryption is
    # used
    def upload_file(
        self,
        object_name=None,
        content_type=None,
        request_path=None,
        request_method=None,
        headers=None,
        file_path=None,
        stream=None,
    ):
        # The etag and data_hash differ on purpose; the KMS header is set.
        response_headers = {"etag": "blahblah", "x-amz-server-side-encryption": "aws:kms"}
        return {
            "response": make_response(200, headers=response_headers),
            "bytes_transferred": 1000,
            "data_hash": "hash343hhash89h932439jsaa81",
        }

    source_path = os.path.abspath(__file__)
    bucket = Container(name="foo_bar_container", extra={}, driver=self.driver)

    # Patch the class-level _upload_object and always restore it afterwards.
    original_upload = self.driver_type._upload_object
    self.driver_type._upload_object = upload_file
    try:
        self.driver.upload_object(
            file_path=source_path,
            container=bucket,
            object_name="foo_test_upload",
            verify_hash=True,
        )
    finally:
        self.driver_type._upload_object = original_upload
def test_upload_object_success(self):
    def upload_file(
        self,
        object_name=None,
        content_type=None,
        request_path=None,
        request_method=None,
        headers=None,
        file_path=None,
        stream=None,
    ):
        # Stub whose etag and data_hash agree, so hash verification passes.
        return {
            "response": make_response(
                200, headers={"etag": "0cc175b9c0f1b6a831c399e269772661"}
            ),
            "bytes_transferred": 1000,
            "data_hash": "0cc175b9c0f1b6a831c399e269772661",
        }

    self.mock_response_klass.type = None
    file_path = os.path.abspath(__file__)
    container = Container(name="foo_bar_container", extra={}, driver=self.driver)
    object_name = "foo_test_upload"
    extra = {"meta_data": {"some-value": "foobar"}}

    # Patch the class-level _upload_object. The restore sits in "finally" so
    # that a failing assertion cannot leave the stub installed for the tests
    # that run afterwards.
    old_func = self.driver_type._upload_object
    self.driver_type._upload_object = upload_file
    try:
        obj = self.driver.upload_object(
            file_path=file_path,
            container=container,
            object_name=object_name,
            extra=extra,
            verify_hash=True,
        )
        self.assertEqual(obj.name, "foo_test_upload")
        self.assertEqual(obj.size, 1000)
        self.assertTrue("some-value" in obj.meta_data)
    finally:
        self.driver_type._upload_object = old_func
def test_upload_object_with_acl(self):
    def upload_file(
        self,
        object_name=None,
        content_type=None,
        request_path=None,
        request_method=None,
        headers=None,
        file_path=None,
        stream=None,
    ):
        # Stub whose etag and data_hash agree, so hash verification passes.
        headers = {"etag": "0cc175b9c0f1b6a831c399e269772661"}
        return {
            "response": make_response(200, headers=headers),
            "bytes_transferred": 1000,
            "data_hash": "0cc175b9c0f1b6a831c399e269772661",
        }

    self.mock_response_klass.type = None
    file_path = os.path.abspath(__file__)
    container = Container(name="foo_bar_container", extra={}, driver=self.driver)
    object_name = "foo_test_upload"
    extra = {"acl": "public-read"}

    # Patch the class-level _upload_object. The restore sits in "finally" so
    # that a failing assertion cannot leave the stub installed for the tests
    # that run afterwards.
    old_func = self.driver_type._upload_object
    self.driver_type._upload_object = upload_file
    try:
        obj = self.driver.upload_object(
            file_path=file_path,
            container=container,
            object_name=object_name,
            extra=extra,
            verify_hash=True,
        )
        self.assertEqual(obj.name, "foo_test_upload")
        self.assertEqual(obj.size, 1000)
        self.assertEqual(obj.extra["acl"], "public-read")
    finally:
        self.driver_type._upload_object = old_func
def test_upload_empty_object_via_stream(self):
    # Streaming a zero-byte payload must yield an object of size 0.
    # Drivers with multipart support are served the "MULTIPART" mock
    # responses, all others the default ones.
    multipart = self.driver.supports_s3_multipart_upload
    self.mock_response_klass.type = "MULTIPART" if multipart else None

    name = "foo_test_stream_data"
    uploaded = self.driver.upload_object_via_stream(
        container=Container(name="foo_bar_container", extra={}, driver=self.driver),
        object_name=name,
        iterator=BytesIO(b("")),
        extra={"content_type": "text/plain"},
    )

    self.assertEqual(uploaded.name, name)
    self.assertEqual(uploaded.size, 0)
def test_upload_small_object_via_stream(self):
    # Streaming the three bytes "234" must yield an object of size 3.
    # Drivers with multipart support are served the "MULTIPART" mock
    # responses, all others the default ones.
    multipart = self.driver.supports_s3_multipart_upload
    self.mock_response_klass.type = "MULTIPART" if multipart else None

    name = "foo_test_stream_data"
    uploaded = self.driver.upload_object_via_stream(
        container=Container(name="foo_bar_container", extra={}, driver=self.driver),
        object_name=name,
        iterator=BytesIO(b("234")),
        extra={"content_type": "text/plain"},
    )

    self.assertEqual(uploaded.name, name)
    self.assertEqual(uploaded.size, 3)
def test_upload_big_object_via_stream(self):
    # Streaming a payload three times CHUNK_SIZE long must yield an object
    # reporting exactly that size. Drivers with multipart support are served
    # the "MULTIPART" mock responses, all others the default ones.
    multipart = self.driver.supports_s3_multipart_upload
    self.mock_response_klass.type = "MULTIPART" if multipart else None

    name = "foo_test_stream_data"
    uploaded = self.driver.upload_object_via_stream(
        container=Container(name="foo_bar_container", extra={}, driver=self.driver),
        object_name=name,
        iterator=BytesIO(b("234" * CHUNK_SIZE)),
        extra={"content_type": "text/plain"},
    )

    self.assertEqual(uploaded.name, name)
    self.assertEqual(uploaded.size, CHUNK_SIZE * 3)
def test_upload_object_via_stream_guess_file_mime_type(self):
    # When no content type is passed in ``extra``, the upload is expected to
    # call ``guess_file_mime_type`` with the object name.
    multipart = self.driver.supports_s3_multipart_upload
    self.mock_response_klass.type = "MULTIPART" if multipart else None

    name = "foo_test_stream_data"
    bucket = Container(name="foo_bar_container", extra={}, driver=self.driver)
    payload = BytesIO(b("234"))

    patcher = mock.patch("libcloud.utils.files.guess_file_mime_type", autospec=True)
    with patcher as guess_mock:
        guess_mock.return_value = ("application/zip", None)

        self.driver.upload_object_via_stream(
            container=bucket, object_name=name, iterator=payload
        )

        guess_mock.assert_called_with(name)
def test_upload_object_via_stream_abort(self):
    # Drives a multipart stream upload whose data source fails part-way.
    # Nothing to exercise for drivers without multipart support.
    if not self.driver.supports_s3_multipart_upload:
        return

    self.mock_response_klass.type = "MULTIPART"

    def _faulty_iterator():
        # Hands out "0" through "4" and then blows up.
        for number in range(5):
            yield str(number)
        raise RuntimeError("Error in fetching data")

    upload_kwargs = {
        "container": Container(name="foo_bar_container", extra={}, driver=self.driver),
        "object_name": "foo_test_stream_data",
        "iterator": _faulty_iterator(),
        "extra": {"content_type": "text/plain"},
    }

    try:
        self.driver.upload_object_via_stream(**upload_kwargs)
    except Exception:
        # Best effort: the outcome is intentionally not asserted, the test
        # only checks that the failing upload does not escape as an error.
        pass
def test_s3_list_multipart_uploads(self):
    # Iterates the pending multipart uploads of a container and checks that
    # every returned entry has its key, id, creation time, owner and
    # initiator populated. Nothing to check without multipart support.
    if not self.driver.supports_s3_multipart_upload:
        return

    self.mock_response_klass.type = "LIST_MULTIPART"
    container = Container(name="foo_bar_container", extra={}, driver=self.driver)

    # Scope the page-size override to this test: patch.object undoes it on
    # exit, so the class-level value no longer leaks into later tests.
    # create=True keeps the patch working whether or not the class already
    # defines the attribute.
    with mock.patch.object(S3StorageDriver, "RESPONSES_PER_REQUEST", 2, create=True):
        for upload in self.driver.ex_iterate_multipart_uploads(container):
            self.assertIsNotNone(upload.key)
            self.assertIsNotNone(upload.id)
            self.assertIsNotNone(upload.created_at)
            self.assertIsNotNone(upload.owner)
            self.assertIsNotNone(upload.initiator)
def test_s3_abort_multipart_uploads(self):
    # Runs the "clean up all multipart uploads" helper against the
    # LIST_MULTIPART mock responses; the test passes when the call completes
    # without raising. Nothing to check without multipart support.
    if not self.driver.supports_s3_multipart_upload:
        return

    self.mock_response_klass.type = "LIST_MULTIPART"
    container = Container(name="foo_bar_container", extra={}, driver=self.driver)

    # Scope the page-size override to this test: patch.object undoes it on
    # exit, so the class-level value no longer leaks into later tests.
    # create=True keeps the patch working whether or not the class already
    # defines the attribute.
    with mock.patch.object(S3StorageDriver, "RESPONSES_PER_REQUEST", 2, create=True):
        self.driver.ex_cleanup_all_multipart_uploads(container)
def test_delete_object_not_found(self):
    # Deleting an object while the mock serves its "NOT_FOUND" responses
    # must raise ObjectDoesNotExistError.
    self.mock_response_klass.type = "NOT_FOUND"
    container = Container(name="foo_bar_container", extra={}, driver=self.driver)
    obj = Object(
        name="foo_bar_object",
        size=1234,
        hash=None,
        extra=None,
        meta_data=None,
        container=container,
        driver=self.driver,
    )

    # assertRaises fails the test when no exception is raised and lets any
    # other exception type propagate as an error.
    with self.assertRaises(ObjectDoesNotExistError):
        self.driver.delete_object(obj=obj)
def test_delete_object_success(self):
    # Deleting an object while the mock serves its "DELETE" responses must
    # return a truthy result.
    self.mock_response_klass.type = "DELETE"

    bucket = Container(name="foo_bar_container", extra={}, driver=self.driver)
    target = Object(
        name="foo_bar_object",
        size=1234,
        hash=None,
        extra=None,
        meta_data=None,
        container=bucket,
        driver=self.driver,
    )

    self.assertTrue(self.driver.delete_object(obj=target))
def test_region_keyword_argument(self):
    # Covers how the ``region`` and ``host`` keyword arguments determine the
    # driver's region and the host of its connection.
    params = self.driver_args

    # Without a region the driver uses us-east-1 and the global endpoint
    default_driver = S3StorageDriver(*params)
    self.assertEqual(default_driver.region, "us-east-1")
    self.assertEqual(default_driver.connection.host, "s3.amazonaws.com")

    # An explicit region selects the matching regional endpoint
    west_driver = S3StorageDriver(*params, region="us-west-2")
    self.assertEqual(west_driver.region, "us-west-2")
    self.assertEqual(west_driver.connection.host, "s3-us-west-2.amazonaws.com")

    # Two live instances must not share region state: creating the second
    # one may not alter what the first one reports
    first = S3StorageDriver(*params, region="us-west-2")
    self.assertEqual(first.region, "us-west-2")
    self.assertEqual(first.connection.host, "s3-us-west-2.amazonaws.com")

    second = S3StorageDriver(*params, region="ap-south-1")
    self.assertEqual(second.region, "ap-south-1")
    self.assertEqual(second.connection.host, "s3-ap-south-1.amazonaws.com")

    self.assertEqual(first.region, "us-west-2")
    self.assertEqual(first.connection.host, "s3-us-west-2.amazonaws.com")

    # Every region the driver advertises can be instantiated
    for region_name in S3StorageDriver.list_regions():
        regional = S3StorageDriver(*params, region=region_name)
        self.assertEqual(regional.region, region_name)

    # An unknown region is rejected with a descriptive ValueError
    with self.assertRaisesRegex(ValueError, "Invalid or unsupported region: foo"):
        S3StorageDriver(*params, region="foo")

    # The host argument still takes precedence over the region
    with_both = S3StorageDriver(*params, region="ap-south-1", host="host1.bar.com")
    self.assertEqual(with_both.region, "ap-south-1")
    self.assertEqual(with_both.connection.host, "host1.bar.com")

    host_only = S3StorageDriver(*params, host="host2.bar.com")
    self.assertEqual(host_only.connection.host, "host2.bar.com")
def test_deprecated_driver_class_per_region(self):
    # The per-region driver class must still report its fixed region.
    west_driver = S3USWestStorageDriver(*self.driver_args)
    expected_region = "us-west-1"
    self.assertEqual(west_driver.region, expected_region)
if __name__ == "__main__":
    # Allow running this test module directly as a script; the runner's
    # result is handed to sys.exit as the process exit status.
    exit_status = unittest.main()
    sys.exit(exit_status)