# blob: 223264e768c25530a6dc6fe85245a10db8bb7993 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import unittest
from unittest import mock
from libcloud.test import MockHttp # pylint: disable-msg=E0611
from libcloud.test import make_response, generate_random_data
from libcloud.utils.py3 import b, httplib, parse_qs, urlparse
from libcloud.common.types import InvalidCredsError
from libcloud.storage.base import Object, Container
from libcloud.test.secrets import STORAGE_OSS_PARAMS
from libcloud.storage.types import (
ContainerError,
ObjectDoesNotExistError,
ObjectHashMismatchError,
ContainerIsNotEmptyError,
InvalidContainerNameError,
ContainerDoesNotExistError,
)
from libcloud.test.file_fixtures import StorageFileFixtures # pylint: disable-msg=E0611
from libcloud.storage.drivers.oss import CHUNK_SIZE, OSSConnection, OSSStorageDriver
from libcloud.storage.drivers.dummy import DummyIterator
class OSSConnectionTestCase(unittest.TestCase):
    """Checks the signature helper exposed by ``OSSConnection``."""

    def setUp(self):
        """Create the connection whose ``key`` is fed to the signature helper."""
        credentials = (
            "44CF9590006BF252F707",
            "OtxrzxIsfpFjA7SwPzILwy8Bw21TLhquhboDYROV",
        )
        self.conn = OSSConnection(*credentials)

    def test_signature(self):
        """A fixed PUT request must produce the known signature value."""
        request_headers = {
            "Content-MD5": "ODBGOERFMDMzQTczRUY3NUE3NzA5QzdFNUYzMDQxNEM=",
            "Content-Type": "text/html",
            "Expires": "Thu, 17 Nov 2005 18:49:58 GMT",
            "X-OSS-Meta-Author": "foo@bar.com",
            "X-OSS-Magic": "abracadabra",
            "Host": "oss-example.oss-cn-hangzhou.aliyuncs.com",
        }

        signature = OSSConnection._get_auth_signature(
            "PUT",
            request_headers,
            {},
            request_headers["Expires"],
            self.conn.key,
            "/oss-example/nelson",
            "x-oss-",
        )

        self.assertEqual(b("26NBxoKdsyly4EDv6inkoDft/yA="), signature)
class ObjectTestCase(unittest.TestCase):
    """Checks on ``Object`` instances attached to the OSS driver."""

    def test_object_with_chinese_name(self):
        """An object named with Chinese characters still yields a repr value."""
        oss_driver = OSSStorageDriver(*STORAGE_OSS_PARAMS)
        chinese_named = Object(
            driver=oss_driver,
            container=None,
            meta_data=None,
            extra=None,
            hash=None,
            size=0,
            name="中文",
        )

        rendered = chinese_named.__repr__()
        self.assertIsNotNone(rendered)
class OSSMockHttp(MockHttp, unittest.TestCase):
    """Canned HTTP responses used by the OSS storage driver tests.

    Every handler receives ``(method, url, body, headers)`` describing the
    request and returns a ``(status, body, headers, reason)`` tuple.  The
    driver test case assigns ``type`` and ``test`` on this class before each
    test; presumably ``MockHttp`` derives the handler name from the request
    path plus ``type`` -- confirm in ``libcloud.test``.
    """

    fixtures = StorageFileFixtures("oss")
    # Shared by every instance: handlers must treat this dict as read-only.
    base_headers = {}

    def _unauthorized(self, method, url, body, headers):
        """Reply 401 with an empty body (used by test_invalid_credentials)."""
        # NOTE(review): the reason phrase is the one for 200 OK although the
        # status is 401; left untouched -- confirm whether that is intended.
        return (
            httplib.UNAUTHORIZED,
            "",
            self.base_headers,
            httplib.responses[httplib.OK],
        )

    def _list_containers_empty(self, method, url, body, headers):
        """Reply 200 with the ``list_containers_empty.xml`` fixture."""
        body = self.fixtures.load("list_containers_empty.xml")
        return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK])

    def _list_containers(self, method, url, body, headers):
        """Reply 200 with the ``list_containers.xml`` fixture."""
        body = self.fixtures.load("list_containers.xml")
        return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK])

    def _list_container_objects_empty(self, method, url, body, headers):
        """Reply 200 with the ``list_container_objects_empty.xml`` fixture."""
        body = self.fixtures.load("list_container_objects_empty.xml")
        return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK])

    def _list_container_objects(self, method, url, body, headers):
        """Reply 200 with the ``list_container_objects.xml`` fixture."""
        body = self.fixtures.load("list_container_objects.xml")
        return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK])

    def _list_container_objects_chinese(self, method, url, body, headers):
        """Reply 200 with the ``list_container_objects_chinese.xml`` fixture."""
        body = self.fixtures.load("list_container_objects_chinese.xml")
        return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK])

    def _list_container_objects_prefix(self, method, url, body, headers):
        """Check the ``prefix`` query parameter, then reply with the prefix fixture.

        The expected prefix is read from the ``prefix`` attribute of the
        running test, which the driver test case stores on ``self.test``.
        """
        params = {"prefix": self.test.prefix}
        self.assertUrlContainsQueryParams(url, params)
        body = self.fixtures.load("list_container_objects_prefix.xml")
        return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK])

    def _get_container(self, method, url, body, headers):
        """Reply with the container listing (same as ``_list_containers``)."""
        return self._list_containers(method, url, body, headers)

    def _get_object(self, method, url, body, headers):
        """Reply with the container listing (same as ``_list_containers``)."""
        return self._list_containers(method, url, body, headers)

    def _notexisted_get_object(self, method, url, body, headers):
        """Reply 404, echoing the request body."""
        return (
            httplib.NOT_FOUND,
            body,
            self.base_headers,
            httplib.responses[httplib.NOT_FOUND],
        )

    def _test_get_object(self, method, url, body, headers):
        """Reply 200 with the headers of an empty object, echoing the request body."""
        # Build the response headers on a copy: ``base_headers`` is a class
        # attribute, so updating it in place would leak these object headers
        # into every later response that returns ``base_headers``.
        response_headers = dict(self.base_headers)
        response_headers.update(
            {
                "accept-ranges": "bytes",
                "connection": "keep-alive",
                "content-length": "0",
                "content-type": "application/octet-stream",
                "date": "Sat, 16 Jan 2016 15:38:14 GMT",
                "etag": '"D41D8CD98F00B204E9800998ECF8427E"',
                "last-modified": "Fri, 15 Jan 2016 14:43:15 GMT",
                "server": "AliyunOSS",
                "x-oss-object-type": "Normal",
                "x-oss-request-id": "569A63E6257784731E3D877F",
                "x-oss-meta-rabbits": "monkeys",
            }
        )
        return (httplib.OK, body, response_headers, httplib.responses[httplib.OK])

    def _invalid_name(self, method, url, body, headers):
        """Reply 400, echoing the request (test_create_container_bad_request)."""
        return (httplib.BAD_REQUEST, body, headers, httplib.responses[httplib.OK])

    def _already_exists(self, method, url, body, headers):
        """Reply 409, echoing the request (test_create_container_already_exists)."""
        return (httplib.CONFLICT, body, headers, httplib.responses[httplib.OK])

    def _create_container(self, method, url, body, headers):
        """Require an empty-bodied PUT, then reply 200 (test_create_container_success)."""
        self.assertEqual("PUT", method)
        self.assertEqual("", body)
        return (httplib.OK, body, headers, httplib.responses[httplib.OK])

    def _create_container_location(self, method, url, body, headers):
        """Require a PUT carrying the location XML, then reply 200.

        Used by test_create_container_with_ex_location; the expected location
        is read from the ``ex_location`` attribute of the running test.
        """
        self.assertEqual("PUT", method)
        location_constraint = (
            "<CreateBucketConfiguration>"
            "<LocationConstraint>%s</LocationConstraint>"
            "</CreateBucketConfiguration>" % self.test.ex_location
        )
        self.assertEqual(location_constraint, body)
        return (httplib.OK, body, headers, httplib.responses[httplib.OK])

    def _delete_container_doesnt_exist(self, method, url, body, headers):
        """Reply 404, echoing the request (test_delete_container_doesnt_exist)."""
        return (httplib.NOT_FOUND, body, headers, httplib.responses[httplib.OK])

    def _delete_container_not_empty(self, method, url, body, headers):
        """Reply 409, echoing the request (test_delete_container_not_empty)."""
        return (httplib.CONFLICT, body, headers, httplib.responses[httplib.OK])

    def _delete_container(self, method, url, body, headers):
        """Reply 204, echoing the request body."""
        return (
            httplib.NO_CONTENT,
            body,
            self.base_headers,
            httplib.responses[httplib.NO_CONTENT],
        )

    def _foo_bar_object_not_found(self, method, url, body, headers):
        """Reply 404, echoing the request (test_delete_object_not_found)."""
        return (httplib.NOT_FOUND, body, headers, httplib.responses[httplib.OK])

    def _foo_bar_object_delete(self, method, url, body, headers):
        """Reply 204, echoing the request (test_delete_object_success)."""
        return (httplib.NO_CONTENT, body, headers, httplib.responses[httplib.OK])

    def _list_multipart(self, method, url, body, headers):
        """Reply with page 1 or page 2 of the multipart-upload listing.

        Page 1 is served when the URL has no ``key-marker`` query parameter,
        page 2 otherwise.
        """
        query_string = urlparse.urlsplit(url).query
        query = parse_qs(query_string)

        if "key-marker" not in query:
            body = self.fixtures.load("ex_iterate_multipart_uploads_p1.xml")
        else:
            body = self.fixtures.load("ex_iterate_multipart_uploads_p2.xml")

        return (httplib.OK, body, headers, httplib.responses[httplib.OK])

    def _foo_bar_object(self, method, url, body, headers):
        """Reply 200 with 1000 units of random data (test_download_object_success)."""
        body = generate_random_data(1000)
        return (httplib.OK, body, headers, httplib.responses[httplib.OK])

    def _foo_bar_object_invalid_size(self, method, url, body, headers):
        """Reply 200 with an empty body (test_download_object_invalid_file_size)."""
        body = ""
        return (httplib.OK, body, headers, httplib.responses[httplib.OK])

    def _foo_test_stream_data_multipart(self, method, url, body, headers):
        """Reply 200 with an empty body and a fixed ``etag`` header."""
        body = ""
        headers = {"etag": '"0cc175b9c0f1b6a831c399e269772661"'}
        return (httplib.OK, body, headers, httplib.responses[httplib.OK])
class OSSStorageDriverTestCase(unittest.TestCase):
    """Exercises ``OSSStorageDriver`` against the canned ``OSSMockHttp`` responses.

    Each test picks its mock response by assigning ``mock_response_klass.type``.
    """

    driver_type = OSSStorageDriver
    driver_args = STORAGE_OSS_PARAMS
    mock_response_klass = OSSMockHttp

    @classmethod
    def create_driver(cls):
        """Instantiate the driver under test from ``driver_type`` and ``driver_args``."""
        return cls.driver_type(*cls.driver_args)

    def setUp(self):
        """Route the driver's connections to the mock class and build a fresh driver."""
        self.driver_type.connectionCls.conn_class = self.mock_response_klass
        self.mock_response_klass.type = None
        # Lets mock handlers read per-test values such as ``prefix``.
        self.mock_response_klass.test = self
        self.driver = self.create_driver()

    def tearDown(self):
        """Delete the scratch file the download tests may have written."""
        self._remove_test_file()

    def _remove_test_file(self):
        """Remove ``<this file>.temp``; a failing unlink is ignored."""
        file_path = os.path.abspath(__file__) + ".temp"

        try:
            os.unlink(file_path)
        except OSError:
            # Best effort: most tests never create the file.
            pass

    def test_invalid_credentials(self):
        """An unauthorized response surfaces as ``InvalidCredsError``."""
        self.mock_response_klass.type = "unauthorized"
        self.assertRaises(InvalidCredsError, self.driver.list_containers)

    def test_list_containers_empty(self):
        """An empty listing yields no containers."""
        self.mock_response_klass.type = "list_containers_empty"
        containers = self.driver.list_containers()
        self.assertEqual(len(containers), 0)

    def test_list_containers_success(self):
        """The listing fixture yields two containers with name, date and location."""
        self.mock_response_klass.type = "list_containers"
        containers = self.driver.list_containers()
        self.assertEqual(len(containers), 2)

        container = containers[0]
        self.assertEqual("xz02tphky6fjfiuc0", container.name)
        self.assertIn("creation_date", container.extra)
        self.assertEqual("2014-05-15T11:18:32.000Z", container.extra["creation_date"])
        self.assertIn("location", container.extra)
        self.assertEqual("oss-cn-hangzhou-a", container.extra["location"])
        self.assertEqual(self.driver, container.driver)

    def test_list_container_objects_empty(self):
        """An empty object listing yields no objects."""
        self.mock_response_klass.type = "list_container_objects_empty"
        container = Container(name="test_container", extra={}, driver=self.driver)
        objects = self.driver.list_container_objects(container=container)
        self.assertEqual(len(objects), 0)

    def test_list_container_objects_success(self):
        """The object listing fixture yields two objects with the expected fields."""
        self.mock_response_klass.type = "list_container_objects"
        container = Container(name="test_container", extra={}, driver=self.driver)
        objects = self.driver.list_container_objects(container=container)
        self.assertEqual(len(objects), 2)

        obj = objects[0]
        self.assertEqual(obj.name, "en/")
        self.assertEqual(obj.hash, "D41D8CD98F00B204E9800998ECF8427E")
        self.assertEqual(obj.size, 0)
        self.assertEqual(obj.container.name, "test_container")
        self.assertEqual(obj.extra["last_modified"], "2016-01-15T14:43:15.000Z")
        self.assertIn("owner", obj.meta_data)

    def test_list_container_objects_with_chinese(self):
        """Object names containing Chinese characters are listed intact."""
        self.mock_response_klass.type = "list_container_objects_chinese"
        container = Container(name="test_container", extra={}, driver=self.driver)
        objects = self.driver.list_container_objects(container=container)
        self.assertEqual(len(objects), 2)

        obj = [o for o in objects if o.name == "WEB控制台.odp"][0]
        self.assertEqual(obj.hash, "281371EA1618CF0E645D6BB90A158276")
        self.assertEqual(obj.size, 1234567)
        self.assertEqual(obj.container.name, "test_container")
        self.assertEqual(obj.extra["last_modified"], "2016-01-15T14:43:06.000Z")
        self.assertIn("owner", obj.meta_data)

    def test_list_container_objects_with_prefix(self):
        """The ``prefix`` argument is sent as a query parameter (checked by the mock)."""
        self.mock_response_klass.type = "list_container_objects_prefix"
        container = Container(name="test_container", extra={}, driver=self.driver)
        # Must be set before the request: the mock handler reads it.
        self.prefix = "test_prefix"
        objects = self.driver.list_container_objects(container=container, prefix=self.prefix)
        self.assertEqual(len(objects), 2)

    def test_get_container_doesnt_exist(self):
        """Asking for a name absent from the listing raises ``ContainerDoesNotExistError``."""
        self.mock_response_klass.type = "get_container"
        self.assertRaises(
            ContainerDoesNotExistError,
            self.driver.get_container,
            container_name="not-existed",
        )

    def test_get_container_success(self):
        """Asking for a listed name returns the container with that name."""
        self.mock_response_klass.type = "get_container"
        container = self.driver.get_container(container_name="xz02tphky6fjfiuc0")
        # assertEqual, not assertTrue: assertTrue would treat the expected
        # name as the failure message and pass for any non-empty name.
        self.assertEqual(container.name, "xz02tphky6fjfiuc0")

    def test_get_object_container_doesnt_exist(self):
        """A 404 for the object raises ``ObjectDoesNotExistError``."""
        self.mock_response_klass.type = "get_object"
        self.assertRaises(
            ObjectDoesNotExistError,
            self.driver.get_object,
            container_name="xz02tphky6fjfiuc0",
            object_name="notexisted",
        )

    def test_get_object_success(self):
        """Object fields are populated from the mock response headers."""
        self.mock_response_klass.type = "get_object"
        obj = self.driver.get_object(container_name="xz02tphky6fjfiuc0", object_name="test")

        self.assertEqual(obj.name, "test")
        self.assertEqual(obj.container.name, "xz02tphky6fjfiuc0")
        self.assertEqual(obj.size, 0)
        self.assertEqual(obj.hash, "D41D8CD98F00B204E9800998ECF8427E")
        self.assertEqual(obj.extra["last_modified"], "Fri, 15 Jan 2016 14:43:15 GMT")
        self.assertEqual(obj.extra["content_type"], "application/octet-stream")
        self.assertEqual(obj.meta_data["rabbits"], "monkeys")

    def test_create_container_bad_request(self):
        """A 400 response on create raises ``ContainerError``."""
        # invalid container name, returns a 400 bad request
        self.mock_response_klass.type = "invalid_name"
        self.assertRaises(
            ContainerError, self.driver.create_container, container_name="invalid_name"
        )

    def test_create_container_already_exists(self):
        """A 409 response on create raises ``InvalidContainerNameError``."""
        # container with this name already exists
        self.mock_response_klass.type = "already_exists"
        self.assertRaises(
            InvalidContainerNameError,
            self.driver.create_container,
            container_name="new-container",
        )

    def test_create_container_success(self):
        """A 200 response on create returns a container with the requested name."""
        self.mock_response_klass.type = "create_container"
        name = "new_container"
        container = self.driver.create_container(container_name=name)
        self.assertEqual(container.name, name)

    def test_create_container_with_ex_location(self):
        """``ex_location`` is sent in the request body and recorded on the container."""
        self.mock_response_klass.type = "create_container_location"
        name = "new_container"
        # Must be set before the request: the mock handler reads it.
        self.ex_location = "oss-cn-beijing"
        container = self.driver.create_container(container_name=name, ex_location=self.ex_location)
        self.assertEqual(container.name, name)
        # assertEqual, not assertTrue: assertTrue would pass for any truthy
        # location and use the expected value only as the failure message.
        self.assertEqual(container.extra["location"], self.ex_location)

    def test_delete_container_doesnt_exist(self):
        """A 404 response on delete raises ``ContainerDoesNotExistError``."""
        container = Container(name="new_container", extra=None, driver=self.driver)
        self.mock_response_klass.type = "delete_container_doesnt_exist"
        self.assertRaises(
            ContainerDoesNotExistError,
            self.driver.delete_container,
            container=container,
        )

    def test_delete_container_not_empty(self):
        """A 409 response on delete raises ``ContainerIsNotEmptyError``."""
        container = Container(name="new_container", extra=None, driver=self.driver)
        self.mock_response_klass.type = "delete_container_not_empty"
        self.assertRaises(
            ContainerIsNotEmptyError, self.driver.delete_container, container=container
        )

    def test_delete_container_success(self):
        """A 204 response on delete makes ``delete_container`` return a truthy value."""
        self.mock_response_klass.type = "delete_container"
        container = Container(name="new_container", extra=None, driver=self.driver)
        self.assertTrue(self.driver.delete_container(container=container))

    def test_download_object_success(self):
        """Downloading a 1000-unit object to the scratch path reports success."""
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        obj = Object(
            name="foo_bar_object",
            size=1000,
            hash=None,
            extra={},
            container=container,
            meta_data=None,
            driver=self.driver,
        )
        destination_path = os.path.abspath(__file__) + ".temp"
        result = self.driver.download_object(
            obj=obj,
            destination_path=destination_path,
            overwrite_existing=False,
            delete_on_failure=True,
        )
        self.assertTrue(result)

    def test_download_object_invalid_file_size(self):
        """An empty response body for a 1000-unit object reports failure."""
        self.mock_response_klass.type = "invalid_size"
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        obj = Object(
            name="foo_bar_object",
            size=1000,
            hash=None,
            extra={},
            container=container,
            meta_data=None,
            driver=self.driver,
        )
        destination_path = os.path.abspath(__file__) + ".temp"
        result = self.driver.download_object(
            obj=obj,
            destination_path=destination_path,
            overwrite_existing=False,
            delete_on_failure=True,
        )
        self.assertFalse(result)

    def test_download_object_not_found(self):
        """A 404 response on download raises ``ObjectDoesNotExistError``."""
        self.mock_response_klass.type = "not_found"
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        obj = Object(
            name="foo_bar_object",
            size=1000,
            hash=None,
            extra={},
            container=container,
            meta_data=None,
            driver=self.driver,
        )
        destination_path = os.path.abspath(__file__) + ".temp"
        self.assertRaises(
            ObjectDoesNotExistError,
            self.driver.download_object,
            obj=obj,
            destination_path=destination_path,
            overwrite_existing=False,
            delete_on_failure=True,
        )

    def test_download_object_as_stream_success(self):
        """``download_object_as_stream`` returns something iterable."""
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        obj = Object(
            name="foo_bar_object",
            size=1000,
            hash=None,
            extra={},
            container=container,
            meta_data=None,
            driver=self.driver,
        )

        stream = self.driver.download_object_as_stream(obj=obj, chunk_size=None)
        self.assertTrue(hasattr(stream, "__iter__"))

    def test_upload_object_invalid_hash1(self):
        """A response etag that differs from the data hash raises ``ObjectHashMismatchError``."""

        def upload_file(
            self,
            object_name=None,
            content_type=None,
            request_path=None,
            request_method=None,
            headers=None,
            file_path=None,
            stream=None,
        ):
            return {
                "response": make_response(200, headers={"etag": "2345"}),
                "bytes_transferred": 1000,
                "data_hash": "hash343hhash89h932439jsaa89",
            }

        self.mock_response_klass.type = "INVALID_HASH1"
        file_path = os.path.abspath(__file__)
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        object_name = "foo_test_upload"

        # patch.object restores the real ``_upload_object`` on every exit path.
        with mock.patch.object(self.driver_type, "_upload_object", upload_file):
            try:
                self.driver.upload_object(
                    file_path=file_path,
                    container=container,
                    object_name=object_name,
                    verify_hash=True,
                )
            except ObjectHashMismatchError:
                pass
            else:
                self.fail("Invalid hash was returned but an exception was not thrown")

    def test_upload_object_success(self):
        """A matching etag yields an object carrying the name, size and metadata."""

        def upload_file(
            self,
            object_name=None,
            content_type=None,
            request_path=None,
            request_method=None,
            headers=None,
            file_path=None,
            stream=None,
        ):
            return {
                "response": make_response(
                    200, headers={"etag": "0cc175b9c0f1b6a831c399e269772661"}
                ),
                "bytes_transferred": 1000,
                "data_hash": "0cc175b9c0f1b6a831c399e269772661",
            }

        self.mock_response_klass.type = None
        file_path = os.path.abspath(__file__)
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        object_name = "foo_test_upload"
        extra = {"meta_data": {"some-value": "foobar"}}

        # patch.object restores the real ``_upload_object`` even when an
        # assertion below fails, so the stub cannot leak into other tests.
        with mock.patch.object(self.driver_type, "_upload_object", upload_file):
            obj = self.driver.upload_object(
                file_path=file_path,
                container=container,
                object_name=object_name,
                extra=extra,
                verify_hash=True,
            )

        self.assertEqual(obj.name, "foo_test_upload")
        self.assertEqual(obj.size, 1000)
        self.assertIn("some-value", obj.meta_data)

    def test_upload_object_with_acl(self):
        """A valid ``acl`` passed in ``extra`` is recorded on the returned object."""

        def upload_file(
            self,
            object_name=None,
            content_type=None,
            request_path=None,
            request_method=None,
            headers=None,
            file_path=None,
            stream=None,
        ):
            return {
                "response": make_response(
                    200, headers={"etag": "0cc175b9c0f1b6a831c399e269772661"}
                ),
                "bytes_transferred": 1000,
                "data_hash": "0cc175b9c0f1b6a831c399e269772661",
            }

        self.mock_response_klass.type = None
        file_path = os.path.abspath(__file__)
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        object_name = "foo_test_upload"
        extra = {"acl": "public-read"}

        # patch.object restores the real ``_upload_object`` even when an
        # assertion below fails, so the stub cannot leak into other tests.
        with mock.patch.object(self.driver_type, "_upload_object", upload_file):
            obj = self.driver.upload_object(
                file_path=file_path,
                container=container,
                object_name=object_name,
                extra=extra,
                verify_hash=True,
            )

        self.assertEqual(obj.name, "foo_test_upload")
        self.assertEqual(obj.size, 1000)
        self.assertEqual(obj.extra["acl"], "public-read")

    def test_upload_object_with_invalid_acl(self):
        """An unknown ``acl`` value makes ``upload_object`` raise ``AttributeError``."""
        file_path = os.path.abspath(__file__)
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        object_name = "foo_test_upload"
        extra = {"acl": "invalid-acl"}
        self.assertRaises(
            AttributeError,
            self.driver.upload_object,
            file_path=file_path,
            container=container,
            object_name=object_name,
            extra=extra,
            verify_hash=True,
        )

    def test_upload_empty_object_via_stream(self):
        """Streaming a single empty chunk produces an object of size 0."""
        if self.driver.supports_multipart_upload:
            self.mock_response_klass.type = "multipart"
        else:
            self.mock_response_klass.type = None

        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        object_name = "foo_test_stream_data"
        iterator = DummyIterator(data=[""])
        extra = {"content_type": "text/plain"}
        obj = self.driver.upload_object_via_stream(
            container=container, object_name=object_name, iterator=iterator, extra=extra
        )

        self.assertEqual(obj.name, object_name)
        self.assertEqual(obj.size, 0)

    def test_upload_small_object_via_stream(self):
        """Streaming three one-character chunks produces an object of size 3."""
        if self.driver.supports_multipart_upload:
            self.mock_response_klass.type = "multipart"
        else:
            self.mock_response_klass.type = None

        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        object_name = "foo_test_stream_data"
        iterator = DummyIterator(data=["2", "3", "5"])
        extra = {"content_type": "text/plain"}
        obj = self.driver.upload_object_via_stream(
            container=container, object_name=object_name, iterator=iterator, extra=extra
        )

        self.assertEqual(obj.name, object_name)
        self.assertEqual(obj.size, 3)

    def test_upload_big_object_via_stream(self):
        """Streaming two full chunks plus one character reports the summed size."""
        if self.driver.supports_multipart_upload:
            self.mock_response_klass.type = "multipart"
        else:
            self.mock_response_klass.type = None

        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        object_name = "foo_test_stream_data"
        iterator = DummyIterator(data=["2" * CHUNK_SIZE, "3" * CHUNK_SIZE, "5"])
        extra = {"content_type": "text/plain"}
        obj = self.driver.upload_object_via_stream(
            container=container, object_name=object_name, iterator=iterator, extra=extra
        )

        self.assertEqual(obj.name, object_name)
        self.assertEqual(obj.size, CHUNK_SIZE * 2 + 1)

    def test_upload_object_via_stream_abort(self):
        """Feed the stream upload an iterator that fails after five items."""
        if not self.driver.supports_multipart_upload:
            return

        self.mock_response_klass.type = "MULTIPART"

        def _faulty_iterator():
            for i in range(0, 5):
                yield str(i)
            raise RuntimeError("Error in fetching data")

        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        object_name = "foo_test_stream_data"
        iterator = _faulty_iterator()
        extra = {"content_type": "text/plain"}

        # NOTE(review): every exception is swallowed and nothing is asserted
        # afterwards, so this only checks that the call does not hang or exit
        # the interpreter -- confirm what abort behaviour should be verified.
        try:
            self.driver.upload_object_via_stream(
                container=container,
                object_name=object_name,
                iterator=iterator,
                extra=extra,
            )
        except Exception:
            pass

        return

    def test_ex_iterate_multipart_uploads(self):
        """Every listed multipart upload exposes a key, an id and an initiated value."""
        if not self.driver.supports_multipart_upload:
            return

        self.mock_response_klass.type = "list_multipart"
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)

        for upload in self.driver.ex_iterate_multipart_uploads(container, max_uploads=2):
            self.assertIsNotNone(upload.key)
            self.assertIsNotNone(upload.id)
            self.assertIsNotNone(upload.initiated)

    def test_ex_abort_all_multipart_uploads(self):
        """Aborting all uploads calls ``_abort_multipart`` three times for the fixtures."""
        if not self.driver.supports_multipart_upload:
            return

        self.mock_response_klass.type = "list_multipart"
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)

        with mock.patch(
            "libcloud.storage.drivers.oss.OSSStorageDriver._abort_multipart",
            autospec=True,
        ) as mock_abort:
            self.driver.ex_abort_all_multipart_uploads(container)
            self.assertEqual(3, mock_abort.call_count)

    def test_delete_object_not_found(self):
        """A 404 response on object delete raises ``ObjectDoesNotExistError``."""
        self.mock_response_klass.type = "not_found"
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        obj = Object(
            name="foo_bar_object",
            size=1234,
            hash=None,
            extra=None,
            meta_data=None,
            container=container,
            driver=self.driver,
        )
        self.assertRaises(ObjectDoesNotExistError, self.driver.delete_object, obj=obj)

    def test_delete_object_success(self):
        """A 204 response on object delete makes ``delete_object`` return a truthy value."""
        self.mock_response_klass.type = "delete"
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        obj = Object(
            name="foo_bar_object",
            size=1234,
            hash=None,
            extra=None,
            meta_data=None,
            container=container,
            driver=self.driver,
        )

        result = self.driver.delete_object(obj=obj)
        self.assertTrue(result)
if __name__ == "__main__":
    # Run this module's tests when the file is executed directly.
    test_program = unittest.main()
    sys.exit(test_program)