blob: cf7d771cee72510596cfcae97accf127714cf661 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hmac
import time
import base64
import hashlib
from io import FileIO as file
from libcloud.utils.py3 import b, next, httplib, urlparse, urlquote, urlencode, urlunquote
from libcloud.common.base import XmlResponse, ConnectionUserAndKey
from libcloud.utils.files import read_in_chunks
from libcloud.common.types import LibcloudError
from libcloud.storage.base import CHUNK_SIZE, Object, Container, StorageDriver
from libcloud.storage.types import (
ObjectDoesNotExistError,
ContainerIsNotEmptyError,
ContainerDoesNotExistError,
ContainerAlreadyExistsError,
)
def collapse(s):
    """
    Squeeze runs of space characters in ``s`` into a single space.

    Leading and trailing spaces are dropped as well. Only the literal space
    character is treated as a separator; tabs and newlines are left as-is.

    :param s: Text to normalize.
    :type s: ``str``

    :rtype: ``str``
    """
    non_empty_parts = filter(None, s.split(" "))
    return " ".join(non_empty_parts)
class AtmosError(LibcloudError):
    """
    Error reported by the Atmos API.

    The numeric error code from the response is exposed as ``code`` so that
    callers can branch on it; the message is forwarded to the base class as
    its ``value``.
    """

    def __init__(self, code, message, driver=None):
        """
        :param code: Numeric error code taken from the error response.
        :type code: ``int``

        :param message: Error message taken from the error response.
        :type message: ``str``

        :param driver: Driver the error originated from.
        """
        super().__init__(value=message, driver=driver)
        self.code = code
class AtmosResponse(XmlResponse):
    """XML response handler that turns error bodies into :class:`AtmosError`."""

    def success(self):
        """
        Tell whether the HTTP status denotes a successful request.

        :return: ``True`` for OK, CREATED, NO_CONTENT and PARTIAL_CONTENT.
        :rtype: ``bool``
        """
        accepted_statuses = (
            httplib.OK,
            httplib.CREATED,
            httplib.NO_CONTENT,
            httplib.PARTIAL_CONTENT,
        )
        return self.status in accepted_statuses

    def parse_error(self):
        """
        Convert the error body into an :class:`AtmosError` and raise it.

        :return: ``None`` when the body could not be parsed into a tree.
        :raises AtmosError: Carrying the ``Code`` and ``Message`` elements of
                            the parsed error document.
        """
        root = self.parse_body()
        # Identity check on purpose: an element without children is falsy.
        if root is None:
            return None

        error_code = int(root.find("Code").text)
        error_message = root.find("Message").text
        raise AtmosError(
            code=error_code,
            message=error_message,
            driver=self.connection.driver,
        )
class AtmosConnection(ConnectionUserAndKey):
    """Connection class that adds the Atmos headers and signs every request."""

    responseCls = AtmosResponse

    def add_default_headers(self, headers):
        """
        Add the headers that every Atmos request carries.

        ``x-emc-uid``, ``Date`` and ``x-emc-date`` are always (re)written;
        ``Content-Type`` and ``Accept`` are only filled in when the caller
        did not provide them.

        :param headers: Request headers; modified in place.
        :type headers: ``dict``

        :return: The same ``headers`` mapping.
        :rtype: ``dict``
        """
        headers["x-emc-uid"] = self.user_id
        headers["Date"] = time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime())
        headers["x-emc-date"] = headers["Date"]
        headers.setdefault("Content-Type", "application/octet-stream")
        headers.setdefault("Accept", "*/*")
        return headers

    def pre_connect_hook(self, params, headers):
        """
        Sign the request and store the result in ``x-emc-signature``.

        :param params: Query parameters of the request.
        :param headers: Request headers; modified in place.
        :type headers: ``dict``

        :return: ``(params, headers)``
        :rtype: ``tuple``
        """
        headers["x-emc-signature"] = self._calculate_signature(params, headers)
        return params, headers

    def _calculate_signature(self, params, headers):
        """
        Compute the base64 encoded HMAC-SHA1 signature of the request.

        The signed text is made of the following newline separated fields:
        HTTP method, ``Content-Type``, ``Range``, ``Date``, the lower-cased
        resource (request path without the driver path prefix, plus the
        encoded query string) and every ``x-emc-*`` header as ``name:value``
        sorted by header name.

        :param params: Query parameters, as a ``dict`` or a sequence of
                       ``(name, value)`` pairs.
        :param headers: Request headers.
        :type headers: ``dict``

        :rtype: ``str``
        """
        resource = urlunquote(self.action)
        driver_path = self.driver.path  # pylint: disable=no-member

        # The driver path prefix is not part of the signed resource.
        if resource.startswith(driver_path):
            resource = resource[len(driver_path):]

        if params:
            if isinstance(params, dict):
                params = list(params.items())
            resource += "?" + urlencode(params)

        resource = resource.lower()

        emc_headers = sorted(
            ((name, value) for name, value in headers.items() if name.startswith("x-emc-")),
            key=lambda item: item[0],
        )

        fields = [
            self.method,
            headers.get("Content-Type", ""),
            headers.get("Range", ""),
            headers.get("Date", ""),
            resource,
        ]
        fields.extend(name + ":" + collapse(value) for name, value in emc_headers)
        string_to_sign = "\n".join(fields)

        # self.key is base64 encoded; the decoded bytes are the HMAC key.
        secret = base64.b64decode(self.key)
        digest = hmac.new(secret, b(string_to_sign), hashlib.sha1).digest()
        return base64.b64encode(digest).decode("utf-8")
class AtmosDriver(StorageDriver):
    """
    Storage driver for the EMC Atmos REST API.

    ``host`` and ``path`` are ``None`` on this class. ``path`` is prepended
    to every request path built below, so both are presumably meant to be
    provided by a subclass (``host`` can also be passed to the constructor).
    """

    connectionCls = AtmosConnection

    host = None  # type: str
    path = None  # type: str
    api_name = "atmos"
    supports_chunked_encoding = True
    website = "http://atmosonline.com/"
    name = "atmos"

    DEFAULT_CDN_TTL = 60 * 60 * 24 * 7  # 1 week

    def __init__(self, key, secret=None, secure=True, host=None, port=None):
        """
        :param key: Account key (sent as the ``x-emc-uid`` header).
        :param secret: Account secret.
        :param secure: Use HTTPS when ``True``.
        :type secure: ``bool``
        :param host: API host; falls back to the class level ``host``.
        :param port: API port.
        """
        host = host or self.host
        super().__init__(key, secret, secure, host, port)

    def iterate_containers(self):
        """
        Yield a :class:`Container` for every directory in the namespace root.

        :rtype: ``generator`` of :class:`Container`
        """
        result = self.connection.request(self._namespace_path(""))
        entries = self._list_objects(result.object, object_type="directory")
        for entry in entries:
            extra = {"object_id": entry["id"]}
            yield Container(entry["name"], extra, self)

    def get_container(self, container_name):
        """
        Return the container with the given name.

        :param container_name: Container name.
        :type container_name: ``str``

        :rtype: :class:`Container`
        :raises ContainerDoesNotExistError: When the API answers with error
                                            code 1003.
        """
        path = self._namespace_path(container_name) + "/?metadata/system"
        try:
            result = self.connection.request(path)
        except AtmosError as e:
            if e.code != 1003:
                raise
            raise ContainerDoesNotExistError(e, self, container_name)
        meta = self._emc_meta(result)
        extra = {"object_id": meta["objectid"]}
        return Container(container_name, extra, self)

    def create_container(self, container_name):
        """
        Create a container and return it.

        :param container_name: Container name.
        :type container_name: ``str``

        :rtype: :class:`Container`
        :raises ContainerAlreadyExistsError: When the API answers with error
                                             code 1016.
        """
        path = self._namespace_path(container_name) + "/"
        try:
            self.connection.request(path, method="POST")
        except AtmosError as e:
            if e.code != 1016:
                raise
            raise ContainerAlreadyExistsError(e, self, container_name)
        return self.get_container(container_name)

    def delete_container(self, container):
        """
        Delete a container.

        :param container: Container instance
        :type container: :class:`Container`

        :return: ``True`` on success.
        :rtype: ``bool``
        :raises ContainerDoesNotExistError: On error code 1003.
        :raises ContainerIsNotEmptyError: On error code 1023.
        :raises AtmosError: On any other API error.
        """
        path = self._namespace_path(container.name) + "/"
        try:
            self.connection.request(path, method="DELETE")
        except AtmosError as e:
            if e.code == 1003:
                raise ContainerDoesNotExistError(e, self, container.name)
            if e.code == 1023:
                raise ContainerIsNotEmptyError(e, self, container.name)
            # Any other failure means the container was not deleted; do not
            # report success.
            raise
        return True

    def get_object(self, container_name, object_name):
        """
        Return an object together with its system and user metadata.

        :param container_name: Container name.
        :type container_name: ``str``
        :param object_name: Object name.
        :type object_name: ``str``

        :rtype: :class:`Object`
        :raises ContainerDoesNotExistError: When the container is missing.
        :raises ObjectDoesNotExistError: When a metadata request answers with
                                         error code 1003.
        """
        container = self.get_container(container_name)
        object_name_cleaned = self._clean_object_name(object_name)
        path = self._namespace_path(container_name) + "/" + object_name_cleaned

        try:
            result = self.connection.request(path + "?metadata/system")
            system_meta = self._emc_meta(result)
            result = self.connection.request(path + "?metadata/user")
            user_meta = self._emc_meta(result)
        except AtmosError as e:
            if e.code != 1003:
                raise
            raise ObjectDoesNotExistError(e, self, object_name)

        # Re-format the ISO style mtime as an HTTP style date.
        last_modified = time.strptime(system_meta["mtime"], "%Y-%m-%dT%H:%M:%SZ")
        last_modified = time.strftime("%a, %d %b %Y %H:%M:%S GMT", last_modified)
        extra = {"object_id": system_meta["objectid"], "last_modified": last_modified}
        # The md5 is kept in the user metadata by the upload methods; expose
        # it as the object hash instead.
        data_hash = user_meta.pop("md5", "")
        return Object(
            object_name,
            int(system_meta["size"]),
            data_hash,
            extra,
            user_meta,
            container,
            self,
        )

    def upload_object(
        self,
        file_path,
        container,
        object_name,
        extra=None,
        verify_hash=True,
        headers=None,
    ):
        """
        Upload a local file.

        The object is probed first: an existing object is replaced with
        ``PUT``, a missing one (error code 1003) is created with ``POST``.
        After the upload the user metadata, including the ``md5`` of the
        data, is stored with a separate request.

        :param file_path: Path of the file to upload.
        :type file_path: ``str``
        :param container: Destination container.
        :type container: :class:`Container`
        :param object_name: Object name.
        :type object_name: ``str``
        :param extra: Optional ``content_type`` and ``meta_data`` entries.
        :type extra: ``dict``
        :param verify_hash: Accepted for interface compatibility; not used by
                            this method.
        :type verify_hash: ``bool``
        :param headers: Additional headers for the upload request.
        :type headers: ``dict``

        :rtype: :class:`Object`
        """
        extra = extra or {}
        object_name_cleaned = self._clean_object_name(object_name)
        request_path = self._namespace_path(container.name) + "/" + object_name_cleaned
        content_type = extra.get("content_type", None)

        method = "PUT"
        try:
            self.connection.request(request_path + "?metadata/system")
        except AtmosError as e:
            if e.code != 1003:
                raise
            method = "POST"

        result_dict = self._upload_object(
            object_name=object_name,
            content_type=content_type,
            request_path=request_path,
            request_method=method,
            # Pass a copy so the caller's mapping is never modified.
            headers=dict(headers or {}),
            file_path=file_path,
        )
        bytes_transferred = result_dict["bytes_transferred"]
        data_hash = result_dict["data_hash"]

        # Work on a copy: the caller's dict must not be touched. The returned
        # metadata never contains "md5"; the stored metadata always carries
        # the computed one.
        meta_data = dict(extra.get("meta_data") or {})
        meta_data.pop("md5", None)
        stored_meta = dict(meta_data)
        stored_meta["md5"] = data_hash

        user_meta = ", ".join(k + "=" + str(v) for k, v in stored_meta.items())
        self.connection.request(
            request_path + "?metadata/user",
            method="POST",
            headers={"x-emc-meta": user_meta},
        )

        result = self.connection.request(request_path + "?metadata/system")
        meta = self._emc_meta(result)
        extra = {
            "object_id": meta["objectid"],
            "meta_data": meta_data,
        }
        return Object(
            object_name,
            bytes_transferred,
            data_hash,
            extra,
            meta_data,
            container,
            self,
        )

    def upload_object_via_stream(self, iterator, container, object_name, extra=None, headers=None):
        """
        Upload an object from an iterator, one chunk per request.

        The first chunk is sent with ``PUT`` when the object already exists
        and with ``POST`` otherwise; every following chunk is appended with a
        ranged ``PUT``. Each request carries the md5 of the data sent so far
        in ``x-emc-meta``.

        :param iterator: Source of the data.
        :param container: Destination container.
        :type container: :class:`Container`
        :param object_name: Object name.
        :type object_name: ``str``
        :param extra: Optional ``content_type`` and ``meta_data`` entries.
        :type extra: ``dict``
        :param headers: Additional headers sent with every chunk.
        :type headers: ``dict``

        :rtype: :class:`Object`
        """
        if isinstance(iterator, file):
            iterator = iter(iterator)

        extra = extra or {}
        extra_headers = headers or {}
        md5 = hashlib.md5()
        generator = read_in_chunks(iterator, CHUNK_SIZE, True)
        bytes_transferred = 0

        try:
            chunk = next(generator)
        except StopIteration:
            # Empty stream: a single request with an empty body is still sent.
            chunk = ""

        path = self._namespace_path(container.name + "/" + object_name)
        content_type = self._determine_content_type(extra.get("content_type", None), object_name)

        method = "PUT"
        try:
            self.connection.request(path + "?metadata/system")
        except AtmosError as e:
            if e.code != 1003:
                raise
            method = "POST"

        while True:
            end = bytes_transferred + len(chunk) - 1
            md5.update(b(chunk))
            request_headers = dict(extra_headers)
            request_headers.update(
                {
                    "x-emc-meta": "md5=" + md5.hexdigest(),
                    "Content-Type": content_type,
                }
            )

            if len(chunk) > 0 and bytes_transferred > 0:
                # Not the first chunk: append to the object created above.
                request_headers["Range"] = "Bytes=%d-%d" % (bytes_transferred, end)
                method = "PUT"

            self.connection.request(path, method=method, data=chunk, headers=request_headers)
            bytes_transferred += len(chunk)

            try:
                chunk = next(generator)
            except StopIteration:
                break
            if len(chunk) == 0:
                break

        data_hash = md5.hexdigest()

        # Copy so that adding "md5" does not modify the caller's dict.
        meta_data = dict(extra.get("meta_data") or {})
        meta_data["md5"] = data_hash
        user_meta = ", ".join(k + "=" + str(v) for k, v in meta_data.items())
        self.connection.request(
            path + "?metadata/user", method="POST", headers={"x-emc-meta": user_meta}
        )

        result = self.connection.request(path + "?metadata/system")
        meta = self._emc_meta(result)
        extra = {
            "object_id": meta["objectid"],
            "meta_data": meta_data,
        }
        return Object(object_name, bytes_transferred, data_hash, extra, meta_data, container, self)

    def download_object(
        self, obj, destination_path, overwrite_existing=False, delete_on_failure=True
    ):
        """
        Download an object to ``destination_path``.

        :param obj: Object instance
        :type obj: :class:`Object`
        :param destination_path: Where to save the data.
        :type destination_path: ``str``
        :param overwrite_existing: Forwarded to the save callback.
        :type overwrite_existing: ``bool``
        :param delete_on_failure: Forwarded to the save callback.
        :type delete_on_failure: ``bool``

        :return: Whatever ``_get_object`` returns for the save callback.
        """
        path = self._namespace_path(obj.container.name + "/" + obj.name)
        response = self.connection.request(path, method="GET", raw=True)

        return self._get_object(
            obj=obj,
            callback=self._save_object,
            response=response,
            callback_kwargs={
                "obj": obj,
                "response": response.response,
                "destination_path": destination_path,
                "overwrite_existing": overwrite_existing,
                "delete_on_failure": delete_on_failure,
            },
            success_status_code=httplib.OK,
        )

    def download_object_as_stream(self, obj, chunk_size=None):
        """
        Return the object data as produced by ``read_in_chunks``.

        :param obj: Object instance
        :type obj: :class:`Object`
        :param chunk_size: Chunk size forwarded to ``read_in_chunks``.
        :type chunk_size: ``int``
        """
        path = self._namespace_path(obj.container.name + "/" + obj.name)
        response = self.connection.request(path, method="GET", raw=True)

        return self._get_object(
            obj=obj,
            callback=read_in_chunks,
            response=response,
            callback_kwargs={"iterator": response.response, "chunk_size": chunk_size},
            success_status_code=httplib.OK,
        )

    def delete_object(self, obj):
        """
        Delete an object.

        :param obj: Object instance
        :type obj: :class:`Object`

        :return: ``True`` on success.
        :rtype: ``bool``
        :raises ObjectDoesNotExistError: On error code 1003.
        """
        path = self._namespace_path(obj.container.name) + "/" + self._clean_object_name(obj.name)
        try:
            self.connection.request(path, method="DELETE")
        except AtmosError as e:
            if e.code != 1003:
                raise
            raise ObjectDoesNotExistError(e, self, obj.name)
        return True

    def enable_object_cdn(self, obj):
        """
        No request is made; this always reports success.

        :rtype: ``bool``
        """
        return True

    def get_object_cdn_url(self, obj, expiry=None, use_object=False):
        """
        Return an object CDN URL.

        :param obj: Object instance
        :type obj: :class:`Object`

        :param expiry: Expiry
        :type expiry: ``str``

        :param use_object: Address the object by its object id instead of
                           its namespace path.
        :type use_object: ``bool``

        :rtype: ``str``
        :raises KeyError: When ``use_object`` is set and the object carries
                          no ``object_id``.
        """
        if use_object:
            # Listings keep the id in meta_data, while get_object() and the
            # upload methods keep it in extra; accept either.
            try:
                object_id = obj.meta_data["object_id"]
            except KeyError:
                object_id = obj.extra["object_id"]
            path = "/rest/objects/" + object_id
        else:
            path = "/rest/namespace/" + obj.container.name + "/" + obj.name

        protocol = "https" if self.secure else "http"

        # Default: now + DEFAULT_CDN_TTL, as seconds since the epoch.
        expiry = str(expiry or int(time.time()) + self.DEFAULT_CDN_TTL)
        params = [
            ("uid", self.key),
            ("expires", expiry),
        ]
        params.append(("signature", self._cdn_signature(path, params, expiry)))

        params = urlencode(params)
        # The signature covers the path without the driver path prefix.
        path = self.path + path
        return urlparse.urlunparse((protocol, self.host, path, "", params, ""))

    def _cdn_signature(self, path, params, expiry):
        """
        Sign a CDN URL with HMAC-SHA1 using the base64 decoded secret.

        :param path: Resource path without the driver path prefix.
        :type path: ``str``
        :param params: Unused; kept for interface compatibility.
        :param expiry: Expiry value included in the signed text.
        :type expiry: ``str``

        :return: Base64 encoded signature.
        :rtype: ``str``
        """
        key = base64.b64decode(self.secret)
        string_to_sign = "\n".join(["GET", path.lower(), self.key, expiry])
        # hmac requires bytes for the message.
        digest = hmac.new(key, b(string_to_sign), hashlib.sha1).digest()
        return base64.b64encode(digest).decode("utf-8")

    def _list_objects(self, tree, object_type=None):
        """
        Extract the entries of a directory listing.

        :param tree: Parsed XML listing.
        :param object_type: When given, only entries whose ``FileType``
                            equals this value are returned.
        :type object_type: ``str``

        :return: Dictionaries with the keys ``id``, ``type`` and ``name``.
        :rtype: ``list`` of ``dict``
        """
        listing = tree.find(self._emc_tag("DirectoryList"))
        entries = []
        for entry in listing.findall(self._emc_tag("DirectoryEntry")):
            file_type = entry.find(self._emc_tag("FileType")).text
            if object_type is not None and object_type != file_type:
                continue
            entries.append(
                {
                    "id": entry.find(self._emc_tag("ObjectID")).text,
                    "type": file_type,
                    "name": entry.find(self._emc_tag("Filename")).text,
                }
            )
        return entries

    def _clean_object_name(self, name):
        """
        URL-quote an object name.

        :raises UnicodeEncodeError: When ``name`` is not pure ASCII.
        :rtype: ``str``
        """
        return urlquote(name.encode("ascii"))

    def _namespace_path(self, path):
        """
        Build the request path of a namespace resource.

        :raises UnicodeEncodeError: When ``path`` is not pure ASCII.
        :rtype: ``str``
        """
        return self.path + "/rest/namespace/" + urlquote(path.encode("ascii"))

    def _object_path(self, object_id):
        """
        Build the request path of an object addressed by its object id.

        :raises UnicodeEncodeError: When ``object_id`` is not pure ASCII.
        :rtype: ``str``
        """
        # Round-trip through ASCII: keeps the validation, but yields text so
        # that the concatenation below is str + str.
        ascii_id = object_id.encode("ascii").decode("ascii")
        return self.path + "/rest/objects/" + ascii_id

    @staticmethod
    def _emc_tag(tag):
        """Qualify ``tag`` with the EMC XML namespace."""
        return "{http://www.emc.com/cos/}" + tag

    def _emc_meta(self, response):
        """
        Parse the ``x-emc-meta`` response header into a dictionary.

        The header is read as ``name=value`` pairs separated by ``", "``.

        :return: Parsed pairs; empty when the header is missing or empty.
        :rtype: ``dict``
        """
        meta = response.headers.get("x-emc-meta", "")
        if not meta:
            return {}
        return dict(pair.split("=", 1) for pair in meta.split(", "))

    def _entries_to_objects(self, container, entries):
        """
        Yield an :class:`Object` for every listing entry.

        Size and hash are not part of the entries, so they are set to ``0``
        and ``""``; the object id is placed in the object's ``meta_data``.
        """
        for entry in entries:
            metadata = {"object_id": entry["id"]}
            yield Object(entry["name"], 0, "", {}, metadata, container, self)

    def iterate_container_objects(self, container, prefix=None, ex_prefix=None):
        """
        Return a generator of objects for the given container.

        :param container: Container instance
        :type container: :class:`Container`

        :param prefix: Filter objects starting with a prefix.
                       Filtering is performed client-side.
        :type  prefix: ``str``

        :param ex_prefix: (Deprecated.) Filter objects starting with a prefix.
                          Filtering is performed client-side.
        :type  ex_prefix: ``str``

        :return: A generator of Object instances.
        :rtype: ``generator`` of :class:`Object`
        """
        prefix = self._normalize_prefix_argument(prefix, ex_prefix)

        headers = {"x-emc-include-meta": "1"}
        path = self._namespace_path(container.name) + "/"
        result = self.connection.request(path, headers=headers)
        entries = self._list_objects(result.object, object_type="regular")
        objects = self._entries_to_objects(container, entries)
        return self._filter_listed_container_objects(objects, prefix)