# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
| |
| import hmac |
| import time |
| import base64 |
| import hashlib |
| from io import FileIO as file |
| |
| from libcloud.utils.py3 import b, next, httplib, urlparse, urlquote, urlencode, urlunquote |
| from libcloud.common.base import XmlResponse, ConnectionUserAndKey |
| from libcloud.utils.files import read_in_chunks |
| from libcloud.common.types import LibcloudError |
| from libcloud.storage.base import CHUNK_SIZE, Object, Container, StorageDriver |
| from libcloud.storage.types import ( |
| ObjectDoesNotExistError, |
| ContainerIsNotEmptyError, |
| ContainerDoesNotExistError, |
| ContainerAlreadyExistsError, |
| ) |
| |
| |
def collapse(s):
    """
    Squeeze every run of consecutive spaces in ``s`` down to one space.

    Leading and trailing spaces disappear as well. Only the space character
    is treated as a separator; tabs and newlines are left untouched.

    :param s: Text to normalise.
    :type s: ``str``

    :return: The non-empty, space-separated pieces of ``s`` joined by
             single spaces.
    :rtype: ``str``
    """
    # Splitting on a literal space yields empty strings for repeated,
    # leading and trailing spaces; filter(None, ...) drops those.
    pieces = filter(None, s.split(" "))
    return " ".join(pieces)
| |
| |
class AtmosError(LibcloudError):
    """
    Error reported by the Atmos service.

    Keeps the Atmos error code in ``code`` in addition to the message and
    driver handed on to :class:`LibcloudError`.
    """

    def __init__(self, code, message, driver=None):
        """
        :param code: Atmos error code.

        :param message: Error message, forwarded to the base class as
                        ``value``.

        :param driver: Driver the error originated from (optional).
        """
        super(AtmosError, self).__init__(value=message, driver=driver)
        self.code = code
| |
| |
class AtmosResponse(XmlResponse):
    """
    XML response class used for Atmos API calls.
    """

    def success(self):
        """
        Tell whether the HTTP status denotes a successful request.

        :return: ``True`` when the status is OK, CREATED, NO_CONTENT or
                 PARTIAL_CONTENT.
        :rtype: ``bool``
        """
        accepted = (
            httplib.OK,
            httplib.CREATED,
            httplib.NO_CONTENT,
            httplib.PARTIAL_CONTENT,
        )
        return self.status in accepted

    def parse_error(self):
        """
        Turn an error body into an :class:`AtmosError`.

        :return: ``None`` when the body could not be parsed into a tree.

        :raises AtmosError: With the integer value of the ``Code`` element
                            and the text of the ``Message`` element.
        """
        body = self.parse_body()
        if body is None:
            return None

        error_code = int(body.find("Code").text)
        error_message = body.find("Message").text
        raise AtmosError(
            code=error_code,
            message=error_message,
            driver=self.connection.driver,
        )
| |
| |
class AtmosConnection(ConnectionUserAndKey):
    """
    Connection class which adds the Atmos specific headers and signs every
    request with an ``x-emc-signature`` header.
    """

    responseCls = AtmosResponse

    def add_default_headers(self, headers):
        """
        Add the user id, date and default content negotiation headers.

        :param headers: Request headers; modified in place.
        :type headers: ``dict``

        :return: The same ``headers`` mapping.
        :rtype: ``dict``
        """
        # Imported locally so this block does not depend on a new
        # file-level import.
        from email.utils import formatdate

        headers["x-emc-uid"] = self.user_id
        # time.strftime("%a ... %b ...") follows the process locale, which
        # yields a non-English (and therefore invalid) HTTP date when
        # LC_TIME is not "C". formatdate() always emits English names in
        # the same "Mon, 01 Jan 2024 00:00:00 GMT" layout.
        headers["Date"] = formatdate(usegmt=True)
        headers["x-emc-date"] = headers["Date"]

        # Only fill these in when the caller did not supply them.
        headers.setdefault("Content-Type", "application/octet-stream")
        headers.setdefault("Accept", "*/*")

        return headers

    def pre_connect_hook(self, params, headers):
        """
        Attach the request signature right before the request is sent.

        :param params: Query parameters of the request.

        :param headers: Request headers; ``x-emc-signature`` is added in
                        place.
        :type headers: ``dict``

        :return: ``(params, headers)``
        :rtype: ``tuple``
        """
        headers["x-emc-signature"] = self._calculate_signature(params, headers)

        return params, headers

    def _calculate_signature(self, params, headers):
        """
        Compute the base64 encoded HMAC-SHA1 signature for the current
        request.

        The signed string consists of the HTTP method, the ``Content-Type``,
        ``Range`` and ``Date`` headers, the lower-cased request path
        (relative to the driver path, including the query string) and all
        ``x-emc-*`` headers sorted by name, joined by newlines.

        :param params: Query parameters as a ``dict`` or a sequence of
                       pairs.

        :param headers: Request headers.
        :type headers: ``dict``

        :return: Base64 encoded signature.
        :rtype: ``str``
        """
        pathstring = urlunquote(self.action)
        driver_path = self.driver.path  # pylint: disable=no-member
        # The signature covers the path relative to the driver prefix.
        if pathstring.startswith(driver_path):
            pathstring = pathstring[len(driver_path):]
        if params:
            if isinstance(params, dict):
                params = list(params.items())
            pathstring += "?" + urlencode(params)
        pathstring = pathstring.lower()

        xhdrs = sorted(
            ((k, v) for k, v in headers.items() if k.startswith("x-emc-")),
            key=lambda item: item[0],
        )

        parts = [
            self.method,
            headers.get("Content-Type", ""),
            headers.get("Range", ""),
            headers.get("Date", ""),
            pathstring,
        ]
        # Header values are normalised so that runs of spaces count once.
        parts.extend(k + ":" + collapse(v) for k, v in xhdrs)
        string_to_sign = "\n".join(parts)

        # The shared secret is stored base64 encoded.
        key = base64.b64decode(self.key)
        digest = hmac.new(b(key), b(string_to_sign), hashlib.sha1).digest()
        return base64.b64encode(b(digest)).decode("utf-8")
| |
| |
class AtmosDriver(StorageDriver):
    """
    Storage driver for the EMC Atmos REST API.

    Containers are addressed as directories and objects as files underneath
    ``<path>/rest/namespace/``. Atmos error codes are translated into the
    generic storage exceptions where this class knows a mapping
    (1003: does not exist, 1016: already exists, 1023: not empty).
    """

    connectionCls = AtmosConnection

    # NOTE(review): both default to None here, yet the path helpers
    # concatenate ``self.path`` with strings - presumably a subclass is
    # expected to set them; confirm.
    host = None  # type: str
    path = None  # type: str
    api_name = "atmos"
    supports_chunked_encoding = True
    website = "http://atmosonline.com/"
    name = "atmos"

    DEFAULT_CDN_TTL = 60 * 60 * 24 * 7  # 1 week

    def __init__(self, key, secret=None, secure=True, host=None, port=None):
        """
        :param key: API key / user id.

        :param secret: Base64 encoded shared secret.

        :param secure: Use HTTPS when ``True``.
        :type secure: ``bool``

        :param host: Host name; falls back to the class level ``host``.

        :param port: Port to connect to.
        """
        host = host or self.host
        super().__init__(key, secret, secure, host, port)

    def iterate_containers(self):
        """
        Yield a :class:`Container` for every directory in the namespace
        root.

        :rtype: ``generator`` of :class:`Container`
        """
        result = self.connection.request(self._namespace_path(""))
        entries = self._list_objects(result.object, object_type="directory")
        for entry in entries:
            extra = {"object_id": entry["id"]}
            yield Container(entry["name"], extra, self)

    def get_container(self, container_name):
        """
        Look up a container by requesting its system metadata.

        :param container_name: Container name.
        :type container_name: ``str``

        :raises ContainerDoesNotExistError: On Atmos error code 1003.

        :rtype: :class:`Container`
        """
        path = self._namespace_path(container_name) + "/?metadata/system"
        try:
            result = self.connection.request(path)
        except AtmosError as e:
            if e.code != 1003:
                raise
            raise ContainerDoesNotExistError(e, self, container_name)
        meta = self._emc_meta(result)
        extra = {"object_id": meta["objectid"]}
        return Container(container_name, extra, self)

    def create_container(self, container_name):
        """
        Create a container and return it.

        :param container_name: Container name.
        :type container_name: ``str``

        :raises ContainerAlreadyExistsError: On Atmos error code 1016.

        :rtype: :class:`Container`
        """
        path = self._namespace_path(container_name) + "/"
        try:
            self.connection.request(path, method="POST")
        except AtmosError as e:
            if e.code != 1016:
                raise
            raise ContainerAlreadyExistsError(e, self, container_name)
        return self.get_container(container_name)

    def delete_container(self, container):
        """
        Delete a container.

        :param container: Container to delete.
        :type container: :class:`Container`

        :raises ContainerDoesNotExistError: On Atmos error code 1003.
        :raises ContainerIsNotEmptyError: On Atmos error code 1023.
        :raises AtmosError: For every other Atmos error.

        :return: ``True`` on success.
        :rtype: ``bool``
        """
        try:
            self.connection.request(self._namespace_path(container.name) + "/", method="DELETE")
        except AtmosError as e:
            if e.code == 1003:
                raise ContainerDoesNotExistError(e, self, container.name)
            if e.code == 1023:
                raise ContainerIsNotEmptyError(e, self, container.name)
            # Any other failure must surface; reporting success here would
            # claim a deletion that did not happen.
            raise
        return True

    def get_object(self, container_name, object_name):
        """
        Fetch an object's system and user metadata and build an
        :class:`Object` from them.

        :param container_name: Container name.
        :type container_name: ``str``

        :param object_name: Object name.
        :type object_name: ``str``

        :raises ContainerDoesNotExistError: When the container lookup fails
                                            with code 1003.
        :raises ObjectDoesNotExistError: On Atmos error code 1003 for the
                                         object itself.

        :rtype: :class:`Object`
        """
        container = self.get_container(container_name)
        object_name_cleaned = self._clean_object_name(object_name)
        path = self._namespace_path(container_name) + "/" + object_name_cleaned

        try:
            result = self.connection.request(path + "?metadata/system")
            system_meta = self._emc_meta(result)

            result = self.connection.request(path + "?metadata/user")
            user_meta = self._emc_meta(result)
        except AtmosError as e:
            if e.code != 1003:
                raise
            raise ObjectDoesNotExistError(e, self, object_name)

        # Re-format the ISO style mtime as an HTTP style date string.
        last_modified = time.strptime(system_meta["mtime"], "%Y-%m-%dT%H:%M:%SZ")
        last_modified = time.strftime("%a, %d %b %Y %H:%M:%S GMT", last_modified)
        extra = {"object_id": system_meta["objectid"], "last_modified": last_modified}
        # The checksum travels as the "md5" user metadata entry; it is
        # reported as the object hash rather than as regular metadata.
        data_hash = user_meta.pop("md5", "")
        return Object(
            object_name,
            int(system_meta["size"]),
            data_hash,
            extra,
            user_meta,
            container,
            self,
        )

    def upload_object(
        self,
        file_path,
        container,
        object_name,
        extra=None,
        verify_hash=True,
        headers=None,
    ):
        """
        Upload a local file.

        An existing object is overwritten with ``PUT``; when the metadata
        probe fails with code 1003 the object is created with ``POST``.

        :param file_path: Path of the file to upload.
        :type file_path: ``str``

        :param container: Destination container.
        :type container: :class:`Container`

        :param object_name: Object name.
        :type object_name: ``str``

        :param extra: Optional ``content_type`` and ``meta_data`` entries.
                      The dictionary is not modified.
        :type extra: ``dict``

        :param verify_hash: Accepted for interface compatibility; not used
                            by this driver.
        :type verify_hash: ``bool``

        :param headers: Additional headers sent with the upload request.
        :type headers: ``dict``

        :rtype: :class:`Object`
        """
        method = "PUT"

        extra = extra or {}
        object_name_cleaned = self._clean_object_name(object_name)
        request_path = self._namespace_path(container.name) + "/" + object_name_cleaned
        content_type = extra.get("content_type", None)

        try:
            self.connection.request(request_path + "?metadata/system")
        except AtmosError as e:
            if e.code != 1003:
                raise
            method = "POST"

        result_dict = self._upload_object(
            object_name=object_name,
            content_type=content_type,
            request_path=request_path,
            request_method=method,
            # Forward caller supplied headers (previously dropped), as the
            # streaming upload does. A copy keeps the caller's dict intact.
            headers=dict(headers or {}),
            file_path=file_path,
        )

        bytes_transferred = result_dict["bytes_transferred"]
        data_hash = result_dict["data_hash"]

        # Work on a copy so the caller's dictionary never gains or loses an
        # "md5" entry, not even when one of the requests below fails.
        meta_data = dict(extra.get("meta_data") or {})
        self._post_user_meta(request_path, meta_data, data_hash)
        result = self.connection.request(request_path + "?metadata/system")
        meta = self._emc_meta(result)
        # The checksum is exposed through Object.hash only.
        meta_data.pop("md5", None)
        extra = {
            "object_id": meta["objectid"],
            "meta_data": meta_data,
        }

        return Object(
            object_name,
            bytes_transferred,
            data_hash,
            extra,
            meta_data,
            container,
            self,
        )

    def upload_object_via_stream(self, iterator, container, object_name, extra=None, headers=None):
        """
        Upload an object from an iterator, one chunk per request.

        The first chunk is sent with ``PUT`` (object exists) or ``POST``
        (metadata probe failed with code 1003); every following chunk is
        sent with ``PUT`` and a ``Range`` header.

        :param iterator: Data source; a ``FileIO`` instance is wrapped with
                         ``iter()``.

        :param container: Destination container.
        :type container: :class:`Container`

        :param object_name: Object name.
        :type object_name: ``str``

        :param extra: Optional ``content_type`` and ``meta_data`` entries.
                      The dictionary is not modified.
        :type extra: ``dict``

        :param headers: Additional headers sent with every chunk request.
        :type headers: ``dict``

        :rtype: :class:`Object`
        """
        if isinstance(iterator, file):
            iterator = iter(iterator)

        extra_headers = headers or {}
        hasher = hashlib.md5()
        generator = read_in_chunks(iterator, CHUNK_SIZE, True)
        bytes_transferred = 0
        try:
            chunk = next(generator)
        except StopIteration:
            # Empty stream: still issue one request with an empty body.
            chunk = ""

        path = self._namespace_path(container.name + "/" + object_name)
        method = "PUT"

        if extra is not None:
            content_type = extra.get("content_type", None)
        else:
            content_type = None

        content_type = self._determine_content_type(content_type, object_name)

        try:
            self.connection.request(path + "?metadata/system")
        except AtmosError as e:
            if e.code != 1003:
                raise
            method = "POST"

        while True:
            end = bytes_transferred + len(chunk) - 1
            hasher.update(b(chunk))
            request_headers = dict(extra_headers)

            request_headers.update(
                {
                    # Running checksum of everything sent so far.
                    "x-emc-meta": "md5=" + hasher.hexdigest(),
                    "Content-Type": content_type,
                }
            )

            if len(chunk) > 0 and bytes_transferred > 0:
                # Follow-up chunks are appended to the object created by
                # the first request.
                request_headers["Range"] = "Bytes=%d-%d" % (bytes_transferred, end)
                method = "PUT"

            self.connection.request(path, method=method, data=chunk, headers=request_headers)
            bytes_transferred += len(chunk)

            try:
                chunk = next(generator)
            except StopIteration:
                break
            if len(chunk) == 0:
                break

        data_hash = hasher.hexdigest()

        # Copy so that the caller's dictionary is left untouched.
        if extra is None:
            meta_data = {}
        else:
            meta_data = dict(extra.get("meta_data") or {})
        # As before, the returned metadata of a streamed upload carries the
        # checksum under "md5".
        meta_data["md5"] = data_hash
        self._post_user_meta(path, meta_data, data_hash)

        result = self.connection.request(path + "?metadata/system")

        meta = self._emc_meta(result)
        extra = {
            "object_id": meta["objectid"],
            "meta_data": meta_data,
        }

        return Object(object_name, bytes_transferred, data_hash, extra, meta_data, container, self)

    def _post_user_meta(self, path, meta_data, data_hash):
        """
        Store ``meta_data`` plus an ``md5`` entry as user metadata of the
        object at ``path``. ``meta_data`` itself is not modified.
        """
        tagged = dict(meta_data)
        tagged["md5"] = data_hash
        user_meta = ", ".join(k + "=" + str(v) for k, v in tagged.items())
        self.connection.request(
            path + "?metadata/user",
            method="POST",
            headers={"x-emc-meta": user_meta},
        )

    def download_object(
        self, obj, destination_path, overwrite_existing=False, delete_on_failure=True
    ):
        """
        Download an object to a local path.

        :param obj: Object to download.
        :type obj: :class:`Object`

        :param destination_path: Where to store the data.
        :type destination_path: ``str``

        :param overwrite_existing: Passed on to the save callback.
        :type overwrite_existing: ``bool``

        :param delete_on_failure: Passed on to the save callback.
        :type delete_on_failure: ``bool``

        :return: Whatever ``_get_object`` returns for the save callback.
        """
        path = self._namespace_path(obj.container.name + "/" + obj.name)
        response = self.connection.request(path, method="GET", raw=True)

        return self._get_object(
            obj=obj,
            callback=self._save_object,
            response=response,
            callback_kwargs={
                "obj": obj,
                "response": response.response,
                "destination_path": destination_path,
                "overwrite_existing": overwrite_existing,
                "delete_on_failure": delete_on_failure,
            },
            success_status_code=httplib.OK,
        )

    def download_object_as_stream(self, obj, chunk_size=None):
        """
        Download an object and hand its body to ``read_in_chunks``.

        :param obj: Object to download.
        :type obj: :class:`Object`

        :param chunk_size: Chunk size passed to ``read_in_chunks``.
        :type chunk_size: ``int``

        :return: Whatever ``_get_object`` returns for the chunk reader.
        """
        path = self._namespace_path(obj.container.name + "/" + obj.name)
        response = self.connection.request(path, method="GET", raw=True)

        return self._get_object(
            obj=obj,
            callback=read_in_chunks,
            response=response,
            callback_kwargs={"iterator": response.response, "chunk_size": chunk_size},
            success_status_code=httplib.OK,
        )

    def delete_object(self, obj):
        """
        Delete an object.

        :param obj: Object to delete.
        :type obj: :class:`Object`

        :raises ObjectDoesNotExistError: On Atmos error code 1003.

        :return: ``True`` on success.
        :rtype: ``bool``
        """
        path = self._namespace_path(obj.container.name) + "/" + self._clean_object_name(obj.name)
        try:
            self.connection.request(path, method="DELETE")
        except AtmosError as e:
            if e.code != 1003:
                raise
            raise ObjectDoesNotExistError(e, self, obj.name)
        return True

    def enable_object_cdn(self, obj):
        """
        Always report success; no request is made.

        :rtype: ``bool``
        """
        return True

    def get_object_cdn_url(self, obj, expiry=None, use_object=False):
        """
        Return an object CDN URL.

        :param obj: Object instance
        :type obj: :class:`Object`

        :param expiry: Expiry; defaults to now plus ``DEFAULT_CDN_TTL``
                       seconds.
        :type expiry: ``str``

        :param use_object: Address the object by its object id instead of
                           its namespace path.
        :type use_object: ``bool``

        :raises KeyError: When ``use_object`` is set and the object carries
                          no ``object_id``.

        :rtype: ``str``
        """
        if use_object:
            # Listings keep the id in meta_data, while get_object() and the
            # upload methods keep it in extra - accept either.
            object_id = obj.meta_data.get("object_id")
            if object_id is None:
                object_id = obj.extra["object_id"]
            # The separator was missing, producing "/rest/objects<id>".
            path = "/rest/objects/" + object_id
        else:
            path = "/rest/namespace/" + obj.container.name + "/" + obj.name

        if self.secure:
            protocol = "https"
        else:
            protocol = "http"

        expiry = str(expiry or int(time.time()) + self.DEFAULT_CDN_TTL)
        params = [
            ("uid", self.key),
            ("expires", expiry),
        ]
        params.append(("signature", self._cdn_signature(path, params, expiry)))

        params = urlencode(params)
        path = self.path + path
        return urlparse.urlunparse((protocol, self.host, path, "", params, ""))

    def _cdn_signature(self, path, params, expiry):
        """
        Sign a CDN URL with HMAC-SHA1.

        :param path: URL path relative to the driver path.
        :type path: ``str``

        :param params: Query parameters; currently not part of the signed
                       string.

        :param expiry: Expiry value put into the URL.
        :type expiry: ``str``

        :return: Base64 encoded signature.
        :rtype: ``str``
        """
        key = base64.b64decode(self.secret)
        string_to_sign = "\n".join(["GET", path.lower(), self.key, expiry])
        # hmac requires bytes for the message; passing the str raised
        # TypeError on Python 3.
        digest = hmac.new(key, b(string_to_sign), hashlib.sha1).digest()

        return base64.b64encode(digest).decode("utf-8")

    def _list_objects(self, tree, object_type=None):
        """
        Extract the directory entries from a listing response.

        :param tree: Parsed XML listing.

        :param object_type: When given, only entries whose ``FileType``
                            equals this value are returned.
        :type object_type: ``str``

        :return: Dictionaries with ``id``, ``type`` and ``name`` keys.
        :rtype: ``list`` of ``dict``
        """
        listing = tree.find(self._emc_tag("DirectoryList"))
        entries = []
        for entry in listing.findall(self._emc_tag("DirectoryEntry")):
            file_type = entry.find(self._emc_tag("FileType")).text
            if object_type is not None and object_type != file_type:
                continue
            entries.append(
                {
                    "id": entry.find(self._emc_tag("ObjectID")).text,
                    "type": file_type,
                    "name": entry.find(self._emc_tag("Filename")).text,
                }
            )
        return entries

    def _clean_object_name(self, name):
        """
        URL-quote an object name; the name has to be ASCII encodable.
        """
        return urlquote(name.encode("ascii"))

    def _namespace_path(self, path):
        """
        Build the request path of a namespace entry; ``path`` has to be
        ASCII encodable and is URL-quoted.
        """
        return self.path + "/rest/namespace/" + urlquote(path.encode("ascii"))

    def _object_path(self, object_id):
        """
        Build the request path of an object addressed by its id.

        :raises UnicodeEncodeError: When ``object_id`` is not ASCII.
        """
        # Keep the ASCII check, but concatenate text with text: adding the
        # encoded bytes to a str raised TypeError on Python 3.
        return self.path + "/rest/objects/" + object_id.encode("ascii").decode("ascii")

    @staticmethod
    def _emc_tag(tag):
        """
        Qualify ``tag`` with the EMC XML namespace.
        """
        return "{http://www.emc.com/cos/}" + tag

    def _emc_meta(self, response):
        """
        Parse the ``x-emc-meta`` response header (``k1=v1, k2=v2``) into a
        dictionary; an absent or empty header gives an empty dictionary.

        :rtype: ``dict``
        """
        meta = response.headers.get("x-emc-meta", "")
        if len(meta) == 0:
            return {}
        # Split each pair on the first "=" only, values may contain "=".
        return dict(pair.split("=", 1) for pair in meta.split(", "))

    def _entries_to_objects(self, container, entries):
        """
        Yield an :class:`Object` for every listing entry. Size and hash are
        not part of the listing, so they are set to ``0`` and ``""``.
        """
        for entry in entries:
            metadata = {"object_id": entry["id"]}
            yield Object(entry["name"], 0, "", {}, metadata, container, self)

    def iterate_container_objects(self, container, prefix=None, ex_prefix=None):
        """
        Return a generator of objects for the given container.

        :param container: Container instance
        :type container: :class:`Container`

        :param prefix: Filter objects starting with a prefix.
                       Filtering is performed client-side.
        :type prefix: ``str``

        :param ex_prefix: (Deprecated.) Filter objects starting with a prefix.
                          Filtering is performed client-side.
        :type ex_prefix: ``str``

        :return: A generator of Object instances.
        :rtype: ``generator`` of :class:`Object`
        """
        prefix = self._normalize_prefix_argument(prefix, ex_prefix)

        headers = {"x-emc-include-meta": "1"}
        path = self._namespace_path(container.name) + "/"
        result = self.connection.request(path, headers=headers)
        entries = self._list_objects(result.object, object_type="regular")
        objects = self._entries_to_objects(container, entries)
        return self._filter_listed_container_objects(objects, prefix)