| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import re |
| import sys |
| import copy |
| import json |
| import unittest |
| import email.utils |
| from io import BytesIO |
| from unittest import mock |
| from unittest.mock import Mock, PropertyMock |
| |
| import pytest |
| |
| from libcloud.test import StorageMockHttp |
| from libcloud.utils.py3 import StringIO, httplib |
| from libcloud.common.types import InvalidCredsError |
| from libcloud.storage.base import Object, Container |
| from libcloud.test.secrets import STORAGE_GOOGLE_STORAGE_PARAMS |
| from libcloud.common.google import GoogleAuthType |
| from libcloud.storage.drivers import google_storage |
| from libcloud.test.file_fixtures import StorageFileFixtures |
| from libcloud.test.storage.test_s3 import S3Tests, S3MockHttp |
| from libcloud.test.common.test_google import GoogleTestCase |
| |
| CONN_CLS = google_storage.GoogleStorageConnection |
| JSON_CONN_CLS = google_storage.GoogleStorageJSONConnection |
| |
| TODAY = email.utils.formatdate(usegmt=True) |
| |
| |
| def _error_helper(code, headers): |
| message = httplib.responses[code] |
| body = { |
| "error": {"errors": [{"code": code, "message": message, "reason": message}]}, |
| } |
| return code, json.dumps(body), headers, httplib.responses[code] |
| |
| |
| class GoogleStorageMockHttp(S3MockHttp): |
| fixtures = StorageFileFixtures("google_storage") |
| |
| def _test2_test_get_object(self, method, url, body, headers): |
| # test_get_object |
| # Google uses a different HTTP header prefix for meta data |
| body = self.fixtures.load("list_containers.xml") |
| headers = { |
| "content-type": "application/zip", |
| "etag": '"e31208wqsdoj329jd"', |
| "x-goog-meta-rabbits": "monkeys", |
| "content-length": "12345", |
| "last-modified": "Thu, 13 Sep 2012 07:13:22 GMT", |
| } |
| |
| return httplib.OK, body, headers, httplib.responses[httplib.OK] |
| |
| def _test2_test_cont_length_get_object(self, method, url, body, headers): |
| # test_get_object_object_size_not_in_content_length_header |
| # Google uses a different HTTP header prefix for meta data |
| body = self.fixtures.load("list_containers.xml") |
| headers = { |
| "content-type": "application/zip", |
| "etag": '"e31208wqsdoj329jd"', |
| "x-goog-meta-rabbits": "monkeys", |
| "x-goog-stored-content-length": "9587", |
| "last-modified": "Thu, 13 Sep 2012 07:13:22 GMT", |
| } |
| |
| return httplib.OK, body, headers, httplib.responses[httplib.OK] |
| |
| def _container_path_UNAUTHORIZED(self, method, url, body, headers): |
| return ( |
| httplib.UNAUTHORIZED, |
| "", |
| self.base_headers, |
| httplib.responses[httplib.OK], |
| ) |
| |
| |
| class GoogleStorageJSONMockHttp(StorageMockHttp): |
| """ |
| Extracts bucket and object out of requests and routes to methods of the |
| forms (bucket, object, entity, and type are sanitized values |
| {'-', '.', '/' are replaced with '_'}): |
| |
| _<bucket>[_<type>] |
| _<bucket>_acl[_entity][_<type>] |
| _<bucket>_defaultObjectAcl[_<entity>][_<type>] |
| _<bucket>_<object>[_<type>] |
| _<bucket>_<object>_acl[_<entity>][_<type>] |
| |
| Ugly example: |
| /storage/v1/b/test-bucket/o/test-object/acl/test-entity |
| with type='FOO' yields |
| _test_bucket_test_object_acl_test_entity_FOO |
| """ |
| |
| fixtures = StorageFileFixtures("google_storage") |
| base_headers = {} |
| |
| # Path regex captures bucket, object, defaultObjectAcl, and acl values. |
| path_rgx = re.compile( |
| r"/storage/[^/]+/b/([^/]+)" |
| r"(?:/(defaultObjectAcl(?:/[^/]+)?$)|" |
| r"(?:/o/(.+?))?(?:/(acl(?:/[^/]+)?))?$)" |
| ) |
| |
| # Permissions to use when handling requests. |
| bucket_perms = google_storage.ContainerPermissions.NONE |
| object_perms = google_storage.ObjectPermissions.NONE |
| |
| _FORBIDDEN = _error_helper(httplib.FORBIDDEN, base_headers) |
| _NOT_FOUND = _error_helper(httplib.NOT_FOUND, base_headers) |
| _PRECONDITION_FAILED = _error_helper(httplib.PRECONDITION_FAILED, base_headers) |
| |
| def _get_method_name(self, type, use_param, qs, path): |
| match = self.path_rgx.match(path) |
| if not match: |
| raise ValueError("%s is not a valid path." % path) |
| |
| joined_groups = "_".join([g for g in match.groups() if g]) |
| if type: |
| meth_name = "_{}_{}".format(joined_groups, type) |
| else: |
| meth_name = "_%s" % joined_groups |
| # Return sanitized method name. |
| return meth_name.replace("/", "_").replace(".", "_").replace("-", "_") |
| |
| def _response_helper(self, fixture): |
| body = self.fixtures.load(fixture) |
| return httplib.OK, body, {}, httplib.responses[httplib.OK] |
| |
| #################### |
| # Request handlers # |
| #################### |
| def _test_bucket(self, method, url, body, headers): |
| """Bucket request.""" |
| if method != "GET": |
| raise NotImplementedError("%s is not implemented." % method) |
| |
| if self.bucket_perms < google_storage.ContainerPermissions.READER: |
| return self._FORBIDDEN |
| else: |
| return self._response_helper("get_container.json") |
| |
| def _test_bucket_acl(self, method, url, body, headers): |
| """Bucket list ACL request.""" |
| if method != "GET": |
| raise NotImplementedError("%s is not implemented." % method) |
| |
| if self.bucket_perms < google_storage.ContainerPermissions.OWNER: |
| return self._FORBIDDEN |
| else: |
| return self._response_helper("list_container_acl.json") |
| |
| def _test_bucket_test_object(self, method, url, body, headers): |
| """Object request.""" |
| if method != "GET": |
| raise NotImplementedError("%s is not implemented." % method) |
| |
| if self.object_perms < google_storage.ObjectPermissions.READER: |
| return self._FORBIDDEN |
| else: |
| return self._response_helper("get_object.json") |
| |
| def _test_bucket_test_object_acl(self, method, url, body, headers): |
| """Object list ACL request.""" |
| if method != "GET": |
| raise NotImplementedError("%s is not implemented." % method) |
| |
| if self.object_perms < google_storage.ObjectPermissions.OWNER: |
| return self._FORBIDDEN |
| else: |
| return self._response_helper("list_object_acl.json") |
| |
| def _test_bucket_writecheck(self, method, url, body, headers): |
| gen_match = headers.get("x-goog-if-generation-match") |
| if method != "DELETE" or gen_match != "0": |
| msg = "Improper write check delete strategy. method: %s, " "headers: %s" % ( |
| method, |
| headers, |
| ) |
| raise ValueError(msg) |
| |
| if self.bucket_perms < google_storage.ContainerPermissions.WRITER: |
| return self._FORBIDDEN |
| else: |
| return self._PRECONDITION_FAILED |
| |
| |
| class GoogleStorageConnectionTest(GoogleTestCase): |
| @mock.patch("email.utils.formatdate") |
| def test_add_default_headers(self, mock_formatdate): |
| mock_formatdate.return_value = TODAY |
| starting_headers = {"starting": "headers"} |
| project = "foo-project" |
| |
| # Modify headers when there is no project. |
| conn = CONN_CLS("foo_user", "bar_key", secure=True, auth_type=GoogleAuthType.GCS_S3) |
| conn.get_project = mock.Mock(return_value=None) |
| headers = dict(starting_headers) |
| headers["Date"] = TODAY |
| self.assertEqual(conn.add_default_headers(dict(starting_headers)), headers) |
| |
| # Modify headers when there is a project. |
| conn = CONN_CLS("foo_user", "bar_key", secure=True, auth_type=GoogleAuthType.GCS_S3) |
| conn.get_project = mock.Mock(return_value=project) |
| headers = dict(starting_headers) |
| headers["Date"] = TODAY |
| headers[CONN_CLS.PROJECT_ID_HEADER] = project |
| self.assertEqual(conn.add_default_headers(dict(starting_headers)), headers) |
| |
| @mock.patch("libcloud.storage.drivers.s3." "BaseS3Connection.get_auth_signature") |
| def test_get_s3_auth_signature(self, mock_s3_auth_sig_method): |
| # Check that the S3 HMAC signature method is used. |
| # Check that headers are copied and modified properly before calling |
| # the signature method. |
| mock_s3_auth_sig_method.return_value = "mock signature!" |
| starting_params = {} |
| starting_headers = { |
| "Date": TODAY, |
| "x-goog-foo": "X-GOOG: MAINTAIN UPPERCASE!", |
| "x-Goog-bar": "Header key should be lowered", |
| "Content-Type": "application/mIXED casING MAINTAINED", |
| "Other": "LOWER THIS!", |
| } |
| modified_headers = { |
| "date": TODAY, |
| "content-type": "application/mIXED casING MAINTAINED", |
| "x-goog-foo": "X-GOOG: MAINTAIN UPPERCASE!", |
| "x-goog-bar": "Header key should be lowered", |
| "other": "lower this!", |
| } |
| |
| conn = CONN_CLS("foo_user", "bar_key", secure=True, auth_type=GoogleAuthType.GCS_S3) |
| conn.method = "GET" |
| conn.action = "/path" |
| result = conn._get_s3_auth_signature(starting_params, starting_headers) |
| self.assertNotEqual(starting_headers, modified_headers) |
| self.assertEqual(result, "mock signature!") |
| mock_s3_auth_sig_method.assert_called_once_with( |
| method="GET", |
| headers=modified_headers, |
| params=starting_params, |
| expires=None, |
| secret_key="bar_key", |
| path="/path", |
| vendor_prefix="x-goog", |
| ) |
| |
| @mock.patch("libcloud.common.google.GoogleOAuth2Credential") |
| def test_pre_connect_hook_oauth2(self, mock_oauth2_credential_init): |
| # Check that we get the Authorization header from the OAuth2 token, |
| # not from the HMAC signature method. |
| # Check that the headers and pa |
| mock_oauth2_credential_init.return_value = mock.Mock() |
| starting_params = {"starting": "params"} |
| starting_headers = {"starting": "headers"} |
| |
| conn = CONN_CLS("foo_user", "bar_key", secure=True, auth_type=GoogleAuthType.GCE) |
| conn._get_s3_auth_signature = mock.Mock() |
| conn.oauth2_credential = mock.Mock() |
| conn.oauth2_credential.access_token = "Access_Token!" |
| expected_headers = dict(starting_headers) |
| expected_headers["Authorization"] = "Bearer Access_Token!" |
| result = conn.pre_connect_hook(dict(starting_params), dict(starting_headers)) |
| self.assertEqual(result, (starting_params, expected_headers)) |
| |
| def test_pre_connect_hook_hmac(self): |
| # Check that we call for a HMAC signature, passing params and headers |
| # Check that we properly apply the HMAC signature. |
| # Check that we don't use OAuth2 credentials. |
| starting_params = {"starting": "params"} |
| starting_headers = {"starting": "headers"} |
| |
| def fake_hmac_method(params, headers): |
| # snapshot the params and headers passed (they are modified later) |
| fake_hmac_method.params_passed = copy.deepcopy(params) |
| fake_hmac_method.headers_passed = copy.deepcopy(headers) |
| return "fake signature!" |
| |
| conn = CONN_CLS("foo_user", "bar_key", secure=True, auth_type=GoogleAuthType.GCS_S3) |
| conn._get_s3_auth_signature = fake_hmac_method |
| conn.action = "GET" |
| conn.method = "/foo" |
| expected_headers = dict(starting_headers) |
| expected_headers["Authorization"] = "{} {}:{}".format( |
| google_storage.SIGNATURE_IDENTIFIER, |
| "foo_user", |
| "fake signature!", |
| ) |
| result = conn.pre_connect_hook(dict(starting_params), dict(starting_headers)) |
| self.assertEqual(result, (dict(starting_params), expected_headers)) |
| self.assertEqual(fake_hmac_method.params_passed, starting_params) |
| self.assertEqual(fake_hmac_method.headers_passed, starting_headers) |
| self.assertIsNone(conn.oauth2_credential) |
| |
| |
| class GoogleStorageTests(S3Tests, GoogleTestCase): |
| driver_type = google_storage.GoogleStorageDriver |
| driver_args = STORAGE_GOOGLE_STORAGE_PARAMS |
| mock_response_klass = GoogleStorageMockHttp |
| |
| def setUp(self): |
| super().setUp() |
| self.driver_type.jsonConnectionCls.conn_class = GoogleStorageJSONMockHttp |
| |
| def tearDown(self): |
| self._remove_test_file() |
| |
| def test_billing_not_enabled(self): |
| # TODO |
| pass |
| |
| def test_token(self): |
| # Not supported on Google Storage |
| pass |
| |
| def test_get_object_object_size_in_content_length(self): |
| self.mock_response_klass.type = "get_object" |
| obj = self.driver.get_object(container_name="test2", object_name="test") |
| self.assertEqual(obj.size, 12345) |
| |
| def test_get_object_object_size_not_in_content_length_header(self): |
| self.mock_response_klass.type = "get_object" |
| obj = self.driver.get_object(container_name="test2", object_name="test_cont_length") |
| self.assertEqual(obj.size, 9587) |
| |
| def test_delete_permissions(self): |
| mock_request = mock.Mock() |
| self.driver.json_connection.request = mock_request |
| |
| # Test deleting object permissions. |
| self.driver.ex_delete_permissions("bucket", "object", entity="user-foo") |
| url = "/storage/v1/b/bucket/o/object/acl/user-foo" |
| mock_request.assert_called_once_with(url, method="DELETE") |
| |
| # Test deleting bucket permissions. |
| mock_request.reset_mock() |
| self.driver.ex_delete_permissions("bucket", entity="user-foo") |
| url = "/storage/v1/b/bucket/acl/user-foo" |
| mock_request.assert_called_once_with(url, method="DELETE") |
| |
| def test_delete_permissions_no_entity(self): |
| mock_request = mock.Mock() |
| mock_get_user = mock.Mock(return_value=None) |
| self.driver._get_user = mock_get_user |
| self.driver.json_connection.request = mock_request |
| |
| # Test deleting permissions on an object with no entity. |
| self.assertRaises(ValueError, self.driver.ex_delete_permissions, "bucket", "object") |
| |
| # Test deleting permissions on an bucket with no entity. |
| self.assertRaises(ValueError, self.driver.ex_delete_permissions, "bucket") |
| |
| mock_request.assert_not_called() |
| |
| # Test deleting permissions on an object with a default entity. |
| mock_get_user.return_value = "foo@foo.com" |
| self.driver.ex_delete_permissions("bucket", "object") |
| url = "/storage/v1/b/bucket/o/object/acl/user-foo@foo.com" |
| mock_request.assert_called_once_with(url, method="DELETE") |
| |
| # Test deleting permissions on an bucket with a default entity. |
| mock_request.reset_mock() |
| mock_get_user.return_value = "foo@foo.com" |
| self.driver.ex_delete_permissions("bucket") |
| url = "/storage/v1/b/bucket/acl/user-foo@foo.com" |
| mock_request.assert_called_once_with(url, method="DELETE") |
| |
| def test_get_permissions(self): |
| def test_permission_config(bucket_perms, object_perms): |
| GoogleStorageJSONMockHttp.bucket_perms = bucket_perms |
| GoogleStorageJSONMockHttp.object_perms = object_perms |
| |
| perms = self.driver.ex_get_permissions("test-bucket", "test-object") |
| self.assertEqual(perms, (bucket_perms, object_perms)) |
| |
| bucket_levels = range(len(google_storage.ContainerPermissions.values)) |
| object_levels = range(len(google_storage.ObjectPermissions.values)) |
| for bucket_perms in bucket_levels: |
| for object_perms in object_levels: |
| test_permission_config(bucket_perms, object_perms) |
| |
| def test_set_permissions(self): |
| mock_request = mock.Mock() |
| self.driver.json_connection.request = mock_request |
| |
| # Test setting object permissions. |
| self.driver.ex_set_permissions("bucket", "object", entity="user-foo", role="OWNER") |
| url = "/storage/v1/b/bucket/o/object/acl" |
| mock_request.assert_called_once_with( |
| url, method="POST", data=json.dumps({"role": "OWNER", "entity": "user-foo"}) |
| ) |
| |
| # Test setting object permissions with an ObjectPermissions value. |
| mock_request.reset_mock() |
| self.driver.ex_set_permissions( |
| "bucket", |
| "object", |
| entity="user-foo", |
| role=google_storage.ObjectPermissions.OWNER, |
| ) |
| url = "/storage/v1/b/bucket/o/object/acl" |
| mock_request.assert_called_once_with( |
| url, method="POST", data=json.dumps({"role": "OWNER", "entity": "user-foo"}) |
| ) |
| |
| # Test setting bucket permissions. |
| mock_request.reset_mock() |
| self.driver.ex_set_permissions("bucket", entity="user-foo", role="OWNER") |
| url = "/storage/v1/b/bucket/acl" |
| mock_request.assert_called_once_with( |
| url, method="POST", data=json.dumps({"role": "OWNER", "entity": "user-foo"}) |
| ) |
| |
| # Test setting bucket permissions with a ContainerPermissions value. |
| mock_request.reset_mock() |
| self.driver.ex_set_permissions( |
| "bucket", entity="user-foo", role=google_storage.ContainerPermissions.OWNER |
| ) |
| url = "/storage/v1/b/bucket/acl" |
| mock_request.assert_called_once_with( |
| url, method="POST", data=json.dumps({"role": "OWNER", "entity": "user-foo"}) |
| ) |
| |
| def test_set_permissions_bad_roles(self): |
| mock_request = mock.Mock() |
| self.driver.json_connection.request = mock_request |
| |
| # Test forgetting a role. |
| self.assertRaises(ValueError, self.driver.ex_set_permissions, "bucket", "object") |
| self.assertRaises(ValueError, self.driver.ex_set_permissions, "bucket") |
| mock_request.assert_not_called() |
| |
| # Test container permissions on an object. |
| self.assertRaises( |
| ValueError, |
| self.driver.ex_set_permissions, |
| "bucket", |
| "object", |
| role=google_storage.ContainerPermissions.OWNER, |
| ) |
| mock_request.assert_not_called() |
| |
| # Test object permissions on a container. |
| self.assertRaises( |
| ValueError, |
| self.driver.ex_set_permissions, |
| "bucket", |
| role=google_storage.ObjectPermissions.OWNER, |
| ) |
| mock_request.assert_not_called() |
| |
| def test_set_permissions_no_entity(self): |
| mock_request = mock.Mock() |
| mock_get_user = mock.Mock(return_value=None) |
| self.driver._get_user = mock_get_user |
| self.driver.json_connection.request = mock_request |
| |
| # Test for setting object permissions with no entity. |
| self.assertRaises( |
| ValueError, self.driver.ex_set_permissions, "bucket", "object", role="OWNER" |
| ) |
| |
| # Test for setting bucket permissions with no entity. |
| self.assertRaises(ValueError, self.driver.ex_set_permissions, "bucket", role="OWNER") |
| |
| mock_request.assert_not_called() |
| |
| # Test for setting object permissions with a default entity. |
| mock_get_user.return_value = "foo@foo.com" |
| self.driver.ex_set_permissions("bucket", "object", role="OWNER") |
| url = "/storage/v1/b/bucket/o/object/acl" |
| mock_request.assert_called_once_with( |
| url, |
| method="POST", |
| data=json.dumps({"role": "OWNER", "entity": "user-foo@foo.com"}), |
| ) |
| |
| # Test for setting bucket permissions with a default entity. |
| mock_request.reset_mock() |
| mock_get_user.return_value = "foo@foo.com" |
| self.driver.ex_set_permissions("bucket", role="OWNER") |
| url = "/storage/v1/b/bucket/acl" |
| mock_request.assert_called_once_with( |
| url, |
| method="POST", |
| data=json.dumps({"role": "OWNER", "entity": "user-foo@foo.com"}), |
| ) |
| |
| def test_invalid_credentials_on_upload(self): |
| self.mock_response_klass.type = "UNAUTHORIZED" |
| container = Container(name="container", driver=self.driver, extra={}) |
| with pytest.raises(InvalidCredsError): |
| self.driver.upload_object_via_stream(BytesIO(b" "), container, "path") |
| |
| def test_download_object_data_is_not_buffered_in_memory(self): |
| # Test case which verifies that response.body attribute is not accessed |
| # and as such, whole body response is not buffered into RAM |
| |
| # If content is consumed and response.content attribute accessed execption |
| # will be thrown and test will fail |
| |
| mock_response = Mock(name="mock response") |
| mock_response.headers = {} |
| mock_response.status_code = 200 |
| msg = '"content" attribute was accessed but it shouldn\'t have been' |
| type(mock_response).content = PropertyMock( |
| name="mock content attribute", side_effect=Exception(msg) |
| ) |
| mock_response.iter_content.return_value = StringIO("a" * 1000) |
| |
| self.driver.connection.connection.getresponse = Mock() |
| self.driver.connection.connection.getresponse.return_value = mock_response |
| |
| container = Container(name="foo_bar_container", extra={}, driver=self.driver) |
| obj = Object( |
| name="foo_bar_object_NO_BUFFER", |
| size=1000, |
| hash=None, |
| extra={}, |
| container=container, |
| meta_data=None, |
| driver=self.driver_type, |
| ) |
| destination_path = self._file_path |
| result = self.driver.download_object( |
| obj=obj, |
| destination_path=destination_path, |
| overwrite_existing=True, |
| delete_on_failure=True, |
| ) |
| self.assertTrue(result) |
| |
| |
| if __name__ == "__main__": |
| sys.exit(unittest.main()) |