| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import base64 |
| import hashlib |
| import hmac |
| import uuid |
| from abc import ABC, abstractmethod |
| from collections import OrderedDict |
| from datetime import datetime |
| from typing import Dict, Optional |
| from urllib.parse import unquote |
| |
| from pypaimon.api.token_loader import DLFToken |
| from pypaimon.api.typedef import RESTAuthParameter |
| |
| |
class DLFRequestSigner(ABC):
    """
    Contract that every DLF request signer has to fulfil.

    Each concrete signer implements one signature scheme
    (e.g., DLF4-HMAC-SHA256, ROA v2 HMAC-SHA1).
    """

    @abstractmethod
    def sign_headers(
        self,
        body: Optional[str],
        now: datetime,
        security_token: Optional[str],
        host: str,
    ) -> Dict[str, str]:
        """
        Produce the signature-related headers for one request.

        Args:
            body: request body (None for bodiless requests such as GET)
            now: timestamp the request is stamped with
            security_token: security token, or None when there is none
            host: host the request is sent to

        Returns:
            mapping of header name to header value
        """

    @abstractmethod
    def authorization(
        self,
        rest_auth_parameter: "RESTAuthParameter",
        token: "DLFToken",
        host: str,
        sign_headers: Dict[str, str],
    ) -> str:
        """
        Compute the value of the Authorization header.

        Args:
            rest_auth_parameter: request description (method, path, query, body)
            token: DLF token (access key id, secret, security token)
            host: host the request is sent to
            sign_headers: the headers previously returned by sign_headers()

        Returns:
            the Authorization header value
        """

    @abstractmethod
    def identifier(self) -> str:
        """
        Name under which this signer is known (e.g., "default", "openapi").

        Returns:
            the signer identifier
        """
| |
| |
class DLFDefaultSigner(DLFRequestSigner):
    """
    Default DLF signer using the DLF4-HMAC-SHA256 algorithm.
    Used for VPC endpoints (e.g., cn-hangzhou-vpc.dlf.aliyuncs.com).

    The Authorization value is derived from a canonical request (method,
    path, sorted query string, sorted signed headers, payload marker) whose
    SHA-256 digest is signed with a key chained from the access key secret
    through date, region, product and request type.
    """

    IDENTIFIER = "default"
    VERSION = "v1"
    SIGNATURE_ALGORITHM = "DLF4-HMAC-SHA256"
    PRODUCT = "DlfNext"
    REQUEST_TYPE = "aliyun_v4_request"
    SIGNATURE_KEY = "Signature"
    NEW_LINE = "\n"

    # Header keys
    DLF_CONTENT_MD5_HEADER_KEY = "Content-MD5"
    DLF_CONTENT_TYPE_KEY = "Content-Type"
    DLF_DATE_HEADER_KEY = "x-dlf-date"
    DLF_SECURITY_TOKEN_HEADER_KEY = "x-dlf-security-token"
    DLF_AUTH_VERSION_HEADER_KEY = "x-dlf-version"
    # NOTE: "SHA56" in the two names below is a misspelling of "SHA256"; the
    # names are public class attributes, so they are kept unchanged.
    DLF_CONTENT_SHA56_HEADER_KEY = "x-dlf-content-sha256"
    DLF_CONTENT_SHA56_VALUE = "UNSIGNED-PAYLOAD"

    AUTH_DATE_TIME_FORMAT = "%Y%m%dT%H%M%SZ"
    MEDIA_TYPE = "application/json"

    # Lower-cased names of the headers that take part in the signature.
    SIGNED_HEADERS = [
        DLF_CONTENT_MD5_HEADER_KEY.lower(),
        DLF_CONTENT_TYPE_KEY.lower(),
        DLF_CONTENT_SHA56_HEADER_KEY.lower(),
        DLF_DATE_HEADER_KEY.lower(),
        DLF_AUTH_VERSION_HEADER_KEY.lower(),
        DLF_SECURITY_TOKEN_HEADER_KEY.lower(),
    ]

    def __init__(self, region: str):
        """
        Args:
            region: region name used in the credential scope and signing key
        """
        self.region = region

    def sign_headers(
        self,
        body: Optional[str],
        now: datetime,
        security_token: Optional[str],
        host: str
    ) -> Dict[str, str]:
        """
        Generate the headers that take part in the DLF4 signature.

        Args:
            body: request body; Content-Type and Content-MD5 are only added
                when it is neither None nor empty
            now: request timestamp. A timezone-aware value is converted to
                UTC first; a naive value is formatted as-is
            security_token: security token (header omitted when None)
            host: request host (not used by this signer)

        Returns:
            map of signature-related headers

        Raises:
            RuntimeError: if the body MD5 cannot be calculated
        """
        # The format ends in a literal "Z", so the wall clock must be UTC.
        date_time = self._to_utc(now).strftime(self.AUTH_DATE_TIME_FORMAT)

        sign_headers = {
            self.DLF_DATE_HEADER_KEY: date_time,
            self.DLF_CONTENT_SHA56_HEADER_KEY: self.DLF_CONTENT_SHA56_VALUE,
            self.DLF_AUTH_VERSION_HEADER_KEY: self.VERSION,
        }

        if body is not None and body != "":
            sign_headers[self.DLF_CONTENT_TYPE_KEY] = self.MEDIA_TYPE
            sign_headers[self.DLF_CONTENT_MD5_HEADER_KEY] = self._md5(body)

        if security_token is not None:
            sign_headers[self.DLF_SECURITY_TOKEN_HEADER_KEY] = security_token

        return sign_headers

    def authorization(
        self,
        rest_auth_parameter: RESTAuthParameter,
        token: DLFToken,
        host: str,
        sign_headers: Dict[str, str]
    ) -> str:
        """
        Generate the Authorization header value.

        Args:
            rest_auth_parameter: request parameters (method, path, query)
            token: DLF token providing access_key_id and access_key_secret
            host: request host (not used by this signer)
            sign_headers: headers generated by sign_headers(); must contain
                the x-dlf-date header

        Returns:
            "<algorithm> Credential=<id>/<scope>,Signature=<hex signature>"

        Raises:
            RuntimeError: on any failure, including a missing date header;
                the original error is chained as the cause
        """
        try:
            date_time = sign_headers.get(self.DLF_DATE_HEADER_KEY)
            if not date_time:
                # Fail with a readable message instead of an opaque
                # "NoneType is not subscriptable" from the slice below.
                raise ValueError(
                    f"missing required header '{self.DLF_DATE_HEADER_KEY}'"
                )
            # First eight characters of the timestamp are the YYYYMMDD date.
            date = date_time[:8]
            scope = f"{date}/{self.region}/{self.PRODUCT}/{self.REQUEST_TYPE}"

            canonical_request = self._get_canonical_request(rest_auth_parameter, sign_headers)

            string_to_sign = self.NEW_LINE.join([
                self.SIGNATURE_ALGORITHM,
                date_time,
                scope,
                self._sha256_hex(canonical_request),
            ])

            # Derive the signing key: secret -> date -> region -> product -> request type.
            date_key = self._hmac_sha256(
                f"aliyun_v4{token.access_key_secret}".encode("utf-8"), date
            )
            date_region_key = self._hmac_sha256(date_key, self.region)
            date_region_service_key = self._hmac_sha256(date_region_key, self.PRODUCT)
            signing_key = self._hmac_sha256(date_region_service_key, self.REQUEST_TYPE)

            signature = self._hex_encode(self._hmac_sha256(signing_key, string_to_sign))

            credential = f"{self.SIGNATURE_ALGORITHM} Credential={token.access_key_id}/{scope}"
            signature_part = f"{self.SIGNATURE_KEY}={signature}"

            return f"{credential},{signature_part}"

        except Exception as e:
            raise RuntimeError(f"Failed to generate authorization: {e}") from e

    def identifier(self) -> str:
        """Return the identifier of this signer ("default")."""
        return self.IDENTIFIER

    def _get_canonical_request(
        self, rest_auth_parameter: RESTAuthParameter, headers: Dict[str, str]
    ) -> str:
        """
        Build the newline-separated canonical request: method, path,
        canonical query string, one "name:value" line per signed header
        (sorted by name) and finally the payload hash marker.
        """
        lines = [
            rest_auth_parameter.method,
            rest_auth_parameter.path,
            self._build_canonical_query_string(rest_auth_parameter.parameters),
        ]

        signed_headers = self._build_sorted_signed_headers_map(headers)
        lines.extend(f"{key}:{value}" for key, value in signed_headers.items())

        lines.append(
            headers.get(
                self.DLF_CONTENT_SHA56_HEADER_KEY,
                self.DLF_CONTENT_SHA56_VALUE,
            )
        )

        return self.NEW_LINE.join(lines)

    def _build_canonical_query_string(self, parameters: Optional[Dict[str, str]]) -> str:
        """
        Join the query parameters, sorted by key, as "key=value" pairs
        separated by "&". Keys and values are stripped of surrounding
        whitespace; a parameter whose value is None or empty is emitted as
        the bare key. Returns "" when there are no parameters.
        """
        if not parameters:
            return ""

        query_parts = []
        for key, value in sorted(parameters.items()):
            key = self._trim(key)
            if value is not None and value != "":
                query_parts.append(f"{key}={self._trim(value)}")
            else:
                query_parts.append(key)

        return "&".join(query_parts)

    def _build_sorted_signed_headers_map(
        self, headers: Optional[Dict[str, str]]
    ) -> OrderedDict:
        """
        Select the headers listed in SIGNED_HEADERS (matched
        case-insensitively) and return them keyed by lower-cased name, with
        stripped values, ordered by name.
        """
        signed = {}
        for key, value in (headers or {}).items():
            lower_key = key.lower()
            if lower_key in self.SIGNED_HEADERS:
                signed[lower_key] = self._trim(value)

        return OrderedDict(sorted(signed.items()))

    @staticmethod
    def _to_utc(moment: datetime) -> datetime:
        """
        Return a naive UTC wall-clock datetime for an aware datetime with a
        non-zero UTC offset; any other datetime is returned unchanged.
        """
        offset = moment.utcoffset()
        if not offset:
            # Naive (offset is None) or already UTC (zero offset).
            return moment
        return (moment - offset).replace(tzinfo=None)

    @staticmethod
    def _md5(raw: str) -> str:
        """Return the Base64-encoded MD5 digest of the UTF-8 encoded text."""
        try:
            md5_hash = hashlib.md5(raw.encode("utf-8")).digest()
            return base64.b64encode(md5_hash).decode("utf-8")
        except Exception as e:
            raise RuntimeError(f"Failed to calculate MD5: {e}") from e

    @staticmethod
    def _hmac_sha256(key: bytes, data: str) -> bytes:
        """Return the raw HMAC-SHA256 of the UTF-8 encoded data under key."""
        try:
            return hmac.new(key, data.encode("utf-8"), hashlib.sha256).digest()
        except Exception as e:
            raise RuntimeError(f"Failed to calculate HMAC-SHA256: {e}") from e

    @staticmethod
    def _sha256_hex(raw: str) -> str:
        """Return the lower-case hex SHA-256 digest of the UTF-8 encoded text."""
        try:
            sha256_hash = hashlib.sha256(raw.encode("utf-8")).digest()
            return DLFDefaultSigner._hex_encode(sha256_hash)
        except Exception as e:
            raise RuntimeError(f"Failed to calculate SHA256: {e}") from e

    @staticmethod
    def _hex_encode(raw: Optional[bytes]) -> Optional[str]:
        """Return the lower-case hex form of raw, or None when raw is None."""
        if raw is None:
            return None
        return raw.hex()

    @staticmethod
    def _trim(value: str) -> str:
        """Strip surrounding whitespace; None and empty values become ""."""
        return value.strip() if value else ""
| |
| |
class DLFOpenApiSigner(DLFRequestSigner):
    """
    DLF OpenAPI signer using HMAC-SHA1 algorithm (Alibaba Cloud ROA v2 style).
    Used for public network access through dlfnext endpoints
    (e.g., dlfnext.cn-hangzhou.aliyuncs.com).
    """

    IDENTIFIER = "openapi"

    # Header constants
    DATE_HEADER = "Date"
    ACCEPT_HEADER = "Accept"
    CONTENT_MD5_HEADER = "Content-MD5"
    CONTENT_TYPE_HEADER = "Content-Type"
    HOST_HEADER = "Host"
    X_ACS_SIGNATURE_METHOD = "x-acs-signature-method"
    X_ACS_SIGNATURE_NONCE = "x-acs-signature-nonce"
    X_ACS_SIGNATURE_VERSION = "x-acs-signature-version"
    X_ACS_VERSION = "x-acs-version"
    X_ACS_SECURITY_TOKEN = "x-acs-security-token"

    # Values
    DATE_FORMAT = "%a, %d %b %Y %H:%M:%S GMT"
    ACCEPT_VALUE = "application/json"
    CONTENT_TYPE_VALUE = "application/json"
    SIGNATURE_METHOD_VALUE = "HMAC-SHA1"
    SIGNATURE_VERSION_VALUE = "1.0"
    API_VERSION = "2026-01-18"
    HMAC_SHA1 = "sha1"

    # English day/month abbreviations substituted for %a / %b so the Date
    # header does not depend on the process locale.
    _WEEKDAY_NAMES = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
    _MONTH_NAMES = (
        "Jan", "Feb", "Mar", "Apr", "May", "Jun",
        "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
    )

    def sign_headers(
        self,
        body: Optional[str],
        now: datetime,
        security_token: Optional[str],
        host: str
    ) -> Dict[str, str]:
        """
        Generate the request headers that take part in the ROA signature.

        Args:
            body: request body; Content-MD5 and Content-Type are only added
                when it is neither None nor empty
            now: request timestamp. A timezone-aware value is converted to
                UTC first; a naive value is formatted as-is
            security_token: security token (header omitted when None)
            host: value of the Host header

        Returns:
            map of signature-related headers, including a fresh random
            x-acs-signature-nonce on every call

        Raises:
            RuntimeError: if the Content-MD5 of the body cannot be calculated
        """
        headers = {
            # Date header in RFC 1123 format
            self.DATE_HEADER: self._format_date(now),
            self.ACCEPT_HEADER: self.ACCEPT_VALUE,
        }

        # Content-MD5 (if body exists)
        if body is not None and body != "":
            try:
                headers[self.CONTENT_MD5_HEADER] = self._md5_base64(body)
                headers[self.CONTENT_TYPE_HEADER] = self.CONTENT_TYPE_VALUE
            except Exception as e:
                raise RuntimeError(f"Failed to calculate Content-MD5: {e}") from e

        headers[self.HOST_HEADER] = host

        # x-acs-* headers
        headers[self.X_ACS_SIGNATURE_METHOD] = self.SIGNATURE_METHOD_VALUE
        headers[self.X_ACS_SIGNATURE_NONCE] = str(uuid.uuid4())
        headers[self.X_ACS_SIGNATURE_VERSION] = self.SIGNATURE_VERSION_VALUE
        headers[self.X_ACS_VERSION] = self.API_VERSION

        # Security token (if present)
        if security_token is not None:
            headers[self.X_ACS_SECURITY_TOKEN] = security_token

        return headers

    def authorization(
        self,
        rest_auth_parameter: RESTAuthParameter,
        token: DLFToken,
        host: str,
        sign_headers: Dict[str, str]
    ) -> str:
        """
        Generate the Authorization header value.

        Args:
            rest_auth_parameter: request parameters (method, path, query)
            token: DLF token providing access_key_id and access_key_secret
            host: request host (not used here; the Host header is not signed)
            sign_headers: headers generated by sign_headers()

        Returns:
            "acs <access key id>:<Base64 HMAC-SHA1 signature>"

        Raises:
            RuntimeError: on any failure; the original error is chained as
                the cause
        """
        try:
            # Step 1: Build CanonicalizedHeaders (x-acs-* headers, sorted, lowercase)
            canonicalized_headers = self._build_canonicalized_headers(sign_headers)

            # Step 2: Build CanonicalizedResource (path + sorted query string)
            canonicalized_resource = self._build_canonicalized_resource(rest_auth_parameter)

            # Step 3: Build StringToSign
            string_to_sign = self._build_string_to_sign(
                rest_auth_parameter, sign_headers, canonicalized_headers, canonicalized_resource
            )

            # Step 4: Calculate signature
            signature = self._calculate_signature(string_to_sign, token.access_key_secret)

            # Step 5: Build Authorization header
            return f"acs {token.access_key_id}:{signature}"

        except Exception as e:
            raise RuntimeError(f"Failed to generate authorization: {e}") from e

    def identifier(self) -> str:
        """Return the identifier of this signer ("openapi")."""
        return self.IDENTIFIER

    def _format_date(self, now: datetime) -> str:
        """
        Format now according to DATE_FORMAT with English day and month
        abbreviations, after converting an aware datetime to UTC.
        """
        offset = now.utcoffset()
        if offset:
            # DATE_FORMAT labels the value "GMT", so shift to the UTC wall clock.
            now = (now - offset).replace(tzinfo=None)

        # strftime's %a / %b follow the process locale; substitute the
        # English names up front so the header stays valid under any locale.
        date_format = (
            self.DATE_FORMAT
            .replace("%a", self._WEEKDAY_NAMES[now.weekday()])
            .replace("%b", self._MONTH_NAMES[now.month - 1])
        )
        return now.strftime(date_format)

    def _build_canonicalized_headers(self, headers: Dict[str, str]) -> str:
        """
        Return the x-acs-* headers as "name:value\\n" lines, with lower-cased
        names and stripped values, sorted by name. Returns "" when there are
        no such headers.
        """
        acs_headers = {}
        for key, value in headers.items():
            lower_key = key.lower()
            if lower_key.startswith("x-acs-"):
                acs_headers[lower_key] = self._trim(value)

        return "".join(
            f"{key}:{value}\n" for key, value in sorted(acs_headers.items())
        )

    def _build_canonicalized_resource(self, rest_auth_parameter: RESTAuthParameter) -> str:
        """
        Return the URL-decoded path, followed by "?" and the query parameters
        sorted by key when there are any. Values are URL-decoded; a parameter
        with a None or empty value is emitted as the bare key.
        """
        # Decode the path and use the original unencoded path for signature calculation
        path = unquote(rest_auth_parameter.path)
        params = rest_auth_parameter.parameters

        if not params:
            return path

        query_parts = []
        for key, value in sorted(params.items()):
            decoded_value = unquote(value) if value else ""
            query_parts.append(f"{key}={decoded_value}" if decoded_value else key)

        query_string = "&".join(query_parts)
        return f"{path}?{query_string}"

    def _build_string_to_sign(
        self,
        rest_auth_parameter: RESTAuthParameter,
        headers: Dict[str, str],
        canonicalized_headers: str,
        canonicalized_resource: str
    ) -> str:
        """
        Assemble the string to sign: HTTP method, Accept, Content-MD5,
        Content-Type, Date and CanonicalizedHeaders joined by newlines, then
        CanonicalizedResource appended directly. Missing headers contribute
        an empty line.
        """
        parts = [
            rest_auth_parameter.method,
            headers.get(self.ACCEPT_HEADER, ""),
            headers.get(self.CONTENT_MD5_HEADER, ""),
            headers.get(self.CONTENT_TYPE_HEADER, ""),
            headers.get(self.DATE_HEADER, ""),
            canonicalized_headers,
        ]

        # Every canonicalized header line already ends with "\n", so the
        # resource is appended without an extra separator.
        return "\n".join(parts) + canonicalized_resource

    def _calculate_signature(self, string_to_sign: str, access_key_secret: str) -> str:
        """
        Return the Base64-encoded HMAC-SHA1 of string_to_sign keyed with the
        access key secret (both UTF-8 encoded).

        Raises:
            RuntimeError: if the signature cannot be calculated
        """
        try:
            mac = hmac.new(
                access_key_secret.encode("utf-8"),
                string_to_sign.encode("utf-8"),
                hashlib.sha1
            )
            return base64.b64encode(mac.digest()).decode("utf-8")
        except Exception as e:
            raise RuntimeError(f"Failed to calculate signature: {e}") from e

    @staticmethod
    def _md5_base64(data: str) -> str:
        """Return the Base64-encoded MD5 digest of the UTF-8 encoded text."""
        md5_hash = hashlib.md5(data.encode("utf-8")).digest()
        return base64.b64encode(md5_hash).decode("utf-8")

    @staticmethod
    def _trim(value: str) -> str:
        """Strip surrounding whitespace; None and empty values become ""."""
        return value.strip() if value else ""