| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| from __future__ import with_statement |
| |
| import time |
| import base64 |
| import hmac |
| import re |
| import os |
| import binascii |
| |
| from hashlib import sha256 |
| from xml.etree.ElementTree import Element, SubElement |
| |
| from libcloud.utils.py3 import PY3 |
| from libcloud.utils.py3 import httplib |
| from libcloud.utils.py3 import urlquote |
| from libcloud.utils.py3 import tostring |
| from libcloud.utils.py3 import b |
| |
| from libcloud.utils.xml import fixxpath, findtext |
| from libcloud.utils.files import read_in_chunks |
| from libcloud.common.types import LibcloudError |
| from libcloud.common.azure import AzureConnection |
| |
| from libcloud.storage.base import Object, Container, StorageDriver |
| from libcloud.storage.types import ContainerIsNotEmptyError |
| from libcloud.storage.types import ContainerAlreadyExistsError |
| from libcloud.storage.types import InvalidContainerNameError |
| from libcloud.storage.types import ContainerDoesNotExistError |
| from libcloud.storage.types import ObjectDoesNotExistError |
| from libcloud.storage.types import ObjectHashMismatchError |
| |
| if PY3: |
| from io import FileIO as file |
| |
| # Desired number of items in each response inside a paginated request |
| RESPONSES_PER_REQUEST = 100 |
| |
| # As per the Azure documentation, if the upload file size is less than |
| # 64MB, we can upload it in a single request. However, in real life azure |
| # servers seem to disconnect randomly after around 5 MB or 200s of upload. |
| # So, it is better that for file sizes greater than 4MB, we upload it in |
| # chunks. |
| # Also, with large sizes, if we use a lease, the lease will timeout after |
| # 60 seconds, but the upload might still be in progress. This can be |
| # handled in code, but if we use chunked uploads, the lease renewal will |
| # happen automatically. |
| AZURE_BLOCK_MAX_SIZE = 4 * 1024 * 1024 |
| |
| # Azure block blocks must be maximum 4MB |
| # Azure page blobs must be aligned in 512 byte boundaries (4MB fits that) |
| AZURE_CHUNK_SIZE = 4 * 1024 * 1024 |
| |
| # Azure page blob must be aligned in 512 byte boundaries |
| AZURE_PAGE_CHUNK_SIZE = 512 |
| |
| # The time period (in seconds) for which a lease must be obtained. |
| # If set as -1, we get an infinite lease, but that is a bad idea. If |
| # after getting an infinite lease, there was an issue in releasing the |
| # lease, the object will remain 'locked' forever, unless the lease is |
| # released using the lease_id (which is not exposed to the user) |
| AZURE_LEASE_PERIOD = 60 |
| |
| |
class AzureBlobLease(object):
    """
    Context manager which optionally acquires a lease on an azure blob
    on entry, releases it on exit and allows renewing it in between.
    """
    def __init__(self, driver, object_path, use_lease):
        """
        @param driver: The Azure storage driver that is being used
        @type driver: L{AzureStorageDriver}

        @param object_path: The path of the object we need to lease
        @type object_path: C{str}

        @param use_lease: Indicates if we must take a lease or not
        @type use_lease: C{bool}
        """
        self.object_path = object_path
        self.driver = driver
        self.use_lease = use_lease
        self.lease_id = None
        self.params = {'comp': 'lease'}

    def _do_lease_request(self, headers):
        """
        Perform a lease operation. Every lease action is a PUT against
        the object path with the 'comp=lease' query parameter.
        """
        return self.driver.connection.request(self.object_path,
                                              headers=headers,
                                              params=self.params,
                                              method='PUT')

    def renew(self):
        """
        Renew the lease, if one was obtained when the context was entered
        """
        if self.lease_id is None:
            return

        response = self._do_lease_request(
            {'x-ms-lease-action': 'renew',
             'x-ms-lease-id': self.lease_id,
             'x-ms-lease-duration': '60'})

        if response.status != httplib.OK:
            raise LibcloudError('Unable to obtain lease', driver=self)

    def update_headers(self, headers):
        """
        Update the lease id in the headers
        """
        if self.lease_id:
            headers['x-ms-lease-id'] = self.lease_id

    def __enter__(self):
        if not self.use_lease:
            return self

        response = self._do_lease_request(
            {'x-ms-lease-action': 'acquire',
             'x-ms-lease-duration': '60'})

        if response.status == httplib.NOT_FOUND:
            # The object does not exist yet, so there is nothing to lease
            return self
        elif response.status != httplib.CREATED:
            raise LibcloudError('Unable to obtain lease', driver=self)

        self.lease_id = response.headers['x-ms-lease-id']
        return self

    def __exit__(self, type, value, traceback):
        if self.lease_id is None:
            return

        response = self._do_lease_request(
            {'x-ms-lease-action': 'release',
             'x-ms-lease-id': self.lease_id})

        if response.status != httplib.OK:
            raise LibcloudError('Unable to release lease', driver=self)
| |
| |
class AzureBlobsConnection(AzureConnection):
    """
    Represents a single connection to Azure Blobs
    """
    # Default blob service endpoint. AzureBlobsStorageDriver.__init__
    # prepends the storage account name to this value.
    host = 'blob.core.windows.net'
| |
| |
class AzureBlobsStorageDriver(StorageDriver):
    """
    Libcloud storage driver for Microsoft Azure Blob storage.
    """
    name = 'Microsoft Azure (blobs)'
    website = 'http://windows.azure.com/'
    connectionCls = AzureBlobsConnection
    hash_type = 'md5'
    # Uploads are never streamed with chunked transfer encoding; chunked
    # uploads are done block-by-block instead (see _upload_in_chunks)
    supports_chunked_encoding = False
    # Default blob type used when the caller does not pass ex_blob_type
    ex_blob_type = 'BlockBlob'
    def __init__(self, key, secret=None, secure=True, host=None, port=None,
                 **kwargs):
        """
        @param key: The Azure storage account name; it is also used to build
            the endpoint hostname.
        @type key: C{str}

        @param secret: The base64 encoded access key of the storage account.
        @type secret: C{str}
        """

        # The hostname must be 'account.blob.core.windows.net'
        # NOTE(review): this mutates the *class* attribute, so it affects
        # every driver instance sharing this connection class -- confirm
        # before using multiple storage accounts in one process.
        self.connectionCls.host = '%s.%s' % (key, self.connectionCls.host)

        # B64decode() this key and keep it, so that we don't have to do
        # so for every request. Minor performance improvement
        secret = base64.b64decode(b(secret))

        super(AzureBlobsStorageDriver, self).__init__(
            key=key, secret=secret,
            secure=secure, host=host,
            port=port, **kwargs)
| |
| def _xml_to_container(self, node): |
| """ |
| Converts a container XML node to a container instance |
| |
| @param node: XML info of the container |
| @type node: L{xml.etree.ElementTree.Element} |
| |
| @return: A container instance |
| @rtype: L{Container} |
| """ |
| |
| name = node.findtext(fixxpath(xpath='Name')) |
| props = node.find(fixxpath(xpath='Properties')) |
| metadata = node.find(fixxpath(xpath='Metadata')) |
| |
| extra = { |
| 'url': node.findtext(fixxpath(xpath='Url')), |
| 'last_modified': node.findtext(fixxpath(xpath='Last-Modified')), |
| 'etag': props.findtext(fixxpath(xpath='Etag')), |
| 'lease': { |
| 'status': props.findtext(fixxpath(xpath='LeaseStatus')), |
| 'state': props.findtext(fixxpath(xpath='LeaseState')), |
| 'duration': props.findtext(fixxpath(xpath='LeaseDuration')), |
| }, |
| 'meta_data': {} |
| } |
| |
| for meta in metadata.getchildren(): |
| extra['meta_data'][meta.tag] = meta.text |
| |
| return Container(name=name, extra=extra, driver=self) |
| |
| def _response_to_container(self, container_name, response): |
| """ |
| Converts a HTTP response to a container instance |
| |
| @param container_name: Name of the container |
| @type container_name: C{str} |
| |
| @param response: HTTP Response |
| @type node: L{} |
| |
| @return: A container instance |
| @rtype: L{Container} |
| """ |
| |
| headers = response.headers |
| extra = { |
| 'url': 'http://%s%s' % (response.connection.host, |
| response.connection.action), |
| 'etag': headers['etag'], |
| 'last_modified': headers['last-modified'], |
| 'lease': { |
| 'status': headers.get('x-ms-lease-status', None), |
| 'state': headers.get('x-ms-lease-state', None), |
| 'duration': headers.get('x-ms-lease-duration', None), |
| }, |
| 'meta_data': {} |
| } |
| |
| for key, value in response.headers.items(): |
| if key.startswith('x-ms-meta-'): |
| key = key.split('x-ms-meta-')[1] |
| extra['meta_data'][key] = value |
| |
| return Container(name=container_name, extra=extra, driver=self) |
| |
| def _xml_to_object(self, container, blob): |
| """ |
| Converts a BLOB XML node to an object instance |
| |
| @param container: Instance of the container holding the blob |
| @type: L{Container} |
| |
| @param blob: XML info of the blob |
| @type blob: L{} |
| |
| @return: An object instance |
| @rtype: L{Object} |
| """ |
| |
| name = blob.findtext(fixxpath(xpath='Name')) |
| props = blob.find(fixxpath(xpath='Properties')) |
| metadata = blob.find(fixxpath(xpath='Metadata')) |
| etag = props.findtext(fixxpath(xpath='Etag')) |
| size = int(props.findtext(fixxpath(xpath='Content-Length'))) |
| |
| extra = { |
| 'content_type': props.findtext(fixxpath(xpath='Content-Type')), |
| 'etag': etag, |
| 'md5_hash': props.findtext(fixxpath(xpath='Content-MD5')), |
| 'last_modified': props.findtext(fixxpath(xpath='Last-Modified')), |
| 'url': blob.findtext(fixxpath(xpath='Url')), |
| 'hash': props.findtext(fixxpath(xpath='Etag')), |
| 'lease': { |
| 'status': props.findtext(fixxpath(xpath='LeaseStatus')), |
| 'state': props.findtext(fixxpath(xpath='LeaseState')), |
| 'duration': props.findtext(fixxpath(xpath='LeaseDuration')), |
| }, |
| 'content_encoding': props.findtext(fixxpath( |
| xpath='Content-Encoding')), |
| 'content_language': props.findtext(fixxpath( |
| xpath='Content-Language')), |
| 'blob_type': props.findtext(fixxpath(xpath='BlobType')) |
| } |
| |
| if extra['md5_hash']: |
| value = binascii.hexlify(base64.b64decode(b(extra['md5_hash']))) |
| value = value.decode('ascii') |
| extra['md5_hash'] = value |
| |
| meta_data = {} |
| for meta in metadata.getchildren(): |
| meta_data[meta.tag] = meta.text |
| |
| return Object(name=name, size=size, hash=etag, meta_data=meta_data, |
| extra=extra, container=container, driver=self) |
| |
| def _response_to_object(self, object_name, container, response): |
| """ |
| Converts a HTTP response to an object (from headers) |
| |
| @param object_name: Name of the object |
| @type object_name: C{str} |
| |
| @param container: Instance of the container holding the blob |
| @type: L{Container} |
| |
| @param response: HTTP Response |
| @type node: L{} |
| |
| @return: An object instance |
| @rtype: L{Object} |
| """ |
| |
| headers = response.headers |
| size = int(headers['content-length']) |
| etag = headers['etag'] |
| |
| extra = { |
| 'url': 'http://%s%s' % (response.connection.host, |
| response.connection.action), |
| 'etag': etag, |
| 'md5_hash': headers.get('content-md5', None), |
| 'content_type': headers.get('content-type', None), |
| 'content_language': headers.get('content-language', None), |
| 'content_encoding': headers.get('content-encoding', None), |
| 'last_modified': headers['last-modified'], |
| 'lease': { |
| 'status': headers.get('x-ms-lease-status', None), |
| 'state': headers.get('x-ms-lease-state', None), |
| 'duration': headers.get('x-ms-lease-duration', None), |
| }, |
| 'blob_type': headers['x-ms-blob-type'] |
| } |
| |
| if extra['md5_hash']: |
| value = binascii.hexlify(base64.b64decode(b(extra['md5_hash']))) |
| value = value.decode('ascii') |
| extra['md5_hash'] = value |
| |
| meta_data = {} |
| for key, value in response.headers.items(): |
| if key.startswith('x-ms-meta-'): |
| key = key.split('x-ms-meta-')[1] |
| meta_data[key] = value |
| |
| return Object(name=object_name, size=size, hash=etag, extra=extra, |
| meta_data=meta_data, container=container, driver=self) |
| |
| def iterate_containers(self): |
| """ |
| @inherits: L{StorageDriver.iterate_containers} |
| """ |
| params = {'comp': 'list', |
| 'maxresults': RESPONSES_PER_REQUEST, |
| 'include': 'metadata'} |
| |
| while True: |
| response = self.connection.request('/', params) |
| if response.status != httplib.OK: |
| raise LibcloudError('Unexpected status code: %s' % |
| (response.status), driver=self) |
| |
| body = response.parse_body() |
| containers = body.find(fixxpath(xpath='Containers')) |
| containers = containers.findall(fixxpath(xpath='Container')) |
| |
| for container in containers: |
| yield self._xml_to_container(container) |
| |
| params['marker'] = body.findtext('NextMarker') |
| if not params['marker']: |
| break |
| |
| def iterate_container_objects(self, container): |
| """ |
| @inherits: L{StorageDriver.iterate_container_objects} |
| """ |
| params = {'restype': 'container', |
| 'comp': 'list', |
| 'maxresults': RESPONSES_PER_REQUEST, |
| 'include': 'metadata'} |
| |
| container_path = self._get_container_path(container) |
| |
| while True: |
| response = self.connection.request(container_path, |
| params=params) |
| |
| if response.status == httplib.NOT_FOUND: |
| raise ContainerDoesNotExistError(value=None, |
| driver=self, |
| container_name=container.name) |
| |
| elif response.status != httplib.OK: |
| raise LibcloudError('Unexpected status code: %s' % |
| (response.status), driver=self) |
| |
| body = response.parse_body() |
| blobs = body.find(fixxpath(xpath='Blobs')) |
| blobs = blobs.findall(fixxpath(xpath='Blob')) |
| |
| for blob in blobs: |
| yield self._xml_to_object(container, blob) |
| |
| params['marker'] = body.findtext('NextMarker') |
| if not params['marker']: |
| break |
| |
| def get_container(self, container_name): |
| """ |
| @inherits: L{StorageDriver.get_container} |
| """ |
| params = {'restype': 'container'} |
| |
| container_path = '/%s' % (container_name) |
| |
| response = self.connection.request(container_path, params=params, |
| method='HEAD') |
| |
| if response.status == httplib.NOT_FOUND: |
| raise ContainerDoesNotExistError('Container %s does not exist' % |
| (container_name), driver=self, |
| container_name=container_name) |
| elif response.status != httplib.OK: |
| raise LibcloudError('Unexpected status code: %s' % |
| (response.status), driver=self) |
| |
| return self._response_to_container(container_name, response) |
| |
| def get_object(self, container_name, object_name): |
| """ |
| @inherits: L{StorageDriver.get_object} |
| """ |
| |
| container = self.get_container(container_name=container_name) |
| object_path = self._get_object_path(container, object_name) |
| |
| response = self.connection.request(object_path, method='HEAD') |
| |
| if response.status == httplib.OK: |
| obj = self._response_to_object(object_name, container, response) |
| return obj |
| |
| raise ObjectDoesNotExistError(value=None, driver=self, |
| object_name=object_name) |
| |
| def _get_container_path(self, container): |
| """ |
| Return a container path |
| |
| @param container: Container instance |
| @type container: L{Container} |
| |
| @return: A path for this container. |
| @rtype: C{str} |
| """ |
| return '/%s' % (container.name) |
| |
| def _get_object_path(self, container, object_name): |
| """ |
| Return an object's CDN path. |
| |
| @param container: Container instance |
| @type container: L{Container} |
| |
| @param object_name: Object name |
| @type object_name: L{str} |
| |
| @return: A path for this object. |
| @rtype: C{str} |
| """ |
| container_url = self._get_container_path(container) |
| object_name_cleaned = urlquote(object_name) |
| object_path = '%s/%s' % (container_url, object_name_cleaned) |
| return object_path |
| |
| def create_container(self, container_name): |
| """ |
| @inherits: L{StorageDriver.create_container} |
| """ |
| params = {'restype': 'container'} |
| |
| container_path = '/%s' % (container_name) |
| response = self.connection.request(container_path, params=params, |
| method='PUT') |
| |
| if response.status == httplib.CREATED: |
| return self._response_to_container(container_name, response) |
| elif response.status == httplib.CONFLICT: |
| raise ContainerAlreadyExistsError( |
| value='Container with this name already exists. The name must ' |
| 'be unique among all the containers in the system', |
| container_name=container_name, driver=self) |
| elif response.status == httplib.BAD_REQUEST: |
| raise InvalidContainerNameError(value='Container name contains ' + |
| 'invalid characters.', |
| container_name=container_name, |
| driver=self) |
| |
| raise LibcloudError('Unexpected status code: %s' % (response.status), |
| driver=self) |
| |
| def delete_container(self, container): |
| """ |
| @inherits: L{StorageDriver.delete_container} |
| """ |
| # Azure does not check if the container is empty. So, we will do |
| # a check to ensure that the behaviour is similar to other drivers |
| for obj in container.iterate_objects(): |
| raise ContainerIsNotEmptyError( |
| value='Container must be empty before it can be deleted.', |
| container_name=container.name, driver=self) |
| |
| params = {'restype': 'container'} |
| container_path = self._get_container_path(container) |
| |
| # Note: All the objects in the container must be deleted first |
| response = self.connection.request(container_path, params=params, |
| method='DELETE') |
| |
| if response.status == httplib.ACCEPTED: |
| return True |
| elif response.status == httplib.NOT_FOUND: |
| raise ContainerDoesNotExistError(value=None, |
| driver=self, |
| container_name=container.name) |
| |
| return False |
| |
| def download_object(self, obj, destination_path, overwrite_existing=False, |
| delete_on_failure=True): |
| """ |
| @inherits: L{StorageDriver.download_object} |
| """ |
| obj_path = self._get_object_path(obj.container, obj.name) |
| response = self.connection.request(obj_path, raw=True, data=None) |
| |
| return self._get_object(obj=obj, callback=self._save_object, |
| response=response, |
| callback_kwargs={ |
| 'obj': obj, |
| 'response': response.response, |
| 'destination_path': destination_path, |
| 'overwrite_existing': overwrite_existing, |
| 'delete_on_failure': delete_on_failure}, |
| success_status_code=httplib.OK) |
| |
| def download_object_as_stream(self, obj, chunk_size=None): |
| """ |
| @inherits: L{StorageDriver.download_object_as_stream} |
| """ |
| obj_path = self._get_object_path(obj.container, obj.name) |
| response = self.connection.request(obj_path, raw=True, data=None) |
| |
| return self._get_object(obj=obj, callback=read_in_chunks, |
| response=response, |
| callback_kwargs={'iterator': response.response, |
| 'chunk_size': chunk_size}, |
| success_status_code=httplib.OK) |
| |
    def _upload_in_chunks(self, response, data, iterator, object_path,
                          blob_type, lease, calculate_hash=True):
        """
        Uploads data from an iterator in fixed sized chunks to Azure Storage

        @param response: Response object from the initial POST request
        @type response: L{RawResponse}

        @param data: Any data from the initial POST request
        @type data: C{str}

        @param iterator: The generator for fetching the upload data
        @type iterator: C{generator}

        @param object_path: The path of the object to which we are uploading
        @type object_path: C{str}

        @param blob_type: The blob type being uploaded
        @type blob_type: C{str}

        @param lease: The lease object to be used for renewal
        @type lease: L{AzureBlobLease}

        @keyword calculate_hash: Indicates if we must calculate the data hash
        @type calculate_hash: C{bool}

        @return: A tuple of (status, checksum, bytes transferred)
        @rtype: C{tuple}
        """

        # The initial request must have created the (empty) blob before
        # any blocks/pages can be uploaded to it
        if response.status != httplib.CREATED:
            raise LibcloudError('Error initializing upload. Code: %d' %
                                (response.status), driver=self)

        data_hash = None
        if calculate_hash:
            data_hash = self._get_hash_function()

        bytes_transferred = 0
        count = 1
        chunks = []
        headers = {}

        lease.update_headers(headers)

        # Block blobs are uploaded block by block; page blobs page by page
        if blob_type == 'BlockBlob':
            params = {'comp': 'block'}
        else:
            params = {'comp': 'page'}

        # Read the input data in chunk sizes suitable for Azure
        for data in read_in_chunks(iterator, AZURE_CHUNK_SIZE):
            data = b(data)
            content_length = len(data)
            offset = bytes_transferred
            bytes_transferred += content_length

            if calculate_hash:
                data_hash.update(data)

            # Each chunk carries its own base64 encoded MD5 checksum
            chunk_hash = self._get_hash_function()
            chunk_hash.update(data)
            chunk_hash = base64.b64encode(b(chunk_hash.digest()))

            headers['Content-MD5'] = chunk_hash.decode('utf-8')
            headers['Content-Length'] = content_length

            if blob_type == 'BlockBlob':
                # Block id can be any unique string that is base64 encoded
                # A 10 digit number can hold the max value of 50000 blocks
                # that are allowed for azure
                block_id = base64.b64encode(b('%10d' % (count)))
                block_id = block_id.decode('utf-8')
                params['blockid'] = block_id

                # Keep this data for a later commit
                chunks.append(block_id)
            else:
                # Page writes go to an explicit byte range of the blob
                headers['x-ms-page-write'] = 'update'
                headers['x-ms-range'] = 'bytes=%d-%d' % \
                    (offset, bytes_transferred-1)

            # Renew lease before updating
            lease.renew()

            resp = self.connection.request(object_path, method='PUT',
                                           data=data, headers=headers,
                                           params=params)

            if resp.status != httplib.CREATED:
                resp.parse_error()
                raise LibcloudError('Error uploading chunk %d. Code: %d' %
                                    (count, resp.status), driver=self)

            count += 1

        if calculate_hash:
            data_hash = data_hash.hexdigest()

        if blob_type == 'BlockBlob':
            # Block blobs only become visible once the block list is
            # committed
            self._commit_blocks(object_path, chunks, lease)

        # The Azure service does not return a hash immediately for
        # chunked uploads. It takes some time for the data to get synced
        response.headers['content-md5'] = None

        return (True, data_hash, bytes_transferred)
| |
    def _commit_blocks(self, object_path, chunks, lease):
        """
        Makes a final commit of the uploaded blocks.

        @param object_path: Server side object path.
        @type object_path: C{str}

        @param chunks: A list of the uploaded block ids, in upload order.
        @type chunks: C{list}

        @param lease: The lease object to be used for renewal.
        @type lease: L{AzureBlobLease}
        """

        # Build the <BlockList> XML body listing every uploaded block as
        # 'Uncommitted', in the order they should be committed
        root = Element('BlockList')

        for block_id in chunks:
            part = SubElement(root, 'Uncommitted')
            part.text = str(block_id)

        data = tostring(root)
        params = {'comp': 'blocklist'}
        headers = {}

        lease.update_headers(headers)
        lease.renew()

        response = self.connection.request(object_path, data=data,
                                           params=params, headers=headers,
                                           method='PUT')

        if response.status != httplib.CREATED:
            raise LibcloudError('Error in blocklist commit', driver=self)
| |
| def _check_values(self, blob_type, object_size): |
| """ |
| Checks if extension arguments are valid |
| |
| @param blob_type: The blob type that is being uploaded |
| @type blob_type: C{str} |
| |
| @param object_size: The (max) size of the object being uploaded |
| @type object_size: C{int} |
| """ |
| |
| if blob_type not in ['BlockBlob', 'PageBlob']: |
| raise LibcloudError('Invalid blob type', driver=self) |
| |
| if blob_type == 'PageBlob': |
| if not object_size: |
| raise LibcloudError('Max blob size is mandatory for page blob', |
| driver=self) |
| |
| if object_size % AZURE_PAGE_CHUNK_SIZE: |
| raise LibcloudError('Max blob size is not aligned to ' |
| 'page boundary', driver=self) |
| |
    def upload_object(self, file_path, container, object_name, extra=None,
                      verify_hash=True, ex_blob_type=None, ex_use_lease=False):
        """
        Upload an object currently located on a disk.

        @inherits: L{StorageDriver.upload_object}

        @param ex_blob_type: Storage class
        @type ex_blob_type: C{str}

        @param ex_use_lease: Indicates if we must take a lease before upload
        @type ex_use_lease: C{bool}
        """

        if ex_blob_type is None:
            ex_blob_type = self.ex_blob_type

        # Get the size of the file
        file_size = os.stat(file_path).st_size

        # The presumed size of the object
        object_size = file_size

        # Page blobs must declare a size and have it page aligned
        self._check_values(ex_blob_type, file_size)

        with file(file_path, 'rb') as file_handle:
            iterator = iter(file_handle)

            # Page blobs are always uploaded in chunks; block blobs are too
            # once the file exceeds AZURE_BLOCK_MAX_SIZE (4MB -- see the
            # comment on that constant)
            if ex_blob_type == 'PageBlob' or file_size > AZURE_BLOCK_MAX_SIZE:
                # For chunked upload of block blobs, the initial size must
                # be 0.
                if ex_blob_type == 'BlockBlob':
                    object_size = None

                object_path = self._get_object_path(container, object_name)

                upload_func = self._upload_in_chunks
                # 'lease' is filled in by _put_object once the lease (if
                # any) has been acquired
                upload_func_kwargs = {'iterator': iterator,
                                      'object_path': object_path,
                                      'blob_type': ex_blob_type,
                                      'lease': None}
            else:
                upload_func = self._stream_data
                upload_func_kwargs = {'iterator': iterator,
                                      'chunked': False,
                                      'calculate_hash': verify_hash}

            return self._put_object(container=container,
                                    object_name=object_name,
                                    object_size=object_size,
                                    upload_func=upload_func,
                                    upload_func_kwargs=upload_func_kwargs,
                                    file_path=file_path, extra=extra,
                                    verify_hash=verify_hash,
                                    blob_type=ex_blob_type,
                                    use_lease=ex_use_lease)
| |
| def upload_object_via_stream(self, iterator, container, object_name, |
| verify_hash=False, extra=None, |
| ex_use_lease=False, ex_blob_type=None, |
| ex_page_blob_size=None): |
| """ |
| @inherits: L{StorageDriver.upload_object_via_stream} |
| |
| @param ex_blob_type: Storage class |
| @type ex_blob_type: C{str} |
| |
| @param ex_page_blob_size: The maximum size to which the |
| page blob can grow to |
| @type ex_page_blob_size: C{int} |
| |
| @param ex_use_lease: Indicates if we must take a lease before upload |
| @type ex_use_lease: C{bool} |
| """ |
| |
| if ex_blob_type is None: |
| ex_blob_type = self.ex_blob_type |
| |
| self._check_values(ex_blob_type, ex_page_blob_size) |
| |
| object_path = self._get_object_path(container, object_name) |
| |
| upload_func = self._upload_in_chunks |
| upload_func_kwargs = {'iterator': iterator, |
| 'object_path': object_path, |
| 'blob_type': ex_blob_type, |
| 'lease': None} |
| |
| return self._put_object(container=container, |
| object_name=object_name, |
| object_size=ex_page_blob_size, |
| upload_func=upload_func, |
| upload_func_kwargs=upload_func_kwargs, |
| extra=extra, verify_hash=verify_hash, |
| blob_type=ex_blob_type, |
| use_lease=ex_use_lease) |
| |
| def delete_object(self, obj): |
| """ |
| @inherits: L{StorageDriver.delete_object} |
| """ |
| object_path = self._get_object_path(obj.container, obj.name) |
| response = self.connection.request(object_path, method='DELETE') |
| |
| if response.status == httplib.ACCEPTED: |
| return True |
| elif response.status == httplib.NOT_FOUND: |
| raise ObjectDoesNotExistError(value=None, driver=self, |
| object_name=obj.name) |
| |
| return False |
| |
| def _update_metadata(self, headers, meta_data): |
| """ |
| Update the given metadata in the headers |
| |
| @param headers: The headers dictionary to be updated |
| @type headers: C{dict} |
| |
| @param meta_data: Metadata key value pairs |
| @type meta_data: C{dict} |
| """ |
| for key, value in list(meta_data.items()): |
| key = 'x-ms-meta-%s' % (key) |
| headers[key] = value |
| |
| def _prepare_upload_headers(self, object_name, object_size, |
| extra, meta_data, blob_type): |
| """ |
| Prepare headers for uploading an object |
| |
| @param object_name: The full name of the object being updated |
| @type object_name: C{str} |
| |
| @param object_size: The size of the object. In case of PageBlobs, |
| this indicates the maximum size the blob can grow to |
| @type object_size: C{int} |
| |
| @param extra: Extra control data for the upload |
| @type extra: C{dict} |
| |
| @param meta_data: Metadata key value pairs |
| @type meta_data: C{dict} |
| |
| @param blob_type: Page or Block blob type |
| @type blob_type: C{str} |
| """ |
| headers = {} |
| |
| if blob_type is None: |
| blob_type = self.ex_blob_type |
| |
| headers['x-ms-blob-type'] = blob_type |
| |
| self._update_metadata(headers, meta_data) |
| |
| if object_size is not None: |
| headers['Content-Length'] = object_size |
| |
| if blob_type == 'PageBlob': |
| headers['Content-Length'] = 0 |
| headers['x-ms-blob-content-length'] = object_size |
| |
| return headers |
| |
| def _put_object(self, container, object_name, object_size, upload_func, |
| upload_func_kwargs, file_path=None, extra=None, |
| verify_hash=True, blob_type=None, use_lease=False): |
| """ |
| Control function that does the real job of uploading data to a blob |
| """ |
| extra = extra or {} |
| meta_data = extra.get('meta_data', {}) |
| content_type = extra.get('content_type', None) |
| |
| headers = self._prepare_upload_headers(object_name, object_size, |
| extra, meta_data, blob_type) |
| |
| object_path = self._get_object_path(container, object_name) |
| |
| # Get a lease if required and do the operations |
| with AzureBlobLease(self, object_path, use_lease) as lease: |
| if 'lease' in upload_func_kwargs: |
| upload_func_kwargs['lease'] = lease |
| |
| lease.update_headers(headers) |
| |
| iterator = iter('') |
| result_dict = self._upload_object(object_name, content_type, |
| upload_func, upload_func_kwargs, |
| object_path, headers=headers, |
| file_path=file_path, |
| iterator=iterator) |
| |
| response = result_dict['response'] |
| bytes_transferred = result_dict['bytes_transferred'] |
| data_hash = result_dict['data_hash'] |
| headers = response.headers |
| response = response.response |
| |
| if response.status != httplib.CREATED: |
| raise LibcloudError( |
| 'Unexpected status code, status_code=%s' % (response.status), |
| driver=self) |
| |
| server_hash = headers['content-md5'] |
| |
| if server_hash: |
| server_hash = binascii.hexlify(base64.b64decode(b(server_hash))) |
| server_hash = server_hash.decode('utf-8') |
| else: |
| # TODO: HACK - We could poll the object for a while and get |
| # the hash |
| pass |
| |
| if (verify_hash and server_hash and data_hash != server_hash): |
| raise ObjectHashMismatchError( |
| value='MD5 hash checksum does not match', |
| object_name=object_name, driver=self) |
| |
| return Object(name=object_name, size=bytes_transferred, |
| hash=headers['etag'], extra=None, |
| meta_data=meta_data, container=container, |
| driver=self) |
| |
| def ex_set_object_metadata(self, obj, meta_data): |
| """ |
| Set metadata for an object |
| |
| @param obj: The blob object |
| @type obj: L{Object} |
| |
| @param meta_data: Metadata key value pairs |
| @type meta_data: C{dict} |
| """ |
| object_path = self._get_object_path(obj.container, obj.name) |
| params = {'comp': 'metadata'} |
| headers = {} |
| |
| self._update_metadata(headers, meta_data) |
| |
| response = self.connection.request(object_path, method='PUT', |
| params=params, |
| headers=headers) |
| |
| if response.status != httplib.OK: |
| response.parse_error('Setting metadata') |