# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import os
import sys
import json
import tempfile
from io import BytesIO

from libcloud.test import generate_random_data  # pylint: disable-msg=E0611
from libcloud.test import unittest
from libcloud.utils.py3 import b, httplib, parse_qs, urlparse, basestring
from libcloud.common.types import LibcloudError, InvalidCredsError
from libcloud.storage.base import Object, Container
from libcloud.test.secrets import STORAGE_AZURE_BLOBS_PARAMS, STORAGE_AZURITE_BLOBS_PARAMS
from libcloud.storage.types import (
    ObjectDoesNotExistError,
    ObjectHashMismatchError,
    ContainerIsNotEmptyError,
    InvalidContainerNameError,
    ContainerDoesNotExistError,
    ContainerAlreadyExistsError,
)
from libcloud.test.storage.base import BaseRangeDownloadMockHttp
from libcloud.test.file_fixtures import StorageFileFixtures  # pylint: disable-msg=E0611
from libcloud.storage.drivers.azure_blobs import (
    AZURE_UPLOAD_CHUNK_SIZE,
    AzureBlobsStorageDriver,
    AzureBlobsActiveDirectoryConnection,
)


class AzureBlobsMockHttp(BaseRangeDownloadMockHttp, unittest.TestCase):

    fixtures = StorageFileFixtures("azure_blobs")
    base_headers = {}

    # Note: using this method to get the oauth key for azure ad authentication
    def __getattr__(self, n):
        def fn(method, url, body, headers):
            fixture = self.fixtures.load(n + ".json")

            if method in ("POST", "PUT"):
                try:
                    body = json.loads(body)
                    fixture_tmp = json.loads(fixture)
                    fixture_tmp = self._update(fixture_tmp, body)
                    fixture = json.dumps(fixture_tmp)
                except ValueError:
                    pass

            return (httplib.OK, fixture, headers, httplib.responses[httplib.OK])

        return fn

    def _UNAUTHORIZED(self, method, url, body, headers):
        return (
            httplib.UNAUTHORIZED,
            "",
            self.base_headers,
            httplib.responses[httplib.UNAUTHORIZED],
        )

    def _list_containers_EMPTY(self, method, url, body, headers):
        body = self.fixtures.load("list_containers_empty.xml")
        return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK])

    def _list_containers(self, method, url, body, headers):
        query_string = urlparse.urlsplit(url).query
        query = parse_qs(query_string)

        if "marker" not in query:
            body = self.fixtures.load("list_containers_1.xml")
        else:
            body = self.fixtures.load("list_containers_2.xml")

        return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK])

    def _test_container_EMPTY(self, method, url, body, headers):
        if method == "DELETE":
            body = ""
            return (
                httplib.ACCEPTED,
                body,
                self.base_headers,
                httplib.responses[httplib.ACCEPTED],
            )

        else:
            body = self.fixtures.load("list_objects_empty.xml")
            return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK])

    def _new__container_INVALID_NAME(self, method, url, body, headers):
        return (
            httplib.BAD_REQUEST,
            body,
            self.base_headers,
            httplib.responses[httplib.BAD_REQUEST],
        )

    def _test_container(self, method, url, body, headers):
        query_string = urlparse.urlsplit(url).query
        query = parse_qs(query_string)

        if "marker" not in query:
            body = self.fixtures.load("list_objects_1.xml")
        else:
            body = self.fixtures.load("list_objects_2.xml")

        return (httplib.OK, body, self.base_headers, httplib.responses[httplib.OK])

    def _test_container100(self, method, url, body, headers):
        body = ""

        if method != "HEAD":
            return (
                httplib.BAD_REQUEST,
                body,
                self.base_headers,
                httplib.responses[httplib.BAD_REQUEST],
            )

        return (
            httplib.NOT_FOUND,
            body,
            self.base_headers,
            httplib.responses[httplib.NOT_FOUND],
        )

    def _test_container200(self, method, url, body, headers):
        body = ""

        if method != "HEAD":
            return (
                httplib.BAD_REQUEST,
                body,
                self.base_headers,
                httplib.responses[httplib.BAD_REQUEST],
            )

        headers = {}

        headers["etag"] = "0x8CFB877BB56A6FB"
        headers["last-modified"] = "Fri, 04 Jan 2013 09:48:06 GMT"
        headers["x-ms-lease-status"] = "unlocked"
        headers["x-ms-lease-state"] = "available"
        headers["x-ms-meta-meta1"] = "value1"

        return (httplib.OK, body, headers, httplib.responses[httplib.OK])

    def _test_container200_test(self, method, url, body, headers):
        body = ""

        if method != "HEAD":
            return (
                httplib.BAD_REQUEST,
                body,
                self.base_headers,
                httplib.responses[httplib.BAD_REQUEST],
            )

        headers = {}

        headers["etag"] = "0x8CFB877BB56A6FB"
        headers["last-modified"] = "Fri, 04 Jan 2013 09:48:06 GMT"
        headers["content-length"] = "12345"
        headers["content-type"] = "application/zip"
        headers["x-ms-blob-type"] = "Block"
        headers["x-ms-lease-status"] = "unlocked"
        headers["x-ms-lease-state"] = "available"
        headers["x-ms-meta-rabbits"] = "monkeys"

        return (httplib.OK, body, headers, httplib.responses[httplib.OK])

    def _test2_test_list_containers(self, method, url, body, headers):
        # test_get_object
        body = self.fixtures.load("list_containers.xml")
        headers = {
            "content-type": "application/zip",
            "etag": '"e31208wqsdoj329jd"',
            "x-amz-meta-rabbits": "monkeys",
            "content-length": "12345",
            "last-modified": "Thu, 13 Sep 2012 07:13:22 GMT",
        }

        return (httplib.OK, body, headers, httplib.responses[httplib.OK])

    def _new_container_ALREADY_EXISTS(self, method, url, body, headers):
        # test_create_container
        return (httplib.CONFLICT, body, headers, httplib.responses[httplib.CONFLICT])

    def _new_container(self, method, url, body, headers):
        # test_create_container, test_delete_container

        headers = {}

        if method == "PUT":
            status = httplib.CREATED

            headers["etag"] = "0x8CFB877BB56A6FB"
            headers["last-modified"] = "Fri, 04 Jan 2013 09:48:06 GMT"
            headers["x-ms-lease-status"] = "unlocked"
            headers["x-ms-lease-state"] = "available"
            headers["x-ms-meta-meta1"] = "value1"

        elif method == "DELETE":
            status = httplib.NO_CONTENT

        return (status, body, headers, httplib.responses[status])

    def _new_container_DOESNT_EXIST(self, method, url, body, headers):
        # test_delete_container
        return (httplib.NOT_FOUND, body, headers, httplib.responses[httplib.NOT_FOUND])

    def _foo_bar_container_NOT_FOUND(self, method, url, body, headers):
        # test_delete_container_not_found
        return (httplib.NOT_FOUND, body, headers, httplib.responses[httplib.NOT_FOUND])

    def _foo_bar_container_foo_bar_object_NOT_FOUND(self, method, url, body, headers):
        # test_delete_object_not_found
        return (httplib.NOT_FOUND, body, headers, httplib.responses[httplib.NOT_FOUND])

    def _foo_bar_container_foo_bar_object_DELETE(self, method, url, body, headers):
        # test_delete_object
        return (httplib.ACCEPTED, body, headers, httplib.responses[httplib.ACCEPTED])

    def _foo_bar_container_foo_test_upload(self, method, url, body, headers):
        self._assert_content_length_header_is_string(headers=headers)

        query_string = urlparse.urlsplit(url).query
        query = parse_qs(query_string)
        comp = query.get("comp", [])

        headers = {}
        body = ""

        if "blocklist" in comp or not comp:
            headers["etag"] = '"0x8CFB877BB56A6FB"'
            headers["content-md5"] = "d4fe4c9829f7ca1cc89db7ad670d2bbd"
        elif "block" in comp:
            headers["content-md5"] = "lvcfx/bOJvndpRlrdKU1YQ=="
        else:
            raise NotImplementedError("Unknown request comp: {}".format(comp))

        return (httplib.CREATED, body, headers, httplib.responses[httplib.CREATED])

    def _foo_bar_container_foo_test_upload_block(self, method, url, body, headers):
        # test_upload_object_success
        self._assert_content_length_header_is_string(headers=headers)

        body = ""
        headers = {}
        headers["etag"] = "0x8CFB877BB56A6FB"
        return (httplib.CREATED, body, headers, httplib.responses[httplib.CREATED])

    def _foo_bar_container_foo_test_upload_blocklist(self, method, url, body, headers):
        # test_upload_object_success
        self._assert_content_length_header_is_string(headers=headers)

        body = ""
        headers = {}
        headers["etag"] = "0x8CFB877BB56A6FB"
        headers["content-md5"] = "d4fe4c9829f7ca1cc89db7ad670d2bbd"

        return (httplib.CREATED, body, headers, httplib.responses[httplib.CREATED])

    def _foo_bar_container_foo_test_upload_lease(self, method, url, body, headers):
        # test_upload_object_success
        self._assert_content_length_header_is_string(headers=headers)

        action = headers["x-ms-lease-action"]
        rheaders = {"x-ms-lease-id": "someleaseid"}
        body = ""

        if action == "acquire":
            return (httplib.CREATED, body, rheaders, httplib.responses[httplib.CREATED])

        else:
            if headers.get("x-ms-lease-id", None) != "someleaseid":
                return (
                    httplib.BAD_REQUEST,
                    body,
                    rheaders,
                    httplib.responses[httplib.BAD_REQUEST],
                )

            return (httplib.OK, body, headers, httplib.responses[httplib.CREATED])

    def _foo_bar_container_foo_test_upload_INVALID_HASH(self, method, url, body, headers):
        # test_upload_object_invalid_hash1
        self._assert_content_length_header_is_string(headers=headers)

        body = ""
        headers = {}
        headers["etag"] = "0x8CFB877BB56A6FB"
        headers["content-md5"] = "d4fe4c9829f7ca1cc89db7ad670d2bbd"

        return (httplib.CREATED, body, headers, httplib.responses[httplib.CREATED])

    def _foo_bar_container_foo_bar_object(self, method, url, body, headers):
        # test_upload_object_invalid_file_size
        self._assert_content_length_header_is_string(headers=headers)

        body = generate_random_data(1000)
        return (httplib.OK, body, headers, httplib.responses[httplib.OK])

    def _foo_bar_container_foo_bar_object_range(self, method, url, body, headers):
        # test_download_object_range_success
        body = "0123456789123456789"

        self.assertTrue("x-ms-range" in headers)
        self.assertEqual(headers["x-ms-range"], "bytes=5-6")

        start_bytes, end_bytes = self._get_start_and_end_bytes_from_range_str(
            headers["x-ms-range"], body
        )

        return (
            httplib.PARTIAL_CONTENT,
            body[start_bytes : end_bytes + 1],
            headers,
            httplib.responses[httplib.PARTIAL_CONTENT],
        )

    def _foo_bar_container_foo_bar_object_range_stream(self, method, url, body, headers):
        # test_download_object_range_as_stream_success
        body = "0123456789123456789"

        self.assertTrue("x-ms-range" in headers)
        self.assertEqual(headers["x-ms-range"], "bytes=4-5")

        start_bytes, end_bytes = self._get_start_and_end_bytes_from_range_str(
            headers["x-ms-range"], body
        )

        return (
            httplib.PARTIAL_CONTENT,
            body[start_bytes : end_bytes + 1],
            headers,
            httplib.responses[httplib.PARTIAL_CONTENT],
        )

    def _foo_bar_container_foo_bar_object_INVALID_SIZE(self, method, url, body, headers):
        # test_upload_object_invalid_file_size
        self._assert_content_length_header_is_string(headers=headers)

        body = ""
        return (httplib.OK, body, headers, httplib.responses[httplib.OK])

    def _assert_content_length_header_is_string(self, headers):
        if "Content-Length" in headers:
            self.assertTrue(isinstance(headers["Content-Length"], basestring))


class AzuriteBlobsMockHttp(AzureBlobsMockHttp):
    fixtures = StorageFileFixtures("azurite_blobs")

    def _get_method_name(self, *args, **kwargs):
        method_name = super()._get_method_name(*args, **kwargs)

        if method_name.startswith("_account"):
            method_name = method_name[8:]

        return method_name


class AzureBlobsTests(unittest.TestCase):
    driver_type = AzureBlobsStorageDriver
    driver_args = STORAGE_AZURE_BLOBS_PARAMS
    mock_response_klass = AzureBlobsMockHttp

    @classmethod
    def create_driver(self):
        return self.driver_type(*self.driver_args)

    def setUp(self):
        self.driver_type.connectionCls.conn_class = self.mock_response_klass
        self.mock_response_klass.type = None
        self.driver = self.create_driver()

    def tearDown(self):
        self._remove_test_file()

    def _remove_test_file(self):
        file_path = os.path.abspath(__file__) + ".temp"

        try:
            os.unlink(file_path)
        except OSError:
            pass

    def test_invalid_credentials(self):
        self.mock_response_klass.type = "UNAUTHORIZED"
        try:
            self.driver.list_containers()
        except InvalidCredsError as e:
            self.assertEqual(True, isinstance(e, InvalidCredsError))
        else:
            self.fail("Exception was not thrown")

    def test_list_containers_empty(self):
        self.mock_response_klass.type = "list_containers_EMPTY"
        containers = self.driver.list_containers()
        self.assertEqual(len(containers), 0)

    def test_list_containers_success(self):
        self.mock_response_klass.type = "list_containers"
        AzureBlobsStorageDriver.RESPONSES_PER_REQUEST = 2
        containers = self.driver.list_containers()
        self.assertEqual(len(containers), 4)

        self.assertTrue("last_modified" in containers[1].extra)
        self.assertTrue("url" in containers[1].extra)
        self.assertTrue("etag" in containers[1].extra)
        self.assertTrue("lease" in containers[1].extra)
        self.assertTrue("meta_data" in containers[1].extra)
        self.assertEqual(containers[1].extra["etag"], "0x8CFBAB7B5B82D8E")

    def test_list_container_objects_empty(self):
        self.mock_response_klass.type = "EMPTY"
        container = Container(name="test_container", extra={}, driver=self.driver)
        objects = self.driver.list_container_objects(container=container)
        self.assertEqual(len(objects), 0)

    def test_list_container_objects_success(self):
        self.mock_response_klass.type = None
        AzureBlobsStorageDriver.RESPONSES_PER_REQUEST = 2

        container = Container(name="test_container", extra={}, driver=self.driver)

        objects = self.driver.list_container_objects(container=container)
        self.assertEqual(len(objects), 4)

        obj = objects[1]
        self.assertEqual(obj.name, "object2.txt")
        self.assertEqual(obj.hash, "0x8CFB90F1BA8CD8F")
        self.assertEqual(obj.size, 1048576)
        self.assertEqual(obj.container.name, "test_container")
        self.assertTrue("meta1" in obj.meta_data)
        self.assertTrue("meta2" in obj.meta_data)
        self.assertTrue("last_modified" in obj.extra)
        self.assertTrue("content_type" in obj.extra)
        self.assertTrue("content_encoding" in obj.extra)
        self.assertTrue("content_language" in obj.extra)

    def test_list_container_objects_with_prefix(self):
        self.mock_response_klass.type = None
        AzureBlobsStorageDriver.RESPONSES_PER_REQUEST = 2

        container = Container(name="test_container", extra={}, driver=self.driver)
        objects = self.driver.list_container_objects(container=container, prefix="test_prefix")
        self.assertEqual(len(objects), 4)

        obj = objects[1]
        self.assertEqual(obj.name, "object2.txt")
        self.assertEqual(obj.hash, "0x8CFB90F1BA8CD8F")
        self.assertEqual(obj.size, 1048576)
        self.assertEqual(obj.container.name, "test_container")
        self.assertTrue("meta1" in obj.meta_data)
        self.assertTrue("meta2" in obj.meta_data)
        self.assertTrue("last_modified" in obj.extra)
        self.assertTrue("content_type" in obj.extra)
        self.assertTrue("content_encoding" in obj.extra)
        self.assertTrue("content_language" in obj.extra)

    def test_get_container_doesnt_exist(self):
        self.mock_response_klass.type = None
        try:
            self.driver.get_container(container_name="test_container100")
        except ContainerDoesNotExistError:
            pass
        else:
            self.fail("Exception was not thrown")

    def test_get_container_success(self):
        self.mock_response_klass.type = None
        container = self.driver.get_container(container_name="test_container200")

        self.assertTrue(container.name, "test_container200")
        self.assertTrue(container.extra["etag"], "0x8CFB877BB56A6FB")
        self.assertTrue(container.extra["last_modified"], "Fri, 04 Jan 2013 09:48:06 GMT")
        self.assertTrue(container.extra["lease"]["status"], "unlocked")
        self.assertTrue(container.extra["lease"]["state"], "available")
        self.assertTrue(container.extra["meta_data"]["meta1"], "value1")

        if self.driver.secure:
            expected_url = "https://account.blob.core.windows.net/test_container200"
        else:
            expected_url = "http://localhost/account/test_container200"

        self.assertEqual(container.extra["url"], expected_url)

    def test_get_object_cdn_url(self):
        obj = self.driver.get_object(container_name="test_container200", object_name="test")

        url = urlparse.urlparse(self.driver.get_object_cdn_url(obj))
        query = urlparse.parse_qs(url.query)

        self.assertEqual(len(query["sig"]), 1)
        self.assertGreater(len(query["sig"][0]), 0)

    def test_get_object_container_doesnt_exist(self):
        # This method makes two requests which makes mocking the response a bit
        # trickier
        self.mock_response_klass.type = None
        try:
            self.driver.get_object(container_name="test_container100", object_name="test")
        except ContainerDoesNotExistError:
            pass
        else:
            self.fail("Exception was not thrown")

    def test_get_object_success(self):
        # This method makes two requests which makes mocking the response a bit
        # trickier
        self.mock_response_klass.type = None
        obj = self.driver.get_object(container_name="test_container200", object_name="test")

        self.assertEqual(obj.name, "test")
        self.assertEqual(obj.container.name, "test_container200")
        self.assertEqual(obj.size, 12345)
        self.assertEqual(obj.hash, "0x8CFB877BB56A6FB")
        self.assertEqual(obj.extra["last_modified"], "Fri, 04 Jan 2013 09:48:06 GMT")
        self.assertEqual(obj.extra["content_type"], "application/zip")
        self.assertEqual(obj.meta_data["rabbits"], "monkeys")

    def test_create_container_invalid_name(self):
        # invalid container name
        self.mock_response_klass.type = "INVALID_NAME"
        try:
            self.driver.create_container(container_name="new--container")
        except InvalidContainerNameError:
            pass
        else:
            self.fail("Exception was not thrown")

    def test_create_container_already_exists(self):
        # container with this name already exists
        self.mock_response_klass.type = "ALREADY_EXISTS"
        try:
            self.driver.create_container(container_name="new-container")
        except ContainerAlreadyExistsError:
            pass
        else:
            self.fail("Exception was not thrown")

    def test_create_container_success(self):
        # success
        self.mock_response_klass.type = None
        name = "new-container"
        container = self.driver.create_container(container_name=name)
        self.assertEqual(container.name, name)

    def test_delete_container_doesnt_exist(self):
        container = Container(name="new_container", extra=None, driver=self.driver)
        self.mock_response_klass.type = "DOESNT_EXIST"
        try:
            self.driver.delete_container(container=container)
        except ContainerDoesNotExistError:
            pass
        else:
            self.fail("Exception was not thrown")

    def test_delete_container_not_empty(self):
        self.mock_response_klass.type = None
        AzureBlobsStorageDriver.RESPONSES_PER_REQUEST = 2

        container = Container(name="test_container", extra={}, driver=self.driver)

        try:
            self.driver.delete_container(container=container)
        except ContainerIsNotEmptyError:
            pass
        else:
            self.fail("Exception was not thrown")

    def test_delete_container_success(self):
        self.mock_response_klass.type = "EMPTY"
        AzureBlobsStorageDriver.RESPONSES_PER_REQUEST = 2

        container = Container(name="test_container", extra={}, driver=self.driver)

        self.assertTrue(self.driver.delete_container(container=container))

    def test_delete_container_not_found(self):
        self.mock_response_klass.type = "NOT_FOUND"
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        try:
            self.driver.delete_container(container=container)
        except ContainerDoesNotExistError:
            pass
        else:
            self.fail("Container does not exist but an exception was not" + "thrown")

    def test_download_object_success(self):
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        obj = Object(
            name="foo_bar_object",
            size=1000,
            hash=None,
            extra={},
            container=container,
            meta_data=None,
            driver=self.driver_type,
        )
        destination_path = os.path.abspath(__file__) + ".temp"
        result = self.driver.download_object(
            obj=obj,
            destination_path=destination_path,
            overwrite_existing=False,
            delete_on_failure=True,
        )
        self.assertTrue(result)

    def test_download_object_invalid_file_size(self):
        self.mock_response_klass.type = "INVALID_SIZE"
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        obj = Object(
            name="foo_bar_object",
            size=1000,
            hash=None,
            extra={},
            container=container,
            meta_data=None,
            driver=self.driver_type,
        )
        destination_path = os.path.abspath(__file__) + ".temp"
        result = self.driver.download_object(
            obj=obj,
            destination_path=destination_path,
            overwrite_existing=False,
            delete_on_failure=True,
        )
        self.assertFalse(result)

    def test_download_object_invalid_file_already_exists(self):
        self.mock_response_klass.type = "INVALID_SIZE"
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        obj = Object(
            name="foo_bar_object",
            size=1000,
            hash=None,
            extra={},
            container=container,
            meta_data=None,
            driver=self.driver_type,
        )
        destination_path = os.path.abspath(__file__)
        try:
            self.driver.download_object(
                obj=obj,
                destination_path=destination_path,
                overwrite_existing=False,
                delete_on_failure=True,
            )
        except LibcloudError:
            pass
        else:
            self.fail("Exception was not thrown")

    def test_download_object_as_stream_success(self):
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)

        obj = Object(
            name="foo_bar_object",
            size=1000,
            hash=None,
            extra={},
            container=container,
            meta_data=None,
            driver=self.driver_type,
        )

        stream = self.driver.download_object_as_stream(obj=obj, chunk_size=None)

        consumed_stream = "".join(chunk.decode("utf-8") for chunk in stream)
        self.assertEqual(len(consumed_stream), obj.size)

    def test_download_object_range_success(self):
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        obj = Object(
            name="foo_bar_object_range",
            size=1000,
            hash=None,
            extra={},
            container=container,
            meta_data=None,
            driver=self.driver_type,
        )
        destination_path = os.path.abspath(__file__) + ".temp"
        result = self.driver.download_object_range(
            obj=obj,
            start_bytes=5,
            end_bytes=7,
            destination_path=destination_path,
            overwrite_existing=False,
            delete_on_failure=True,
        )
        self.assertTrue(result)

        with open(destination_path) as fp:
            content = fp.read()

        self.assertEqual(content, "56")

    def test_download_object_range_as_stream_success(self):
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)

        obj = Object(
            name="foo_bar_object_range_stream",
            size=2,
            hash=None,
            extra={},
            container=container,
            meta_data=None,
            driver=self.driver_type,
        )

        stream = self.driver.download_object_range_as_stream(
            obj=obj, start_bytes=4, end_bytes=6, chunk_size=None
        )

        consumed_stream = "".join(chunk.decode("utf-8") for chunk in stream)
        self.assertEqual(consumed_stream, "45")
        self.assertEqual(len(consumed_stream), obj.size)

    def test_upload_object_invalid_md5(self):
        # Invalid md5 is returned by azure
        self.mock_response_klass.type = "INVALID_HASH"

        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        object_name = "foo_test_upload"
        file_path = os.path.abspath(__file__)
        try:
            self.driver.upload_object(
                file_path=file_path,
                container=container,
                object_name=object_name,
                verify_hash=True,
            )
        except ObjectHashMismatchError:
            pass
        else:
            self.fail("Invalid hash was returned but an exception was not thrown")

    def test_upload_small_block_object_success(self):
        file_path = os.path.abspath(__file__)
        file_size = os.stat(file_path).st_size

        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        object_name = "foo_test_upload"
        extra = {"meta_data": {"some-value": "foobar"}}
        obj = self.driver.upload_object(
            file_path=file_path,
            container=container,
            object_name=object_name,
            extra=extra,
            verify_hash=False,
        )

        self.assertEqual(obj.name, "foo_test_upload")
        self.assertEqual(obj.size, file_size)
        self.assertTrue("some-value" in obj.meta_data)

    def test_upload_big_block_object_success(self):
        _, file_path = tempfile.mkstemp(suffix=".jpg")
        file_size = AZURE_UPLOAD_CHUNK_SIZE + 1

        with open(file_path, "w") as file_hdl:
            file_hdl.write("0" * file_size)

        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        object_name = "foo_test_upload"
        extra = {"meta_data": {"some-value": "foobar"}}
        obj = self.driver.upload_object(
            file_path=file_path,
            container=container,
            object_name=object_name,
            extra=extra,
            verify_hash=False,
        )

        self.assertEqual(obj.name, "foo_test_upload")
        self.assertEqual(obj.size, file_size)
        self.assertTrue("some-value" in obj.meta_data)

        os.remove(file_path)

    def test_upload_small_block_object_success_with_lease(self):
        self.mock_response_klass.use_param = "comp"
        file_path = os.path.abspath(__file__)
        file_size = os.stat(file_path).st_size

        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        object_name = "foo_test_upload"
        extra = {"meta_data": {"some-value": "foobar"}}
        obj = self.driver.upload_object(
            file_path=file_path,
            container=container,
            object_name=object_name,
            extra=extra,
            verify_hash=False,
            ex_use_lease=True,
        )

        self.assertEqual(obj.name, "foo_test_upload")
        self.assertEqual(obj.size, file_size)
        self.assertTrue("some-value" in obj.meta_data)
        self.mock_response_klass.use_param = None

    def test_upload_big_block_object_success_with_lease(self):
        self.mock_response_klass.use_param = "comp"
        _, file_path = tempfile.mkstemp(suffix=".jpg")
        file_size = AZURE_UPLOAD_CHUNK_SIZE * 2

        with open(file_path, "w") as file_hdl:
            file_hdl.write("0" * file_size)

        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        object_name = "foo_test_upload"
        extra = {"meta_data": {"some-value": "foobar"}}
        obj = self.driver.upload_object(
            file_path=file_path,
            container=container,
            object_name=object_name,
            extra=extra,
            verify_hash=False,
            ex_use_lease=False,
        )

        self.assertEqual(obj.name, "foo_test_upload")
        self.assertEqual(obj.size, file_size)
        self.assertTrue("some-value" in obj.meta_data)

        os.remove(file_path)
        self.mock_response_klass.use_param = None

    def test_upload_blob_object_via_stream(self):
        self.mock_response_klass.use_param = "comp"
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)

        object_name = "foo_test_upload"
        iterator = BytesIO(b("345"))
        extra = {"content_type": "text/plain"}
        obj = self.driver.upload_object_via_stream(
            container=container, object_name=object_name, iterator=iterator, extra=extra
        )

        self.assertEqual(obj.name, object_name)
        self.assertEqual(obj.size, 3)
        self.mock_response_klass.use_param = None

    def test_upload_blob_object_via_stream_from_iterable(self):
        self.mock_response_klass.use_param = "comp"
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)

        object_name = "foo_test_upload"
        iterator = iter([b("34"), b("5")])
        extra = {"content_type": "text/plain"}
        obj = self.driver.upload_object_via_stream(
            container=container, object_name=object_name, iterator=iterator, extra=extra
        )

        self.assertEqual(obj.name, object_name)
        self.assertEqual(obj.size, 3)
        self.mock_response_klass.use_param = None

    def test_upload_blob_object_via_stream_with_lease(self):
        self.mock_response_klass.use_param = "comp"
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)

        object_name = "foo_test_upload"
        iterator = BytesIO(b("345"))
        extra = {"content_type": "text/plain"}
        obj = self.driver.upload_object_via_stream(
            container=container,
            object_name=object_name,
            iterator=iterator,
            extra=extra,
            ex_use_lease=True,
        )

        self.assertEqual(obj.name, object_name)
        self.assertEqual(obj.size, 3)
        self.mock_response_klass.use_param = None

    def test_delete_object_not_found(self):
        self.mock_response_klass.type = "NOT_FOUND"
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        obj = Object(
            name="foo_bar_object",
            size=1234,
            hash=None,
            extra=None,
            meta_data=None,
            container=container,
            driver=self.driver,
        )
        try:
            self.driver.delete_object(obj=obj)
        except ObjectDoesNotExistError:
            pass
        else:
            self.fail("Exception was not thrown")

    def test_delete_object_success(self):
        self.mock_response_klass.type = "DELETE"
        container = Container(name="foo_bar_container", extra={}, driver=self.driver)
        obj = Object(
            name="foo_bar_object",
            size=1234,
            hash=None,
            extra=None,
            meta_data=None,
            container=container,
            driver=self.driver,
        )

        result = self.driver.delete_object(obj=obj)
        self.assertTrue(result)

    def test_storage_driver_host(self):
        # Non regression tests for issue LIBCLOUD-399 dealing with the bad
        # management of the connectionCls.host class attribute
        driver1 = self.driver_type("fakeaccount1", "deadbeafcafebabe==")
        driver2 = self.driver_type("fakeaccount2", "deadbeafcafebabe==")
        driver3 = self.driver_type("fakeaccount3", "deadbeafcafebabe==", host="test.foo.bar.com")

        host1 = driver1.connection.host
        host2 = driver2.connection.host
        host3 = driver3.connection.host

        self.assertEqual(host1, "fakeaccount1.blob.core.windows.net")
        self.assertEqual(host2, "fakeaccount2.blob.core.windows.net")
        self.assertEqual(host3, "test.foo.bar.com")

    def test_normalize_http_headers(self):
        driver = self.driver_type("fakeaccount1", "deadbeafcafebabe==")

        headers = driver._fix_headers(
            {
                # should be normalized to include x-ms-blob prefix
                "Content-Encoding": "gzip",
                "content-language": "en-us",
                # should be passed through
                "x-foo": "bar",
            }
        )

        self.assertEqual(
            headers,
            {
                "x-ms-blob-content-encoding": "gzip",
                "x-ms-blob-content-language": "en-us",
                "x-foo": "bar",
            },
        )

    def test_storage_driver_host_govcloud(self):
        driver1 = self.driver_type(
            "fakeaccount1", "deadbeafcafebabe==", host="blob.core.usgovcloudapi.net"
        )
        driver2 = self.driver_type(
            "fakeaccount2",
            "deadbeafcafebabe==",
            host="fakeaccount2.blob.core.usgovcloudapi.net",
        )

        host1 = driver1.connection.host
        host2 = driver2.connection.host
        account_prefix_1 = driver1.connection.account_prefix
        account_prefix_2 = driver2.connection.account_prefix

        self.assertEqual(host1, "fakeaccount1.blob.core.usgovcloudapi.net")
        self.assertEqual(host2, "fakeaccount2.blob.core.usgovcloudapi.net")
        self.assertIsNone(account_prefix_1)
        self.assertIsNone(account_prefix_2)

    def test_storage_driver_host_azurite(self):
        driver = self.driver_type(
            "fakeaccount1",
            "deadbeafcafebabe==",
            host="localhost",
            port=10000,
            secure=False,
        )

        host = driver.connection.host
        account_prefix = driver.connection.account_prefix

        self.assertEqual(host, "localhost")
        self.assertEqual(account_prefix, "fakeaccount1")

    def test_storage_driver_azure_ad(self):
        AzureBlobsActiveDirectoryConnection.conn_class = AzureBlobsMockHttp
        driver = self.driver_type(
            key="fakeaccount1",
            secret="DEKjfhdakkdjfhei~",
            tenant_id="77777777-7777-7777-7777-777777777777",
            identity="55555555-5555-5555-5555-555555555555",
            auth_type="azureAd",
            secure=True,
        )
        host = driver.connection.host

        self.assertEqual(host, "fakeaccount1.blob.core.windows.net")

    def test_get_azure_ad_object_success(self):
        AzureBlobsActiveDirectoryConnection.conn_class = AzureBlobsMockHttp
        driver = self.driver_type(
            key="fakeaccount1",
            secret="DEKjfhdakkdjfhei~",
            tenant_id="77777777-7777-7777-7777-777777777777",
            identity="55555555-5555-5555-5555-555555555555",
            auth_type="azureAd",
            secure=True,
        )
        self.mock_response_klass.type = None
        container = driver.get_container(container_name="test_container200")

        self.assertTrue(container.name, "test_container200")
        self.assertTrue(container.extra["etag"], "0x8CFB877BB56A6FB")
        self.assertTrue(container.extra["last_modified"], "Fri, 04 Jan 2013 09:48:06 GMT")
        self.assertTrue(container.extra["lease"]["status"], "unlocked")
        self.assertTrue(container.extra["lease"]["state"], "available")
        self.assertTrue(container.extra["meta_data"]["meta1"], "value1")


class AzuriteBlobsTests(AzureBlobsTests):
    driver_args = STORAGE_AZURITE_BLOBS_PARAMS
    mock_response_klass = AzuriteBlobsMockHttp


if __name__ == "__main__":
    sys.exit(unittest.main())
