blob: 213406288aed69464208b2dfba5eaacca9597aeb [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import copy
import hmac
import math
import hashlib
import os.path # pylint: disable-msg=W0404
from io import BytesIO
from hashlib import sha1
from unittest import mock
from unittest.mock import Mock, PropertyMock
import libcloud.utils.files
from libcloud.test import MockHttp # pylint: disable-msg=E0611
from libcloud.test import unittest, make_response, generate_random_data
from libcloud.utils.py3 import StringIO, b, httplib, urlquote
from libcloud.utils.files import exhaust_iterator
from libcloud.common.types import MalformedResponseError
from libcloud.storage.base import CHUNK_SIZE, Object, Container
from libcloud.storage.types import (
ObjectDoesNotExistError,
ObjectHashMismatchError,
ContainerIsNotEmptyError,
InvalidContainerNameError,
ContainerDoesNotExistError,
ContainerAlreadyExistsError,
)
from libcloud.test.storage.base import BaseRangeDownloadMockHttp
from libcloud.test.file_fixtures import StorageFileFixtures # pylint: disable-msg=E0611
from libcloud.storage.drivers.cloudfiles import CloudFilesStorageDriver
class CloudFilesTests(unittest.TestCase):
driver_klass = CloudFilesStorageDriver
driver_args = ("dummy", "dummy")
driver_kwargs = {}
region = "ord"
def setUp(self):
self.driver_klass.connectionCls.conn_class = CloudFilesMockHttp
CloudFilesMockHttp.type = None
driver_kwargs = self.driver_kwargs.copy()
driver_kwargs["region"] = self.region
self.driver = self.driver_klass(*self.driver_args, **driver_kwargs)
# normally authentication happens lazily, but we force it here
self.driver.connection._populate_hosts_and_request_paths()
self._remove_test_file()
def tearDown(self):
self._remove_test_file()
def test_invalid_ex_force_service_region(self):
driver = CloudFilesStorageDriver("driver", "dummy", ex_force_service_region="invalid")
try:
driver.list_containers()
except Exception as e:
self.assertEqual(e.value, "Could not find specified endpoint")
else:
self.fail("Exception was not thrown")
def test_ex_force_service_region(self):
driver = CloudFilesStorageDriver("driver", "dummy", ex_force_service_region="ORD")
driver.list_containers()
def test_force_auth_token_kwargs(self):
base_url = "https://cdn2.clouddrive.com/v1/MossoCloudFS"
kwargs = {
"ex_force_auth_token": "some-auth-token",
"ex_force_base_url": base_url,
}
driver = CloudFilesStorageDriver("driver", "dummy", **kwargs)
driver.list_containers()
self.assertEqual(kwargs["ex_force_auth_token"], driver.connection.auth_token)
self.assertEqual("cdn2.clouddrive.com", driver.connection.host)
self.assertEqual("/v1/MossoCloudFS", driver.connection.request_path)
def test_force_auth_url_kwargs(self):
kwargs = {
"ex_force_auth_version": "2.0",
"ex_force_auth_url": "https://identity.api.rackspace.com",
}
driver = CloudFilesStorageDriver("driver", "dummy", **kwargs)
self.assertEqual(kwargs["ex_force_auth_url"], driver.connection._ex_force_auth_url)
self.assertEqual(kwargs["ex_force_auth_version"], driver.connection._auth_version)
def test_invalid_json_throws_exception(self):
CloudFilesMockHttp.type = "MALFORMED_JSON"
try:
self.driver.list_containers()
except MalformedResponseError:
pass
else:
self.fail("Exception was not thrown")
def test_service_catalog(self):
url = "https://storage4.%s1.clouddrive.com/v1/MossoCloudFS" % (self.region)
self.assertEqual(url, self.driver.connection.get_endpoint())
self.driver.connection.cdn_request = True
self.assertEqual(
"https://cdn.clouddrive.com/v1/MossoCloudFS",
self.driver.connection.get_endpoint(),
)
self.driver.connection.cdn_request = False
def test_list_containers(self):
CloudFilesMockHttp.type = "EMPTY"
containers = self.driver.list_containers()
self.assertEqual(len(containers), 0)
CloudFilesMockHttp.type = None
containers = self.driver.list_containers()
self.assertEqual(len(containers), 3)
container = [c for c in containers if c.name == "container2"][0]
self.assertEqual(container.extra["object_count"], 120)
self.assertEqual(container.extra["size"], 340084450)
def test_list_container_objects(self):
CloudFilesMockHttp.type = "EMPTY"
container = Container(name="test_container", extra={}, driver=self.driver)
objects = self.driver.list_container_objects(container=container)
self.assertEqual(len(objects), 0)
CloudFilesMockHttp.type = None
objects = self.driver.list_container_objects(container=container)
self.assertEqual(len(objects), 4)
obj = [o for o in objects if o.name == "foo test 1"][0]
self.assertEqual(obj.hash, "16265549b5bda64ecdaa5156de4c97cc")
self.assertEqual(obj.size, 1160520)
self.assertEqual(obj.container.name, "test_container")
def test_list_container_object_name_encoding(self):
CloudFilesMockHttp.type = "EMPTY"
container = Container(name="test container 1", extra={}, driver=self.driver)
objects = self.driver.list_container_objects(container=container)
self.assertEqual(len(objects), 0)
def test_list_container_objects_with_prefix(self):
CloudFilesMockHttp.type = "EMPTY"
container = Container(name="test_container", extra={}, driver=self.driver)
objects = self.driver.list_container_objects(container=container, prefix="test_prefix1")
self.assertEqual(len(objects), 0)
CloudFilesMockHttp.type = None
objects = self.driver.list_container_objects(container=container, prefix="test_prefix2")
self.assertEqual(len(objects), 4)
obj = [o for o in objects if o.name == "foo test 1"][0]
self.assertEqual(obj.hash, "16265549b5bda64ecdaa5156de4c97cc")
self.assertEqual(obj.size, 1160520)
self.assertEqual(obj.container.name, "test_container")
def test_list_container_objects_iterator(self):
CloudFilesMockHttp.type = "ITERATOR"
container = Container(name="test_container", extra={}, driver=self.driver)
objects = self.driver.list_container_objects(container=container)
self.assertEqual(len(objects), 5)
obj = [o for o in objects if o.name == "foo-test-1"][0]
self.assertEqual(obj.hash, "16265549b5bda64ecdaa5156de4c97cc")
self.assertEqual(obj.size, 1160520)
self.assertEqual(obj.container.name, "test_container")
def test_get_container(self):
container = self.driver.get_container(container_name="test_container")
self.assertEqual(container.name, "test_container")
self.assertEqual(container.extra["object_count"], 800)
self.assertEqual(container.extra["size"], 1234568)
def test_get_container_not_found(self):
try:
self.driver.get_container(container_name="not_found")
except ContainerDoesNotExistError:
pass
else:
self.fail("Exception was not thrown")
def test_get_object_success(self):
obj = self.driver.get_object(container_name="test_container", object_name="test_object")
self.assertEqual(obj.container.name, "test_container")
self.assertEqual(obj.size, 555)
self.assertEqual(obj.hash, "6b21c4a111ac178feacf9ec9d0c71f17")
self.assertEqual(obj.extra["content_type"], "application/zip")
self.assertEqual(obj.extra["last_modified"], "Tue, 25 Jan 2011 22:01:49 GMT")
self.assertEqual(obj.meta_data["foo-bar"], "test 1")
self.assertEqual(obj.meta_data["bar-foo"], "test 2")
def test_get_object_object_name_encoding(self):
obj = self.driver.get_object(container_name="test_container", object_name="~/test_object/")
self.assertEqual(obj.name, "~/test_object/")
def test_get_object_not_found(self):
try:
self.driver.get_object(container_name="test_container", object_name="not_found")
except ObjectDoesNotExistError:
pass
else:
self.fail("Exception was not thrown")
def test_create_container_success(self):
container = self.driver.create_container(container_name="test_create_container")
self.assertTrue(isinstance(container, Container))
self.assertEqual(container.name, "test_create_container")
self.assertEqual(container.extra["object_count"], 0)
def test_create_container_already_exists(self):
CloudFilesMockHttp.type = "ALREADY_EXISTS"
try:
self.driver.create_container(container_name="test_create_container")
except ContainerAlreadyExistsError:
pass
else:
self.fail("Container already exists but an exception was not thrown")
def test_create_container_invalid_name_too_long(self):
name = "".join(["x" for x in range(0, 257)])
try:
self.driver.create_container(container_name=name)
except InvalidContainerNameError:
pass
else:
self.fail(
"Invalid name was provided (name is too long)" ", but exception was not thrown"
)
def test_create_container_invalid_name_slashes_in_name(self):
try:
self.driver.create_container(container_name="test/slashes/")
except InvalidContainerNameError:
pass
else:
self.fail(
"Invalid name was provided (name contains slashes)" ", but exception was not thrown"
)
def test_delete_container_success(self):
container = Container(name="foo_bar_container", extra={}, driver=self)
result = self.driver.delete_container(container=container)
self.assertTrue(result)
def test_delete_container_not_found(self):
CloudFilesMockHttp.type = "NOT_FOUND"
container = Container(name="foo_bar_container", extra={}, driver=self)
try:
self.driver.delete_container(container=container)
except ContainerDoesNotExistError:
pass
else:
self.fail("Container does not exist but an exception was not thrown")
def test_delete_container_not_empty(self):
CloudFilesMockHttp.type = "NOT_EMPTY"
container = Container(name="foo_bar_container", extra={}, driver=self)
try:
self.driver.delete_container(container=container)
except ContainerIsNotEmptyError:
pass
else:
self.fail("Container is not empty but an exception was not thrown")
def test_download_object_success(self):
container = Container(name="foo_bar_container", extra={}, driver=self)
obj = Object(
name="foo_bar_object",
size=1000,
hash=None,
extra={},
container=container,
meta_data=None,
driver=CloudFilesStorageDriver,
)
destination_path = os.path.abspath(__file__) + ".temp"
result = self.driver.download_object(
obj=obj,
destination_path=destination_path,
overwrite_existing=False,
delete_on_failure=True,
)
self.assertTrue(result)
def test_download_object_invalid_file_size(self):
CloudFilesMockHttp.type = "INVALID_SIZE"
container = Container(name="foo_bar_container", extra={}, driver=self)
obj = Object(
name="foo_bar_object",
size=1000,
hash=None,
extra={},
container=container,
meta_data=None,
driver=CloudFilesStorageDriver,
)
destination_path = os.path.abspath(__file__) + ".temp"
result = self.driver.download_object(
obj=obj,
destination_path=destination_path,
overwrite_existing=False,
delete_on_failure=True,
)
self.assertFalse(result)
def test_download_object_success_not_found(self):
CloudFilesMockHttp.type = "NOT_FOUND"
container = Container(name="foo_bar_container", extra={}, driver=self)
obj = Object(
name="foo_bar_object",
size=1000,
hash=None,
extra={},
container=container,
meta_data=None,
driver=CloudFilesStorageDriver,
)
destination_path = os.path.abspath(__file__) + ".temp"
try:
self.driver.download_object(
obj=obj,
destination_path=destination_path,
overwrite_existing=False,
delete_on_failure=True,
)
except ObjectDoesNotExistError:
pass
else:
self.fail("Object does not exist but an exception was not thrown")
def test_download_object_range_success(self):
container = Container(name="foo_bar_container", extra={}, driver=self)
obj = Object(
name="foo_bar_object_range",
size=10,
hash=None,
extra={},
container=container,
meta_data=None,
driver=CloudFilesStorageDriver,
)
destination_path = os.path.abspath(__file__) + ".temp"
result = self.driver.download_object_range(
obj=obj,
destination_path=destination_path,
start_bytes=5,
end_bytes=7,
overwrite_existing=False,
delete_on_failure=True,
)
self.assertTrue(result)
with open(destination_path) as fp:
content = fp.read()
self.assertEqual(content, "56")
def test_download_object_as_stream(self):
container = Container(name="foo_bar_container", extra={}, driver=self)
obj = Object(
name="foo_bar_object",
size=1000,
hash=None,
extra={},
container=container,
meta_data=None,
driver=CloudFilesStorageDriver,
)
stream = self.driver.download_object_as_stream(obj=obj, chunk_size=None)
self.assertTrue(hasattr(stream, "__iter__"))
def test_download_object_as_stream_data_is_not_buffered_in_memory(self):
# Test case which verifies that response.response attribute is not accessed
# and as such, whole body response is not buffered into RAM
# If content is consumed and response.content attribute accessed exception
# will be thrown and test will fail
mock_response = Mock(name="mock response")
mock_response.headers = {}
mock_response.status = 200
msg1 = '"response" attribute was accessed but it shouldn\'t have been'
msg2 = '"content" attribute was accessed but it shouldn\'t have been'
type(mock_response).response = PropertyMock(
name="mock response attribute", side_effect=Exception(msg1)
)
type(mock_response).content = PropertyMock(
name="mock content attribute", side_effect=Exception(msg2)
)
mock_response.iter_content.return_value = StringIO("a" * 1000)
self.driver.connection.request = Mock()
self.driver.connection.request.return_value = mock_response
container = Container(name="foo_bar_container", extra={}, driver=self.driver)
obj = Object(
name="foo_bar_object_NO_BUFFER",
size=1000,
hash=None,
extra={},
container=container,
meta_data=None,
driver=self.driver,
)
result = self.driver.download_object_as_stream(obj=obj)
result = exhaust_iterator(result)
result = result.decode("utf-8")
self.assertEqual(result, "a" * 1000)
def test_download_object_range_as_stream_success(self):
container = Container(name="foo_bar_container", extra={}, driver=self)
obj = Object(
name="foo_bar_object_range",
size=2,
hash=None,
extra={},
container=container,
meta_data=None,
driver=CloudFilesStorageDriver,
)
stream = self.driver.download_object_range_as_stream(
start_bytes=5, end_bytes=7, obj=obj, chunk_size=None
)
self.assertTrue(hasattr(stream, "__iter__"))
consumed_stream = "".join(chunk.decode("utf-8") for chunk in stream)
self.assertEqual(consumed_stream, "56")
self.assertEqual(len(consumed_stream), obj.size)
def test_upload_object_success(self):
def upload_file(
self,
object_name=None,
content_type=None,
request_path=None,
request_method=None,
headers=None,
file_path=None,
stream=None,
):
return {
"response": make_response(
201, headers={"etag": "0cc175b9c0f1b6a831c399e269772661"}
),
"bytes_transferred": 1000,
"data_hash": "0cc175b9c0f1b6a831c399e269772661",
}
old_func = CloudFilesStorageDriver._upload_object
CloudFilesStorageDriver._upload_object = upload_file
file_path = os.path.abspath(__file__)
container = Container(name="foo_bar_container", extra={}, driver=self)
object_name = "foo_test_upload"
extra = {"meta_data": {"some-value": "foobar"}}
obj = self.driver.upload_object(
file_path=file_path,
container=container,
extra=extra,
object_name=object_name,
)
self.assertEqual(obj.name, "foo_test_upload")
self.assertEqual(obj.size, 1000)
self.assertTrue("some-value" in obj.meta_data)
CloudFilesStorageDriver._upload_object = old_func
def test_upload_object_zero_size_object(self):
def upload_file(
self,
object_name=None,
content_type=None,
request_path=None,
request_method=None,
headers=None,
file_path=None,
stream=None,
):
return {
"response": make_response(
201, headers={"etag": "0cc175b9c0f1b6a831c399e269772661"}
),
"bytes_transferred": 0,
"data_hash": "0cc175b9c0f1b6a831c399e269772661",
}
old_func = CloudFilesStorageDriver._upload_object
CloudFilesStorageDriver._upload_object = upload_file
old_request = self.driver.connection.request
file_path = os.path.join(os.path.dirname(__file__), "__init__.py")
container = Container(name="foo_bar_container", extra={}, driver=self)
object_name = "empty"
extra = {}
def func(*args, **kwargs):
self.assertEqual(kwargs["headers"]["Content-Length"], 0)
func.called = True
return old_request(*args, **kwargs)
self.driver.connection.request = func
func.called = False
obj = self.driver.upload_object(
file_path=file_path,
container=container,
extra=extra,
object_name=object_name,
)
self.assertEqual(obj.name, "empty")
self.assertEqual(obj.size, 0)
CloudFilesStorageDriver._upload_object = old_func
self.driver.connection.request = old_request
def test_upload_object_invalid_hash(self):
CloudFilesMockHttp.type = "INVALID_HASH"
def upload_file(
self,
object_name=None,
content_type=None,
request_path=None,
request_method=None,
headers=None,
file_path=None,
stream=None,
):
return {
"response": make_response(
201, headers={"etag": "0cc175b9c0f1b6a831c399e269772661"}
),
"bytes_transferred": 1000,
"data_hash": "blah blah",
}
old_func = CloudFilesStorageDriver._upload_object
CloudFilesStorageDriver._upload_object = upload_file
file_path = os.path.abspath(__file__)
container = Container(name="foo_bar_container", extra={}, driver=self)
object_name = "foo_test_upload"
try:
self.driver.upload_object(
file_path=file_path,
container=container,
object_name=object_name,
verify_hash=True,
)
except ObjectHashMismatchError:
pass
else:
self.fail("Invalid hash was returned but an exception was not thrown")
finally:
CloudFilesStorageDriver._upload_object = old_func
def test_upload_object_no_content_type(self):
def no_content_type(name):
return None, None
old_func = libcloud.utils.files.guess_file_mime_type
libcloud.utils.files.guess_file_mime_type = no_content_type
file_path = os.path.abspath(__file__)
container = Container(name="foo_bar_container", extra={}, driver=self)
object_name = "foo_test_upload"
obj = self.driver.upload_object(
file_path=file_path,
verify_hash=False,
container=container,
object_name=object_name,
)
self.assertEqual(obj.name, object_name)
libcloud.utils.files.guess_file_mime_type = old_func
def test_upload_object_inexistent_file(self):
def dummy_content_type(name):
return "application/zip", None
old_func = libcloud.utils.files.guess_file_mime_type
libcloud.utils.files.guess_file_mime_type = dummy_content_type
file_path = os.path.abspath(__file__ + ".inexistent")
container = Container(name="foo_bar_container", extra={}, driver=self)
object_name = "foo_test_upload"
try:
self.driver.upload_object(
file_path=file_path, container=container, object_name=object_name
)
except OSError:
pass
else:
self.fail("Inexistent but an exception was not thrown")
finally:
libcloud.utils.files.guess_file_mime_type = old_func
def test_upload_object_via_stream(self):
def dummy_content_type(name):
return "application/zip", None
old_func = libcloud.utils.files.guess_file_mime_type
libcloud.utils.files.guess_file_mime_type = dummy_content_type
container = Container(name="foo_bar_container", extra={}, driver=self)
object_name = "foo_test_stream_data"
iterator = BytesIO(b("235"))
try:
self.driver.upload_object_via_stream(
container=container, object_name=object_name, iterator=iterator
)
finally:
libcloud.utils.files.guess_file_mime_type = old_func
def test_upload_object_via_stream_stream_seek_at_end(self):
def dummy_content_type(name):
return "application/zip", None
old_func = libcloud.utils.files.guess_file_mime_type
libcloud.utils.files.guess_file_mime_type = dummy_content_type
container = Container(name="foo_bar_container", extra={}, driver=self)
object_name = "foo_test_stream_data_seek"
iterator = BytesIO(b("123456789"))
iterator.seek(10)
self.assertEqual(iterator.tell(), 10)
try:
self.driver.upload_object_via_stream(
container=container, object_name=object_name, iterator=iterator
)
finally:
libcloud.utils.files.guess_file_mime_type = old_func
def test_delete_object_success(self):
container = Container(name="foo_bar_container", extra={}, driver=self)
obj = Object(
name="foo_bar_object",
size=1000,
hash=None,
extra={},
container=container,
meta_data=None,
driver=CloudFilesStorageDriver,
)
status = self.driver.delete_object(obj=obj)
self.assertTrue(status)
def test_delete_object_not_found(self):
CloudFilesMockHttp.type = "NOT_FOUND"
container = Container(name="foo_bar_container", extra={}, driver=self)
obj = Object(
name="foo_bar_object",
size=1000,
hash=None,
extra={},
container=container,
meta_data=None,
driver=CloudFilesStorageDriver,
)
try:
self.driver.delete_object(obj=obj)
except ObjectDoesNotExistError:
pass
else:
self.fail("Object does not exist but an exception was not thrown")
def test_ex_get_meta_data(self):
meta_data = self.driver.ex_get_meta_data()
self.assertTrue(isinstance(meta_data, dict))
self.assertTrue("object_count" in meta_data)
self.assertTrue("container_count" in meta_data)
self.assertTrue("bytes_used" in meta_data)
self.assertTrue("temp_url_key" in meta_data)
def test_ex_purge_object_from_cdn(self):
CloudFilesMockHttp.type = "PURGE_SUCCESS"
container = Container(name="foo_bar_container", extra={}, driver=self.driver)
obj = Object(
name="object",
size=1000,
hash=None,
extra={},
container=container,
meta_data=None,
driver=self,
)
self.assertTrue(self.driver.ex_purge_object_from_cdn(obj=obj))
def test_ex_purge_object_from_cdn_with_email(self):
CloudFilesMockHttp.type = "PURGE_SUCCESS_EMAIL"
container = Container(name="foo_bar_container", extra={}, driver=self.driver)
obj = Object(
name="object",
size=1000,
hash=None,
extra={},
container=container,
meta_data=None,
driver=self,
)
self.assertTrue(self.driver.ex_purge_object_from_cdn(obj=obj, email="test@test.com"))
@mock.patch("os.path.getsize")
def test_ex_multipart_upload_object_for_small_files(self, getsize_mock):
getsize_mock.return_value = 0
old_func = CloudFilesStorageDriver.upload_object
mocked_upload_object = mock.Mock(return_value="test")
CloudFilesStorageDriver.upload_object = mocked_upload_object
file_path = os.path.abspath(__file__)
container = Container(name="foo_bar_container", extra={}, driver=self)
object_name = "foo_test_upload"
obj = self.driver.ex_multipart_upload_object(
file_path=file_path, container=container, object_name=object_name
)
CloudFilesStorageDriver.upload_object = old_func
self.assertTrue(mocked_upload_object.called)
self.assertEqual(obj, "test")
def test_ex_multipart_upload_object_success(self):
_upload_object_part = CloudFilesStorageDriver._upload_object_part
_upload_object_manifest = CloudFilesStorageDriver._upload_object_manifest
mocked__upload_object_part = mock.Mock(return_value="test_part")
mocked__upload_object_manifest = mock.Mock(return_value="test_manifest")
CloudFilesStorageDriver._upload_object_part = mocked__upload_object_part
CloudFilesStorageDriver._upload_object_manifest = mocked__upload_object_manifest
parts = 5
file_path = os.path.abspath(__file__)
chunk_size = int(math.ceil(float(os.path.getsize(file_path)) / parts))
container = Container(name="foo_bar_container", extra={}, driver=self)
object_name = "foo_test_upload"
self.driver.ex_multipart_upload_object(
file_path=file_path,
container=container,
object_name=object_name,
chunk_size=chunk_size,
)
CloudFilesStorageDriver._upload_object_part = _upload_object_part
CloudFilesStorageDriver._upload_object_manifest = _upload_object_manifest
self.assertEqual(mocked__upload_object_part.call_count, parts)
self.assertTrue(mocked__upload_object_manifest.call_count, 1)
def test__upload_object_part(self):
_put_object = CloudFilesStorageDriver._put_object
mocked__put_object = mock.Mock(return_value="test")
CloudFilesStorageDriver._put_object = mocked__put_object
part_number = 7
object_name = "test_object"
expected_name = object_name + "/%08d" % part_number
container = Container(name="foo_bar_container", extra={}, driver=self)
self.driver._upload_object_part(container, object_name, part_number, None)
CloudFilesStorageDriver._put_object = _put_object
func_kwargs = tuple(mocked__put_object.call_args)[1]
self.assertEqual(func_kwargs["object_name"], expected_name)
self.assertEqual(func_kwargs["container"], container)
def test_upload_object_via_stream_with_cors_headers(self):
"""
Test we can add some ``Cross-origin resource sharing`` headers
to the request about to be sent.
"""
cors_headers = {
"Access-Control-Allow-Origin": "http://mozilla.com",
"Origin": "http://storage.clouddrive.com",
}
expected_headers = {
# Automatically added headers
"Content-Type": "application/octet-stream"
}
expected_headers.update(cors_headers)
def intercept_request(request_path, method=None, data=None, headers=None, raw=True):
# What we're actually testing
self.assertDictEqual(expected_headers, headers)
raise NotImplementedError("oops")
self.driver.connection.request = intercept_request
container = Container(name="CORS", extra={}, driver=self.driver)
try:
self.driver.upload_object_via_stream(
iterator=iter(b"blob data like an image or video"),
container=container,
object_name="test_object",
headers=cors_headers,
)
except NotImplementedError:
# Don't care about the response we'd have to mock anyway
# as long as we intercepted the request and checked its headers
pass
else:
self.fail(
"Expected NotImplementedError to be thrown to "
"verify we actually checked the expected headers"
)
def test_upload_object_via_stream_python3_bytes_error(self):
container = Container(name="py3", extra={}, driver=self.driver)
bytes_blob = b"blob data like an image or video"
# This is mostly to check we didn't discover other errors along the way
mocked_response = container.upload_object_via_stream(
iterator=iter(bytes_blob),
object_name="img_or_vid",
)
self.assertEqual(len(bytes_blob), mocked_response.size)
@unittest.skip("Skipping as chunking is disabled in 2.0rc1")
def test_upload_object_via_stream_chunked_encoding(self):
# Create enough bytes it should get split into two chunks
bytes_blob = "".join(["\0" for _ in range(CHUNK_SIZE + 1)])
hex_chunk_size = ("%X" % CHUNK_SIZE).encode("utf8")
expected = [
# Chunk 1
hex_chunk_size + b"\r\n",
bytes(bytes_blob[:CHUNK_SIZE].encode("utf8")),
b"\r\n",
# Chunk 2
b"1\r\n",
bytes(bytes_blob[CHUNK_SIZE:].encode("utf8")),
b"\r\n",
# If chunked, also send a final message
b"0\r\n\r\n",
]
logged_data = []
class InterceptResponse(MockHttp):
def __init__(self, connection, response=None):
super().__init__(connection=connection, response=response)
old_send = self.connection.connection.send
def intercept_send(data):
old_send(data)
logged_data.append(data)
self.connection.connection.send = intercept_send
def _v1_MossoCloudFS_py3_img_or_vid2(self, method, url, body, headers):
headers = {"etag": "d79fb00c27b50494a463e680d459c90c"}
headers.update(self.base_headers)
_201 = httplib.CREATED
return _201, "", headers, httplib.responses[_201]
self.driver_klass.connectionCls.rawResponseCls = InterceptResponse
container = Container(name="py3", extra={}, driver=self.driver)
container.upload_object_via_stream(
iterator=iter(bytes_blob),
object_name="img_or_vid2",
)
self.assertListEqual(expected, logged_data)
def test__upload_object_manifest(self):
hash_function = self.driver._get_hash_function()
hash_function.update(b(""))
data_hash = hash_function.hexdigest()
fake_response = type("CloudFilesResponse", (), {"headers": {"etag": data_hash}})
_request = self.driver.connection.request
mocked_request = mock.Mock(return_value=fake_response)
self.driver.connection.request = mocked_request
container = Container(name="foo_bar_container", extra={}, driver=self)
object_name = "test_object"
self.driver._upload_object_manifest(container, object_name)
func_args, func_kwargs = tuple(mocked_request.call_args)
self.driver.connection.request = _request
self.assertEqual(func_args[0], "/" + container.name + "/" + object_name)
self.assertEqual(
func_kwargs["headers"]["X-Object-Manifest"],
container.name + "/" + object_name + "/",
)
self.assertEqual(func_kwargs["method"], "PUT")
def test__upload_object_manifest_wrong_hash(self):
fake_response = type("CloudFilesResponse", (), {"headers": {"etag": "0000000"}})
_request = self.driver.connection.request
mocked_request = mock.Mock(return_value=fake_response)
self.driver.connection.request = mocked_request
container = Container(name="foo_bar_container", extra={}, driver=self)
object_name = "test_object"
try:
self.driver._upload_object_manifest(container, object_name)
except ObjectHashMismatchError:
pass
else:
self.fail("Exception was not thrown")
finally:
self.driver.connection.request = _request
def test_create_container_put_object_name_encoding(self):
def upload_file(
self,
object_name=None,
content_type=None,
request_path=None,
request_method=None,
headers=None,
file_path=None,
stream=None,
):
return {
"response": make_response(
201, headers={"etag": "0cc175b9c0f1b6a831c399e269772661"}
),
"bytes_transferred": 1000,
"data_hash": "0cc175b9c0f1b6a831c399e269772661",
}
old_func = CloudFilesStorageDriver._upload_object
CloudFilesStorageDriver._upload_object = upload_file
container_name = "speci@l_name"
object_name = "m@obj€ct"
file_path = os.path.abspath(__file__)
container = self.driver.create_container(container_name=container_name)
self.assertEqual(container.name, container_name)
obj = self.driver.upload_object(
file_path=file_path, container=container, object_name=object_name
)
self.assertEqual(obj.name, object_name)
CloudFilesStorageDriver._upload_object = old_func
def test_ex_enable_static_website(self):
container = Container(name="foo_bar_container", extra={}, driver=self)
result = self.driver.ex_enable_static_website(container=container, index_file="index.html")
self.assertTrue(result)
def test_ex_set_error_page(self):
container = Container(name="foo_bar_container", extra={}, driver=self)
result = self.driver.ex_set_error_page(container=container, file_name="error.html")
self.assertTrue(result)
def test_ex_set_account_metadata_temp_url_key(self):
result = self.driver.ex_set_account_metadata_temp_url_key("a key")
self.assertTrue(result)
@mock.patch("libcloud.storage.drivers.cloudfiles.time")
def test_ex_get_object_temp_url(self, time):
time.return_value = 0
self.driver.ex_get_meta_data = mock.Mock()
self.driver.ex_get_meta_data.return_value = {
"container_count": 1,
"object_count": 1,
"bytes_used": 1,
"temp_url_key": "foo",
}
container = Container(name="foo_bar_container", extra={}, driver=self)
obj = Object(
name="foo_bar_object",
size=1000,
hash=None,
extra={},
container=container,
meta_data=None,
driver=self,
)
hmac_body = "{}\n{}\n{}".format(
"GET",
60,
"/v1/MossoCloudFS/foo_bar_container/foo_bar_object",
)
sig = hmac.new(b("foo"), b(hmac_body), sha1).hexdigest()
ret = self.driver.ex_get_object_temp_url(obj, "GET")
temp_url = (
"https://storage4.%s1.clouddrive.com/v1/MossoCloudFS/"
"foo_bar_container/foo_bar_object?temp_url_expires=60&temp_url_sig=%s"
% (self.region, sig)
)
self.assertEqual("".join(sorted(ret)), "".join(sorted(temp_url)))
def test_ex_get_object_temp_url_no_key_raises_key_error(self):
self.driver.ex_get_meta_data = mock.Mock()
self.driver.ex_get_meta_data.return_value = {
"container_count": 1,
"object_count": 1,
"bytes_used": 1,
"temp_url_key": None,
}
container = Container(name="foo_bar_container", extra={}, driver=self)
obj = Object(
name="foo_bar_object",
size=1000,
hash=None,
extra={},
container=container,
meta_data=None,
driver=self,
)
self.assertRaises(KeyError, self.driver.ex_get_object_temp_url, obj, "GET")
def _remove_test_file(self):
file_path = os.path.abspath(__file__) + ".temp"
try:
os.unlink(file_path)
except OSError:
pass
class CloudFilesDeprecatedUSTests(CloudFilesTests):
driver_klass = CloudFilesStorageDriver
region = "ord"
class CloudFilesDeprecatedUKTests(CloudFilesTests):
driver_klass = CloudFilesStorageDriver
region = "lon"
class CloudFilesMockHttp(BaseRangeDownloadMockHttp, unittest.TestCase):
fixtures = StorageFileFixtures("cloudfiles")
base_headers = {"content-type": "application/json; charset=UTF-8"}
# fake auth token response
def _v2_0_tokens(self, method, url, body, headers):
headers = copy.deepcopy(self.base_headers)
body = self.fixtures.load("_v2_0__auth.json")
return (httplib.OK, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_MALFORMED_JSON(self, method, url, body, headers):
# test_invalid_json_throws_exception
body = 'broken: json /*"'
return (
httplib.NO_CONTENT,
body,
self.base_headers,
httplib.responses[httplib.OK],
)
def _v1_MossoCloudFS_EMPTY(self, method, url, body, headers):
return (
httplib.NO_CONTENT,
body,
self.base_headers,
httplib.responses[httplib.OK],
)
def _v1_MossoCloudFS(self, method, url, body, headers):
headers = copy.deepcopy(self.base_headers)
if method == "GET":
# list_containers
body = self.fixtures.load("list_containers.json")
status_code = httplib.OK
elif method == "HEAD":
# get_meta_data
body = self.fixtures.load("meta_data.json")
status_code = httplib.NO_CONTENT
headers.update(
{
"x-account-container-count": "10",
"x-account-object-count": "400",
"x-account-bytes-used": "1234567",
}
)
elif method == "POST":
body = ""
status_code = httplib.NO_CONTENT
return (status_code, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_not_found(self, method, url, body, headers):
# test_get_object_not_found
if method == "HEAD":
body = ""
else:
raise ValueError("Invalid method")
return (
httplib.NOT_FOUND,
body,
self.base_headers,
httplib.responses[httplib.OK],
)
def _v1_MossoCloudFS_test_container_EMPTY(self, method, url, body, headers):
body = self.fixtures.load("list_container_objects_empty.json")
return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_test_20container_201_EMPTY(self, method, url, body, headers):
body = self.fixtures.load("list_container_objects_empty.json")
return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_test_container(self, method, url, body, headers):
headers = copy.deepcopy(self.base_headers)
if method == "GET":
# list_container_objects
if url.find("marker") == -1:
body = self.fixtures.load("list_container_objects.json")
status_code = httplib.OK
else:
body = ""
status_code = httplib.NO_CONTENT
elif method == "HEAD":
# get_container
body = self.fixtures.load("list_container_objects_empty.json")
status_code = httplib.NO_CONTENT
headers.update({"x-container-object-count": "800", "x-container-bytes-used": "1234568"})
return (status_code, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_test_container_ITERATOR(self, method, url, body, headers):
headers = copy.deepcopy(self.base_headers)
# list_container_objects
if url.find("foo-test-3") != -1:
body = self.fixtures.load("list_container_objects_not_exhausted2.json")
status_code = httplib.OK
elif url.find("foo-test-5") != -1:
body = ""
status_code = httplib.NO_CONTENT
else:
# First request
body = self.fixtures.load("list_container_objects_not_exhausted1.json")
status_code = httplib.OK
return (status_code, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_test_container_not_found(self, method, url, body, headers):
# test_get_container_not_found
if method == "HEAD":
body = ""
else:
raise ValueError("Invalid method")
return (
httplib.NOT_FOUND,
body,
self.base_headers,
httplib.responses[httplib.OK],
)
def _v1_MossoCloudFS_test_container_test_object(self, method, url, body, headers):
headers = copy.deepcopy(self.base_headers)
if method == "HEAD":
# get_object
body = self.fixtures.load("list_container_objects_empty.json")
status_code = httplib.NO_CONTENT
headers.update(
{
"content-length": "555",
"last-modified": "Tue, 25 Jan 2011 22:01:49 GMT",
"etag": "6b21c4a111ac178feacf9ec9d0c71f17",
"x-object-meta-foo-bar": "test 1",
"x-object-meta-bar-foo": "test 2",
"content-type": "application/zip",
}
)
return (status_code, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_test_container__7E_test_object(self, method, url, body, headers):
headers = copy.deepcopy(self.base_headers)
if method == "HEAD":
# get_object_name_encoding
body = self.fixtures.load("list_container_objects_empty.json")
status_code = httplib.NO_CONTENT
headers.update(
{
"content-length": "555",
"last-modified": "Tue, 25 Jan 2011 22:01:49 GMT",
"etag": "6b21c4a111ac178feacf9ec9d0c71f17",
"x-object-meta-foo-bar": "test 1",
"x-object-meta-bar-foo": "test 2",
"content-type": "application/zip",
}
)
return (status_code, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_test_create_container(self, method, url, body, headers):
# test_create_container_success
headers = copy.deepcopy(self.base_headers)
body = self.fixtures.load("list_container_objects_empty.json")
headers = copy.deepcopy(self.base_headers)
headers.update({"content-length": "18", "date": "Mon, 28 Feb 2011 07:52:57 GMT"})
status_code = httplib.CREATED
return (status_code, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_speci_40l_name(self, method, url, body, headers):
# test_create_container_put_object_name_encoding
# Verify that the name is properly url encoded
container_name = "speci@l_name"
encoded_container_name = urlquote(container_name)
self.assertTrue(encoded_container_name in url)
headers = copy.deepcopy(self.base_headers)
body = self.fixtures.load("list_container_objects_empty.json")
headers = copy.deepcopy(self.base_headers)
headers.update({"content-length": "18", "date": "Mon, 28 Feb 2011 07:52:57 GMT"})
status_code = httplib.CREATED
return (status_code, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_test_create_container_ALREADY_EXISTS(self, method, url, body, headers):
# test_create_container_already_exists
headers = copy.deepcopy(self.base_headers)
body = self.fixtures.load("list_container_objects_empty.json")
headers.update({"content-type": "text/plain"})
status_code = httplib.ACCEPTED
return (status_code, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_foo_bar_container(self, method, url, body, headers):
if method == "DELETE":
# test_delete_container_success
body = self.fixtures.load("list_container_objects_empty.json")
headers = self.base_headers
status_code = httplib.NO_CONTENT
elif method == "POST":
# test_ex_enable_static_website
body = ""
headers = self.base_headers
status_code = httplib.ACCEPTED
return (status_code, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_foo_bar_container_object_PURGE_SUCCESS(self, method, url, body, headers):
if method == "DELETE":
# test_ex_purge_from_cdn
headers = self.base_headers
status_code = httplib.NO_CONTENT
return (status_code, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_foo_bar_container_object_PURGE_SUCCESS_EMAIL(
self, method, url, body, headers
):
if method == "DELETE":
# test_ex_purge_from_cdn_with_email
self.assertEqual(headers["X-Purge-Email"], "test@test.com")
headers = self.base_headers
status_code = httplib.NO_CONTENT
return (status_code, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_foo_bar_container_NOT_FOUND(self, method, url, body, headers):
if method == "DELETE":
# test_delete_container_not_found
body = self.fixtures.load("list_container_objects_empty.json")
headers = self.base_headers
status_code = httplib.NOT_FOUND
return (status_code, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_foo_bar_container_NOT_EMPTY(self, method, url, body, headers):
if method == "DELETE":
# test_delete_container_not_empty
body = self.fixtures.load("list_container_objects_empty.json")
headers = self.base_headers
status_code = httplib.CONFLICT
return (status_code, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_foo_bar_container_foo_bar_object(self, method, url, body, headers):
if method == "DELETE":
# test_delete_object_success
body = self.fixtures.load("list_container_objects_empty.json")
headers = self.base_headers
status_code = httplib.NO_CONTENT
return (status_code, body, headers, httplib.responses[httplib.OK])
elif method == "GET":
body = generate_random_data(1000)
return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_foo_bar_container_foo_bar_object_range(self, method, url, body, headers):
if method == "GET":
# test_download_object_range_success
body = "0123456789123456789"
self.assertTrue("Range" in headers)
self.assertEqual(headers["Range"], "bytes=5-6")
start_bytes, end_bytes = self._get_start_and_end_bytes_from_range_str(
headers["Range"], body
)
return (
httplib.PARTIAL_CONTENT,
body[start_bytes : end_bytes + 1],
self.base_headers,
httplib.responses[httplib.PARTIAL_CONTENT],
)
def _v1_MossoCloudFS_py3_img_or_vid(self, method, url, body, headers):
headers = {"etag": "e2378cace8712661ce7beec3d9362ef6"}
headers.update(self.base_headers)
return httplib.CREATED, "", headers, httplib.responses[httplib.CREATED]
def _v1_MossoCloudFS_foo_bar_container_foo_test_upload(self, method, url, body, headers):
# test_object_upload_success
body = ""
headers = {}
headers.update(self.base_headers)
headers["etag"] = "hash343hhash89h932439jsaa89"
return (httplib.CREATED, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_speci_40l_name_m_40obj_E2_82_ACct(self, method, url, body, headers):
# test_create_container_put_object_name_encoding
# Verify that the name is properly url encoded
object_name = "m@obj€ct"
urlquote(object_name)
headers = copy.deepcopy(self.base_headers)
body = ""
headers["etag"] = "hash343hhash89h932439jsaa89"
return (httplib.CREATED, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_foo_bar_container_empty(self, method, url, body, headers):
# test_upload_object_zero_size_object
body = ""
headers = {}
headers.update(self.base_headers)
headers["etag"] = "hash343hhash89h932439jsaa89"
return (httplib.CREATED, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_foo_bar_container_foo_test_upload_INVALID_HASH(
self, method, url, body, headers
):
# test_object_upload_invalid_hash
body = ""
headers = {}
headers.update(self.base_headers)
headers["etag"] = "foobar"
return (httplib.CREATED, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_foo_bar_container_foo_bar_object_INVALID_SIZE(
self, method, url, body, headers
):
# test_download_object_invalid_file_size
body = generate_random_data(100)
return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_foo_bar_container_foo_bar_object_NOT_FOUND(
self, method, url, body, headers
):
body = ""
return (
httplib.NOT_FOUND,
body,
self.base_headers,
httplib.responses[httplib.OK],
)
def _v1_MossoCloudFS_foo_bar_container_foo_test_stream_data(self, method, url, body, headers):
# test_upload_object_via_stream_success
hasher = hashlib.md5()
hasher.update(b"235")
hash_value = hasher.hexdigest()
headers = {}
headers.update(self.base_headers)
headers["etag"] = hash_value
body = "test"
return (httplib.CREATED, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_foo_bar_container_foo_test_stream_data_seek(
self, method, url, body, headers
):
# test_upload_object_via_stream_stream_seek_at_end
hasher = hashlib.md5()
hasher.update(b"123456789")
hash_value = hasher.hexdigest()
headers = {}
headers.update(self.base_headers)
headers["etag"] = hash_value
body = "test"
return (httplib.CREATED, body, headers, httplib.responses[httplib.OK])
def _v1_MossoCloudFS_foo_bar_container_foo_bar_object_NO_BUFFER(
self, method, url, body, headers
):
# test_download_object_data_is_not_buffered_in_memory
headers = {}
headers.update(self.base_headers)
headers["etag"] = "577ef1154f3240ad5b9b413aa7346a1e"
body = generate_random_data(1000)
return (httplib.OK, body, headers, httplib.responses[httplib.OK])
if __name__ == "__main__":
sys.exit(unittest.main())