# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import with_statement
import base64
import os
import binascii
from xml.etree.ElementTree import Element, SubElement
from libcloud.utils.py3 import PY3
from libcloud.utils.py3 import httplib
from libcloud.utils.py3 import urlquote
from libcloud.utils.py3 import tostring
from libcloud.utils.py3 import b
from libcloud.utils.xml import fixxpath, findtext
from libcloud.utils.files import read_in_chunks
from libcloud.common.types import LibcloudError
from libcloud.common.azure import AzureConnection
from libcloud.storage.base import Object, Container, StorageDriver
from libcloud.storage.types import ContainerIsNotEmptyError
from libcloud.storage.types import ContainerAlreadyExistsError
from libcloud.storage.types import InvalidContainerNameError
from libcloud.storage.types import ContainerDoesNotExistError
from libcloud.storage.types import ObjectDoesNotExistError
from libcloud.storage.types import ObjectHashMismatchError
if PY3:
from io import FileIO as file
# Desired number of items in each response inside a paginated request
RESPONSES_PER_REQUEST = 100
# As per the Azure documentation, a file smaller than 64MB can be
# uploaded in a single request. In practice, however, Azure servers
# seem to disconnect randomly after around 5MB or 200s of upload.
# So it is better to upload files larger than 4MB in chunks.
# Also, with large sizes, if we use a lease, the lease will timeout after
# 60 seconds, but the upload might still be in progress. This can be
# handled in code, but if we use chunked uploads, the lease renewal will
# happen automatically.
AZURE_BLOCK_MAX_SIZE = 4 * 1024 * 1024
# Azure block blobs must be at most 4MB each
# Azure page blobs must be aligned on 512 byte boundaries (4MB fits that)
AZURE_CHUNK_SIZE = 4 * 1024 * 1024
# Azure page blobs must be aligned on 512 byte boundaries
AZURE_PAGE_CHUNK_SIZE = 512
# The time period (in seconds) for which a lease is obtained.
# If set to -1, we get an infinite lease, but that is a bad idea: if
# there is an issue releasing the lease afterwards, the object remains
# 'locked' forever, unless the lease is released using the lease_id
# (which is not exposed to the user)
AZURE_LEASE_PERIOD = 60
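# For illustration: with AZURE_CHUNK_SIZE of 4MB, a hypothetical 10MB
# upload is split into 3 chunks (two full 4MB chunks plus a 2MB
# remainder), and the lease (when one is used) is renewed before each
# chunk is sent, so the 60 second AZURE_LEASE_PERIOD does not expire
# mid-upload.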
class AzureBlobLease(object):
"""
A class to help in leasing an Azure blob and renewing the lease
"""
def __init__(self, driver, object_path, use_lease):
"""
@param driver: The Azure storage driver that is being used
@type driver: L{AzureStorageDriver}
@param object_path: The path of the object we need to lease
@type object_path: C{str}
@param use_lease: Indicates if we must take a lease or not
@type use_lease: C{bool}
"""
self.object_path = object_path
self.driver = driver
self.use_lease = use_lease
self.lease_id = None
self.params = {'comp': 'lease'}
def renew(self):
"""
Renew the lease, if one was obtained, for another lease period
"""
if self.lease_id is None:
return
headers = {'x-ms-lease-action': 'renew',
'x-ms-lease-id': self.lease_id,
'x-ms-lease-duration': '60'}
response = self.driver.connection.request(self.object_path,
headers=headers,
params=self.params,
method='PUT')
if response.status != httplib.OK:
raise LibcloudError('Unable to renew lease', driver=self)
def update_headers(self, headers):
"""
Update the lease id in the headers
"""
if self.lease_id:
headers['x-ms-lease-id'] = self.lease_id
def __enter__(self):
if not self.use_lease:
return self
headers = {'x-ms-lease-action': 'acquire',
'x-ms-lease-duration': '60'}
response = self.driver.connection.request(self.object_path,
headers=headers,
params=self.params,
method='PUT')
if response.status == httplib.NOT_FOUND:
return self
elif response.status != httplib.CREATED:
raise LibcloudError('Unable to obtain lease', driver=self)
self.lease_id = response.headers['x-ms-lease-id']
return self
def __exit__(self, type, value, traceback):
if self.lease_id is None:
return
headers = {'x-ms-lease-action': 'release',
'x-ms-lease-id': self.lease_id}
response = self.driver.connection.request(self.object_path,
headers=headers,
params=self.params,
method='PUT')
if response.status != httplib.OK:
raise LibcloudError('Unable to release lease', driver=self)
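# A minimal usage sketch for AzureBlobLease; the 'driver' instance and
# the object path are hypothetical. The class is a context manager: the
# lease is acquired on entry and released on exit.
#
#     with AzureBlobLease(driver, '/container/blob.txt', True) as lease:
#         headers = {}
#         lease.update_headers(headers)
#         lease.renew()  # extend the lease for another 60 seconds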
class AzureBlobsConnection(AzureConnection):
"""
Represents a single connection to Azure Blobs
"""
host = 'blob.core.windows.net'
class AzureBlobsStorageDriver(StorageDriver):
name = 'Microsoft Azure (blobs)'
website = 'http://windows.azure.com/'
connectionCls = AzureBlobsConnection
hash_type = 'md5'
supports_chunked_encoding = False
ex_blob_type = 'BlockBlob'
def __init__(self, key, secret=None, secure=True, host=None, port=None,
**kwargs):
# The hostname must be 'account.blob.core.windows.net'
self.connectionCls.host = '%s.%s' % (key, self.connectionCls.host)
# Base64-decode the secret key once and keep the result, so that we
# don't have to decode it for every request. Minor performance
# improvement
secret = base64.b64decode(b(secret))
super(AzureBlobsStorageDriver, self).__init__(
key=key, secret=secret,
secure=secure, host=host,
port=port, **kwargs)
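# A minimal instantiation sketch; the account name and base64 key are
# hypothetical. The account name becomes part of the hostname, e.g.
# 'youraccount.blob.core.windows.net':
#
#     driver = AzureBlobsStorageDriver(key='youraccount',
#                                      secret='bXlzZWNyZXRrZXk=')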
def _xml_to_container(self, node):
"""
Converts a container XML node to a container instance
@param node: XML info of the container
@type node: L{xml.etree.ElementTree.Element}
@return: A container instance
@rtype: L{Container}
"""
name = node.findtext(fixxpath(xpath='Name'))
props = node.find(fixxpath(xpath='Properties'))
metadata = node.find(fixxpath(xpath='Metadata'))
extra = {
'url': node.findtext(fixxpath(xpath='Url')),
'last_modified': node.findtext(fixxpath(xpath='Last-Modified')),
'etag': props.findtext(fixxpath(xpath='Etag')),
'lease': {
'status': props.findtext(fixxpath(xpath='LeaseStatus')),
'state': props.findtext(fixxpath(xpath='LeaseState')),
'duration': props.findtext(fixxpath(xpath='LeaseDuration')),
},
'meta_data': {}
}
for meta in metadata:
extra['meta_data'][meta.tag] = meta.text
return Container(name=name, extra=extra, driver=self)
def _response_to_container(self, container_name, response):
"""
Converts an HTTP response to a container instance
@param container_name: Name of the container
@type container_name: C{str}
@param response: HTTP Response
@type response: L{Response}
@return: A container instance
@rtype: L{Container}
"""
headers = response.headers
extra = {
'url': 'http://%s%s' % (response.connection.host,
response.connection.action),
'etag': headers['etag'],
'last_modified': headers['last-modified'],
'lease': {
'status': headers.get('x-ms-lease-status', None),
'state': headers.get('x-ms-lease-state', None),
'duration': headers.get('x-ms-lease-duration', None),
},
'meta_data': {}
}
for key, value in response.headers.items():
if key.startswith('x-ms-meta-'):
key = key.split('x-ms-meta-')[1]
extra['meta_data'][key] = value
return Container(name=container_name, extra=extra, driver=self)
def _xml_to_object(self, container, blob):
"""
Converts a BLOB XML node to an object instance
@param container: Instance of the container holding the blob
@type container: L{Container}
@param blob: XML info of the blob
@type blob: L{xml.etree.ElementTree.Element}
@return: An object instance
@rtype: L{Object}
"""
name = blob.findtext(fixxpath(xpath='Name'))
props = blob.find(fixxpath(xpath='Properties'))
metadata = blob.find(fixxpath(xpath='Metadata'))
etag = props.findtext(fixxpath(xpath='Etag'))
size = int(props.findtext(fixxpath(xpath='Content-Length')))
extra = {
'content_type': props.findtext(fixxpath(xpath='Content-Type')),
'etag': etag,
'md5_hash': props.findtext(fixxpath(xpath='Content-MD5')),
'last_modified': props.findtext(fixxpath(xpath='Last-Modified')),
'url': blob.findtext(fixxpath(xpath='Url')),
'hash': props.findtext(fixxpath(xpath='Etag')),
'lease': {
'status': props.findtext(fixxpath(xpath='LeaseStatus')),
'state': props.findtext(fixxpath(xpath='LeaseState')),
'duration': props.findtext(fixxpath(xpath='LeaseDuration')),
},
'content_encoding': props.findtext(fixxpath(
xpath='Content-Encoding')),
'content_language': props.findtext(fixxpath(
xpath='Content-Language')),
'blob_type': props.findtext(fixxpath(xpath='BlobType'))
}
if extra['md5_hash']:
value = binascii.hexlify(base64.b64decode(b(extra['md5_hash'])))
value = value.decode('ascii')
extra['md5_hash'] = value
meta_data = {}
for meta in metadata:
meta_data[meta.tag] = meta.text
return Object(name=name, size=size, hash=etag, meta_data=meta_data,
extra=extra, container=container, driver=self)
def _response_to_object(self, object_name, container, response):
"""
Converts an HTTP response to an object (from headers)
@param object_name: Name of the object
@type object_name: C{str}
@param container: Instance of the container holding the blob
@type container: L{Container}
@param response: HTTP Response
@type response: L{Response}
@return: An object instance
@rtype: L{Object}
"""
headers = response.headers
size = int(headers['content-length'])
etag = headers['etag']
extra = {
'url': 'http://%s%s' % (response.connection.host,
response.connection.action),
'etag': etag,
'md5_hash': headers.get('content-md5', None),
'content_type': headers.get('content-type', None),
'content_language': headers.get('content-language', None),
'content_encoding': headers.get('content-encoding', None),
'last_modified': headers['last-modified'],
'lease': {
'status': headers.get('x-ms-lease-status', None),
'state': headers.get('x-ms-lease-state', None),
'duration': headers.get('x-ms-lease-duration', None),
},
'blob_type': headers['x-ms-blob-type']
}
if extra['md5_hash']:
value = binascii.hexlify(base64.b64decode(b(extra['md5_hash'])))
value = value.decode('ascii')
extra['md5_hash'] = value
meta_data = {}
for key, value in response.headers.items():
if key.startswith('x-ms-meta-'):
key = key.split('x-ms-meta-')[1]
meta_data[key] = value
return Object(name=object_name, size=size, hash=etag, extra=extra,
meta_data=meta_data, container=container, driver=self)
def iterate_containers(self):
"""
@inherits: L{StorageDriver.iterate_containers}
"""
params = {'comp': 'list',
'maxresults': RESPONSES_PER_REQUEST,
'include': 'metadata'}
while True:
response = self.connection.request('/', params)
if response.status != httplib.OK:
raise LibcloudError('Unexpected status code: %s' %
(response.status), driver=self)
body = response.parse_body()
containers = body.find(fixxpath(xpath='Containers'))
containers = containers.findall(fixxpath(xpath='Container'))
for container in containers:
yield self._xml_to_container(container)
params['marker'] = body.findtext('NextMarker')
if not params['marker']:
break
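# Usage sketch (the 'driver' instance is assumed to exist): the
# generator transparently follows the 'NextMarker' cursor returned by
# Azure, fetching RESPONSES_PER_REQUEST containers per request.
#
#     for container in driver.iterate_containers():
#         print(container.name, container.extra['lease']['state'])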
def iterate_container_objects(self, container):
"""
@inherits: L{StorageDriver.iterate_container_objects}
"""
params = {'restype': 'container',
'comp': 'list',
'maxresults': RESPONSES_PER_REQUEST,
'include': 'metadata'}
container_path = self._get_container_path(container)
while True:
response = self.connection.request(container_path,
params=params)
if response.status == httplib.NOT_FOUND:
raise ContainerDoesNotExistError(value=None,
driver=self,
container_name=container.name)
elif response.status != httplib.OK:
raise LibcloudError('Unexpected status code: %s' %
(response.status), driver=self)
body = response.parse_body()
blobs = body.find(fixxpath(xpath='Blobs'))
blobs = blobs.findall(fixxpath(xpath='Blob'))
for blob in blobs:
yield self._xml_to_object(container, blob)
params['marker'] = body.findtext('NextMarker')
if not params['marker']:
break
def get_container(self, container_name):
"""
@inherits: L{StorageDriver.get_container}
"""
params = {'restype': 'container'}
container_path = '/%s' % (container_name)
response = self.connection.request(container_path, params=params,
method='HEAD')
if response.status == httplib.NOT_FOUND:
raise ContainerDoesNotExistError('Container %s does not exist' %
(container_name), driver=self,
container_name=container_name)
elif response.status != httplib.OK:
raise LibcloudError('Unexpected status code: %s' %
(response.status), driver=self)
return self._response_to_container(container_name, response)
def get_object(self, container_name, object_name):
"""
@inherits: L{StorageDriver.get_object}
"""
container = self.get_container(container_name=container_name)
object_path = self._get_object_path(container, object_name)
response = self.connection.request(object_path, method='HEAD')
if response.status == httplib.OK:
obj = self._response_to_object(object_name, container, response)
return obj
raise ObjectDoesNotExistError(value=None, driver=self,
object_name=object_name)
def _get_container_path(self, container):
"""
Return a container path
@param container: Container instance
@type container: L{Container}
@return: A path for this container.
@rtype: C{str}
"""
return '/%s' % (container.name)
def _get_object_path(self, container, object_name):
"""
Return an object's path.
@param container: Container instance
@type container: L{Container}
@param object_name: Object name
@type object_name: L{str}
@return: A path for this object.
@rtype: C{str}
"""
container_url = self._get_container_path(container)
object_name_cleaned = urlquote(object_name)
object_path = '%s/%s' % (container_url, object_name_cleaned)
return object_path
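# For example (hypothetical names), an object called 'my file.txt' in a
# container called 'backups' yields the path '/backups/my%20file.txt',
# because the object name is URL-quoted.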
def create_container(self, container_name):
"""
@inherits: L{StorageDriver.create_container}
"""
params = {'restype': 'container'}
container_path = '/%s' % (container_name)
response = self.connection.request(container_path, params=params,
method='PUT')
if response.status == httplib.CREATED:
return self._response_to_container(container_name, response)
elif response.status == httplib.CONFLICT:
raise ContainerAlreadyExistsError(
value='Container with this name already exists. The name must '
'be unique among all the containers in the system',
container_name=container_name, driver=self)
elif response.status == httplib.BAD_REQUEST:
raise InvalidContainerNameError(value='Container name contains ' +
'invalid characters.',
container_name=container_name,
driver=self)
raise LibcloudError('Unexpected status code: %s' % (response.status),
driver=self)
def delete_container(self, container):
"""
@inherits: L{StorageDriver.delete_container}
"""
# Azure does not check if the container is empty, so we do the
# check ourselves to keep the behaviour consistent with other drivers
for obj in container.iterate_objects():
raise ContainerIsNotEmptyError(
value='Container must be empty before it can be deleted.',
container_name=container.name, driver=self)
params = {'restype': 'container'}
container_path = self._get_container_path(container)
# The container is known to be empty at this point
response = self.connection.request(container_path, params=params,
method='DELETE')
if response.status == httplib.ACCEPTED:
return True
elif response.status == httplib.NOT_FOUND:
raise ContainerDoesNotExistError(value=None,
driver=self,
container_name=container.name)
return False
def download_object(self, obj, destination_path, overwrite_existing=False,
delete_on_failure=True):
"""
@inherits: L{StorageDriver.download_object}
"""
obj_path = self._get_object_path(obj.container, obj.name)
response = self.connection.request(obj_path, raw=True, data=None)
return self._get_object(obj=obj, callback=self._save_object,
response=response,
callback_kwargs={
'obj': obj,
'response': response.response,
'destination_path': destination_path,
'overwrite_existing': overwrite_existing,
'delete_on_failure': delete_on_failure},
success_status_code=httplib.OK)
def download_object_as_stream(self, obj, chunk_size=None):
"""
@inherits: L{StorageDriver.download_object_as_stream}
"""
obj_path = self._get_object_path(obj.container, obj.name)
response = self.connection.request(obj_path, raw=True, data=None)
return self._get_object(obj=obj, callback=read_in_chunks,
response=response,
callback_kwargs={'iterator': response.response,
'chunk_size': chunk_size},
success_status_code=httplib.OK)
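# Usage sketch for both download paths; names, the local path and the
# 'process' helper are hypothetical:
#
#     obj = driver.get_object('backups', 'blob.txt')
#     driver.download_object(obj, '/tmp/blob.txt',
#                            overwrite_existing=True)
#     for chunk in driver.download_object_as_stream(obj, 1024 * 1024):
#         process(chunk)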
def _upload_in_chunks(self, response, data, iterator, object_path,
blob_type, lease, calculate_hash=True):
"""
Uploads data from an iterator in fixed size chunks to Azure Blob Storage
@param response: Response object from the initial POST request
@type response: L{RawResponse}
@param data: Any data from the initial POST request
@type data: C{str}
@param iterator: The generator for fetching the upload data
@type iterator: C{generator}
@param object_path: The path of the object to which we are uploading
@type object_path: C{str}
@param blob_type: The blob type being uploaded
@type blob_type: C{str}
@param lease: The lease object to be used for renewal
@type lease: L{AzureBlobLease}
@keyword calculate_hash: Indicates if we must calculate the data hash
@type calculate_hash: C{bool}
@return: A tuple of (status, checksum, bytes transferred)
@rtype: C{tuple}
"""
# Check that the initial request that created the blob succeeded
if response.status != httplib.CREATED:
raise LibcloudError('Error initializing upload. Code: %d' %
(response.status), driver=self)
data_hash = None
if calculate_hash:
data_hash = self._get_hash_function()
bytes_transferred = 0
count = 1
chunks = []
headers = {}
lease.update_headers(headers)
if blob_type == 'BlockBlob':
params = {'comp': 'block'}
else:
params = {'comp': 'page'}
# Read the input data in chunk sizes suitable for Azure
for data in read_in_chunks(iterator, AZURE_CHUNK_SIZE):
data = b(data)
content_length = len(data)
offset = bytes_transferred
bytes_transferred += content_length
if calculate_hash:
data_hash.update(data)
chunk_hash = self._get_hash_function()
chunk_hash.update(data)
chunk_hash = base64.b64encode(b(chunk_hash.digest()))
headers['Content-MD5'] = chunk_hash.decode('utf-8')
headers['Content-Length'] = content_length
if blob_type == 'BlockBlob':
# The block id can be any base64 encoded string, but all block
# ids in a blob must have the same length. A fixed-width 10 digit
# counter satisfies this and can hold the maximum of 50000 blocks
# that Azure allows per blob
block_id = base64.b64encode(b('%10d' % (count)))
block_id = block_id.decode('utf-8')
params['blockid'] = block_id
# Keep this data for a later commit
chunks.append(block_id)
else:
headers['x-ms-page-write'] = 'update'
headers['x-ms-range'] = 'bytes=%d-%d' % \
(offset, bytes_transferred-1)
# Renew lease before updating
lease.renew()
resp = self.connection.request(object_path, method='PUT',
data=data, headers=headers,
params=params)
if resp.status != httplib.CREATED:
resp.parse_error()
raise LibcloudError('Error uploading chunk %d. Code: %d' %
(count, resp.status), driver=self)
count += 1
if calculate_hash:
data_hash = data_hash.hexdigest()
if blob_type == 'BlockBlob':
self._commit_blocks(object_path, chunks, lease)
# The Azure service does not return a hash immediately for
# chunked uploads. It takes some time for the data to get synced
response.headers['content-md5'] = None
return (True, data_hash, bytes_transferred)
def _commit_blocks(self, object_path, chunks, lease):
"""
Makes a final commit of the data.
@param object_path: Server side object path.
@type object_path: C{str}
@param chunks: A list of base64 encoded block ids to commit.
@type chunks: C{list}
@param lease: The lease to renew and include in the request headers.
@type lease: L{AzureBlobLease}
"""
root = Element('BlockList')
for block_id in chunks:
part = SubElement(root, 'Uncommitted')
part.text = str(block_id)
data = tostring(root)
params = {'comp': 'blocklist'}
headers = {}
lease.update_headers(headers)
lease.renew()
response = self.connection.request(object_path, data=data,
params=params, headers=headers,
method='PUT')
if response.status != httplib.CREATED:
raise LibcloudError('Error in blocklist commit', driver=self)
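# For illustration, the committed block list is sent as an XML body of
# the following shape (the block ids shown are hypothetical):
#
#     <BlockList>
#         <Uncommitted>ICAgICAgICAgMQ==</Uncommitted>
#         <Uncommitted>ICAgICAgICAgMg==</Uncommitted>
#     </BlockList>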
def _check_values(self, blob_type, object_size):
"""
Checks if extension arguments are valid
@param blob_type: The blob type that is being uploaded
@type blob_type: C{str}
@param object_size: The (max) size of the object being uploaded
@type object_size: C{int}
"""
if blob_type not in ['BlockBlob', 'PageBlob']:
raise LibcloudError('Invalid blob type', driver=self)
if blob_type == 'PageBlob':
if not object_size:
raise LibcloudError('Max blob size is mandatory for page blob',
driver=self)
if object_size % AZURE_PAGE_CHUNK_SIZE:
raise LibcloudError('Max blob size is not aligned to '
'page boundary', driver=self)
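# For example, a hypothetical 1MB page blob passes the check, since
# (1024 * 1024) % AZURE_PAGE_CHUNK_SIZE == 0, while a size of
# 1024 * 1024 + 100 is rejected because it does not fall on a 512 byte
# page boundary.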
def upload_object(self, file_path, container, object_name, extra=None,
verify_hash=True, ex_blob_type=None, ex_use_lease=False):
"""
Upload an object currently located on a disk.
@inherits: L{StorageDriver.upload_object}
@param ex_blob_type: The blob type to use ('BlockBlob' or 'PageBlob')
@type ex_blob_type: C{str}
@param ex_use_lease: Indicates if we must take a lease before upload
@type ex_use_lease: C{bool}
"""
if ex_blob_type is None:
ex_blob_type = self.ex_blob_type
# Get the size of the file
file_size = os.stat(file_path).st_size
# The presumed size of the object
object_size = file_size
self._check_values(ex_blob_type, file_size)
with file(file_path, 'rb') as file_handle:
iterator = iter(file_handle)
# If the blob is a page blob or the file is larger than 4MB, upload in chunks
if ex_blob_type == 'PageBlob' or file_size > AZURE_BLOCK_MAX_SIZE:
# For chunked upload of block blobs, the initial create request
# must be sent without an object size (an empty body)
if ex_blob_type == 'BlockBlob':
object_size = None
object_path = self._get_object_path(container, object_name)
upload_func = self._upload_in_chunks
upload_func_kwargs = {'iterator': iterator,
'object_path': object_path,
'blob_type': ex_blob_type,
'lease': None}
else:
upload_func = self._stream_data
upload_func_kwargs = {'iterator': iterator,
'chunked': False,
'calculate_hash': verify_hash}
return self._put_object(container=container,
object_name=object_name,
object_size=object_size,
upload_func=upload_func,
upload_func_kwargs=upload_func_kwargs,
file_path=file_path, extra=extra,
verify_hash=verify_hash,
blob_type=ex_blob_type,
use_lease=ex_use_lease)
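# Usage sketch (the container name and file path are hypothetical):
#
#     container = driver.get_container('backups')
#     obj = driver.upload_object('/tmp/blob.txt', container, 'blob.txt',
#                                ex_blob_type='BlockBlob',
#                                ex_use_lease=True)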
def upload_object_via_stream(self, iterator, container, object_name,
verify_hash=False, extra=None,
ex_use_lease=False, ex_blob_type=None,
ex_page_blob_size=None):
"""
@inherits: L{StorageDriver.upload_object_via_stream}
@param ex_blob_type: The blob type to use ('BlockBlob' or 'PageBlob')
@type ex_blob_type: C{str}
@param ex_page_blob_size: The maximum size to which the
page blob can grow
@type ex_page_blob_size: C{int}
@param ex_use_lease: Indicates if we must take a lease before upload
@type ex_use_lease: C{bool}
"""
if ex_blob_type is None:
ex_blob_type = self.ex_blob_type
self._check_values(ex_blob_type, ex_page_blob_size)
object_path = self._get_object_path(container, object_name)
upload_func = self._upload_in_chunks
upload_func_kwargs = {'iterator': iterator,
'object_path': object_path,
'blob_type': ex_blob_type,
'lease': None}
return self._put_object(container=container,
object_name=object_name,
object_size=ex_page_blob_size,
upload_func=upload_func,
upload_func_kwargs=upload_func_kwargs,
extra=extra, verify_hash=verify_hash,
blob_type=ex_blob_type,
use_lease=ex_use_lease)
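# Usage sketch for a page blob upload from a stream; the 'data' source
# and the 1MB maximum size are hypothetical. Note that
# ex_page_blob_size must be a multiple of 512:
#
#     obj = driver.upload_object_via_stream(iter(data), container,
#                                           'disk.vhd',
#                                           ex_blob_type='PageBlob',
#                                           ex_page_blob_size=1024 * 1024)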
def delete_object(self, obj):
"""
@inherits: L{StorageDriver.delete_object}
"""
object_path = self._get_object_path(obj.container, obj.name)
response = self.connection.request(object_path, method='DELETE')
if response.status == httplib.ACCEPTED:
return True
elif response.status == httplib.NOT_FOUND:
raise ObjectDoesNotExistError(value=None, driver=self,
object_name=obj.name)
return False
def _update_metadata(self, headers, meta_data):
"""
Update the given metadata in the headers
@param headers: The headers dictionary to be updated
@type headers: C{dict}
@param meta_data: Metadata key value pairs
@type meta_data: C{dict}
"""
for key, value in list(meta_data.items()):
key = 'x-ms-meta-%s' % (key)
headers[key] = value
def _prepare_upload_headers(self, object_name, object_size,
extra, meta_data, blob_type):
"""
Prepare headers for uploading an object
@param object_name: The full name of the object being uploaded
@type object_name: C{str}
@param object_size: The size of the object. In case of PageBlobs,
this indicates the maximum size the blob can grow to
@type object_size: C{int}
@param extra: Extra control data for the upload
@type extra: C{dict}
@param meta_data: Metadata key value pairs
@type meta_data: C{dict}
@param blob_type: Page or Block blob type
@type blob_type: C{str}
"""
headers = {}
if blob_type is None:
blob_type = self.ex_blob_type
headers['x-ms-blob-type'] = blob_type
self._update_metadata(headers, meta_data)
if object_size is not None:
headers['Content-Length'] = object_size
if blob_type == 'PageBlob':
headers['Content-Length'] = 0
headers['x-ms-blob-content-length'] = object_size
return headers
def _put_object(self, container, object_name, object_size, upload_func,
upload_func_kwargs, file_path=None, extra=None,
verify_hash=True, blob_type=None, use_lease=False):
"""
Control function that does the real job of uploading data to a blob
"""
extra = extra or {}
meta_data = extra.get('meta_data', {})
content_type = extra.get('content_type', None)
headers = self._prepare_upload_headers(object_name, object_size,
extra, meta_data, blob_type)
object_path = self._get_object_path(container, object_name)
# Get a lease if required and do the operations
with AzureBlobLease(self, object_path, use_lease) as lease:
if 'lease' in upload_func_kwargs:
upload_func_kwargs['lease'] = lease
lease.update_headers(headers)
iterator = iter('')
result_dict = self._upload_object(object_name, content_type,
upload_func, upload_func_kwargs,
object_path, headers=headers,
file_path=file_path,
iterator=iterator)
response = result_dict['response']
bytes_transferred = result_dict['bytes_transferred']
data_hash = result_dict['data_hash']
headers = response.headers
response = response.response
if response.status != httplib.CREATED:
raise LibcloudError(
'Unexpected status code, status_code=%s' % (response.status),
driver=self)
server_hash = headers['content-md5']
if server_hash:
server_hash = binascii.hexlify(base64.b64decode(b(server_hash)))
server_hash = server_hash.decode('utf-8')
else:
# TODO: HACK - We could poll the object for a while and get
# the hash
pass
if (verify_hash and server_hash and data_hash != server_hash):
raise ObjectHashMismatchError(
value='MD5 hash checksum does not match',
object_name=object_name, driver=self)
return Object(name=object_name, size=bytes_transferred,
hash=headers['etag'], extra=None,
meta_data=meta_data, container=container,
driver=self)
def ex_set_object_metadata(self, obj, meta_data):
"""
Set metadata for an object
@param obj: The blob object
@type obj: L{Object}
@param meta_data: Metadata key value pairs
@type meta_data: C{dict}
"""
object_path = self._get_object_path(obj.container, obj.name)
params = {'comp': 'metadata'}
headers = {}
self._update_metadata(headers, meta_data)
response = self.connection.request(object_path, method='PUT',
params=params,
headers=headers)
if response.status != httplib.OK:
response.parse_error('Setting metadata')
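# Usage sketch (the 'obj' instance and metadata values are
# hypothetical); each pair is sent as an 'x-ms-meta-' prefixed header:
#
#     driver.ex_set_object_metadata(obj, {'owner': 'backup-job',
#                                         'retention': '30d'})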