| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import hmac |
| import hashlib |
| from datetime import datetime |
| |
| from libcloud.utils.py3 import urlquote |
| |
| __all__ = [ |
| "OSCRequestSignerAlgorithmV4", |
| ] |
| |
| |
| class OSCRequestSigner: |
| """ |
| Class which handles signing the outgoing AWS requests. |
| """ |
| |
| def __init__(self, access_key: str, access_secret: str, version: str, connection): |
| """ |
| :param access_key: Access key. |
| :type access_key: ``str`` |
| |
| :param access_secret: Access secret. |
| :type access_secret: ``str`` |
| |
| :param version: API version. |
| :type version: ``str`` |
| |
| :param connection: Connection instance. |
| :type connection: :class:`Connection` |
| """ |
| self.access_key = access_key |
| self.access_secret = access_secret |
| self.version = version |
| self.connection = connection |
| |
| |
| class OSCRequestSignerAlgorithmV4(OSCRequestSigner): |
| @staticmethod |
| def sign(key, msg): |
| return hmac.new(key, msg.encode("utf-8"), hashlib.sha256).digest() |
| |
| @staticmethod |
| def _get_signed_headers(headers: dict): |
| return ";".join([k.lower() for k in sorted(headers.keys())]) |
| |
| @staticmethod |
| def _get_canonical_headers(headers: dict): |
| return ( |
| "\n".join([":".join([k.lower(), str(v).strip()]) for k, v in sorted(headers.items())]) |
| + "\n" |
| ) |
| |
| @staticmethod |
| def _get_request_params(params: dict): |
| return "&".join( |
| [ |
| "{}={}".format(urlquote(k, safe=""), urlquote(str(v), safe="~")) |
| for k, v in sorted(params.items()) |
| ] |
| ) |
| |
| def get_request_headers(self, service_name: str, region: str, action: str, data: str): |
| date = datetime.utcnow() |
| host = "{}.{}.outscale.com".format(service_name, region) |
| headers = { |
| "Content-Type": "application/json; charset=utf-8", |
| "X-Osc-Date": date.strftime("%Y%m%dT%H%M%SZ"), |
| "Host": host, |
| } |
| path = "/{}/{}/{}".format(self.connection.service_name, self.version, action) |
| sig = self._get_authorization_v4_header( |
| headers=headers, dt=date, method="POST", path=path, data=data |
| ) |
| headers.update({"Authorization": sig}) |
| return headers |
| |
| def _get_authorization_v4_header( |
| self, |
| headers: dict, |
| data: str, |
| dt: datetime, |
| method: str = "GET", |
| path: str = "/", |
| ): |
| credentials_scope = self._get_credential_scope(dt=dt) |
| signed_headers = self._get_signed_headers(headers=headers) |
| signature = self._get_signature(headers=headers, dt=dt, method=method, path=path, data=data) |
| return ( |
| "OSC4-HMAC-SHA256 Credential=%(u)s/%(c)s, " |
| "SignedHeaders=%(sh)s, Signature=%(s)s" |
| % { |
| "u": self.access_key, |
| "c": credentials_scope, |
| "sh": signed_headers, |
| "s": signature, |
| } |
| ) |
| |
| def _get_signature(self, headers: dict, dt: datetime, method: str, path: str, data: str): |
| string_to_sign = self._get_string_to_sign( |
| headers=headers, dt=dt, method=method, path=path, data=data |
| ) |
| signing_key = self._get_key_to_sign_with(self.access_secret, dt.strftime("%Y%m%d")) |
| return hmac.new(signing_key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest() |
| |
| def _get_key_to_sign_with(self, key: str, dt: str): |
| k_date = self.sign(("OSC4" + key).encode("utf-8"), dt) |
| k_region = self.sign(k_date, self.connection.region_name) |
| k_service = self.sign(k_region, self.connection.service_name) |
| return self.sign(k_service, "osc4_request") |
| |
| def _get_string_to_sign(self, headers: dict, dt: datetime, method: str, path: str, data: str): |
| canonical_request = self._get_canonical_request( |
| headers=headers, method=method, path=path, data=data |
| ) |
| return ( |
| "OSC4-HMAC-SHA256" |
| + "\n" |
| + dt.strftime("%Y%m%dT%H%M%SZ") |
| + "\n" |
| + self._get_credential_scope(dt) |
| + "\n" |
| + hashlib.sha256(canonical_request.encode("utf-8")).hexdigest() |
| ) |
| |
| def _get_credential_scope(self, dt): |
| return "/".join( |
| [ |
| dt.strftime("%Y%m%d"), |
| self.connection.region_name, |
| self.connection.service_name, |
| "osc4_request", |
| ] |
| ) |
| |
| def _get_canonical_request(self, headers, method, path, data): |
| return "\n".join( |
| [ |
| method, |
| path, |
| self._get_request_params({}), |
| self._get_canonical_headers(headers), |
| self._get_signed_headers(headers), |
| hashlib.sha256(data.encode("utf-8")).hexdigest(), |
| ] |
| ) |