# blob: 2574ba74cd68bbe2eb3d389646f7eb69e4c522ea [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import base64
import hashlib
import hmac
import uuid
from abc import ABC, abstractmethod
from collections import OrderedDict
from datetime import datetime
from typing import Dict, Optional
from urllib.parse import unquote
from pypaimon.api.token_loader import DLFToken
from pypaimon.api.typedef import RESTAuthParameter
class DLFRequestSigner(ABC):
    """
    Contract that every DLF request signer has to fulfil.

    Each concrete signer implements one signature scheme
    (e.g., DLF4-HMAC-SHA256, ROA v2 HMAC-SHA1).
    """

    @abstractmethod
    def sign_headers(
        self,
        body: Optional[str],
        now: datetime,
        security_token: Optional[str],
        host: str
    ) -> Dict[str, str]:
        """
        Produce the headers that take part in the request signature.

        Args:
            body: request body (may be None, e.g. for GET requests)
            now: timestamp to stamp the request with
            security_token: security token (may be None)
            host: request host

        Returns:
            mapping of signature-related header names to their values
        """

    @abstractmethod
    def authorization(
        self,
        rest_auth_parameter: RESTAuthParameter,
        token: DLFToken,
        host: str,
        sign_headers: Dict[str, str]
    ) -> str:
        """
        Compute the value of the Authorization header.

        Args:
            rest_auth_parameter: request parameters (method, path, query, body)
            token: DLF token (access key id, secret, security token)
            host: request host
            sign_headers: the headers previously returned by sign_headers()

        Returns:
            value to send in the Authorization header
        """

    @abstractmethod
    def identifier(self) -> str:
        """
        Name this signer (e.g., "default", "openapi").

        Returns:
            the signer identifier
        """
class DLFDefaultSigner(DLFRequestSigner):
    """
    Default DLF signer using the DLF4-HMAC-SHA256 algorithm.

    Used for VPC endpoints (e.g., cn-hangzhou-vpc.dlf.aliyuncs.com).

    The Authorization value has the shape::

        DLF4-HMAC-SHA256 Credential=<ak>/<date>/<region>/DlfNext/aliyun_v4_request,Signature=<hex>
    """
    IDENTIFIER = "default"
    VERSION = "v1"
    SIGNATURE_ALGORITHM = "DLF4-HMAC-SHA256"
    PRODUCT = "DlfNext"
    REQUEST_TYPE = "aliyun_v4_request"
    SIGNATURE_KEY = "Signature"
    NEW_LINE = "\n"

    # Header keys
    DLF_CONTENT_MD5_HEADER_KEY = "Content-MD5"
    DLF_CONTENT_TYPE_KEY = "Content-Type"
    DLF_DATE_HEADER_KEY = "x-dlf-date"
    DLF_SECURITY_TOKEN_HEADER_KEY = "x-dlf-security-token"
    DLF_AUTH_VERSION_HEADER_KEY = "x-dlf-version"
    # The two attribute names below say "SHA56" although the header is
    # "x-dlf-content-sha256"; the names are kept so existing references
    # to these class attributes keep working.
    DLF_CONTENT_SHA56_HEADER_KEY = "x-dlf-content-sha256"
    DLF_CONTENT_SHA56_VALUE = "UNSIGNED-PAYLOAD"

    AUTH_DATE_TIME_FORMAT = "%Y%m%dT%H%M%SZ"
    MEDIA_TYPE = "application/json"

    # Lower-cased names of the headers that are folded into the canonical request.
    SIGNED_HEADERS = [
        DLF_CONTENT_MD5_HEADER_KEY.lower(),
        DLF_CONTENT_TYPE_KEY.lower(),
        DLF_CONTENT_SHA56_HEADER_KEY.lower(),
        DLF_DATE_HEADER_KEY.lower(),
        DLF_AUTH_VERSION_HEADER_KEY.lower(),
        DLF_SECURITY_TOKEN_HEADER_KEY.lower(),
    ]

    def __init__(self, region: str):
        """
        Args:
            region: region name used in the credential scope and key derivation
        """
        self.region = region

    def sign_headers(
        self,
        body: Optional[str],
        now: datetime,
        security_token: Optional[str],
        host: str
    ) -> Dict[str, str]:
        """
        Build the x-dlf-* (and, for requests with a body, content) headers.

        Args:
            body: request body; Content-Type and Content-MD5 are only added
                when it is neither None nor empty
            now: request timestamp. A timezone-aware value is converted to
                UTC first, because the emitted format ends in a literal "Z";
                a naive value is formatted as-is (i.e. assumed to be UTC)
            security_token: security token; header omitted when None
            host: request host (not used by this signer)

        Returns:
            mapping of header names to values
        """
        date_time = self._to_utc(now).strftime(self.AUTH_DATE_TIME_FORMAT)
        sign_headers = {
            self.DLF_DATE_HEADER_KEY: date_time,
            self.DLF_CONTENT_SHA56_HEADER_KEY: self.DLF_CONTENT_SHA56_VALUE,
            self.DLF_AUTH_VERSION_HEADER_KEY: self.VERSION,
        }
        if body is not None and body != "":
            sign_headers[self.DLF_CONTENT_TYPE_KEY] = self.MEDIA_TYPE
            sign_headers[self.DLF_CONTENT_MD5_HEADER_KEY] = self._md5(body)
        if security_token is not None:
            sign_headers[self.DLF_SECURITY_TOKEN_HEADER_KEY] = security_token
        return sign_headers

    def authorization(
        self,
        rest_auth_parameter: RESTAuthParameter,
        token: DLFToken,
        host: str,
        sign_headers: Dict[str, str]
    ) -> str:
        """
        Compute the DLF4-HMAC-SHA256 Authorization header value.

        Args:
            rest_auth_parameter: request parameters (method, path, query)
            token: DLF token providing access_key_id and access_key_secret
            host: request host (not used by this signer)
            sign_headers: headers produced by sign_headers(); must contain
                the "x-dlf-date" entry

        Returns:
            "<algorithm> Credential=<scope>,Signature=<hex signature>"

        Raises:
            RuntimeError: if any step of the signature computation fails
                (the original exception is attached as __cause__)
        """
        try:
            date_time = sign_headers.get(self.DLF_DATE_HEADER_KEY)
            if date_time is None:
                # Fail with an explicit message instead of an opaque
                # "'NoneType' object is not subscriptable".
                raise ValueError(
                    f"missing '{self.DLF_DATE_HEADER_KEY}' entry in sign_headers"
                )
            # First 8 characters of the timestamp are the YYYYMMDD date.
            date = date_time[:8]
            scope = f"{date}/{self.region}/{self.PRODUCT}/{self.REQUEST_TYPE}"

            canonical_request = self._get_canonical_request(rest_auth_parameter, sign_headers)
            string_to_sign = self.NEW_LINE.join([
                self.SIGNATURE_ALGORITHM,
                date_time,
                scope,
                self._sha256_hex(canonical_request),
            ])

            # Key derivation chain: secret -> date -> region -> product -> request type.
            signing_key = f"aliyun_v4{token.access_key_secret}".encode("utf-8")
            for part in (date, self.region, self.PRODUCT, self.REQUEST_TYPE):
                signing_key = self._hmac_sha256(signing_key, part)

            signature = self._hex_encode(self._hmac_sha256(signing_key, string_to_sign))
            credential = (
                f"{self.SIGNATURE_ALGORITHM} "
                f"Credential={token.access_key_id}/{scope}"
            )
            return f"{credential},{self.SIGNATURE_KEY}={signature}"
        except Exception as e:
            raise RuntimeError(f"Failed to generate authorization: {e}") from e

    def identifier(self) -> str:
        """Return the identifier of this signer ("default")."""
        return self.IDENTIFIER

    def _get_canonical_request(
        self, rest_auth_parameter: RESTAuthParameter, headers: Dict[str, str]
    ) -> str:
        """
        Build the newline-separated canonical request.

        Layout: method, path, canonical query string, one "key:value" line
        per signed header (sorted by key), then the content-sha256 value.
        """
        lines = [
            rest_auth_parameter.method,
            rest_auth_parameter.path,
            self._build_canonical_query_string(rest_auth_parameter.parameters),
        ]
        signed_headers = self._build_sorted_signed_headers_map(headers)
        lines.extend(f"{key}:{value}" for key, value in signed_headers.items())
        lines.append(
            headers.get(
                self.DLF_CONTENT_SHA56_HEADER_KEY,
                self.DLF_CONTENT_SHA56_VALUE,
            )
        )
        return self.NEW_LINE.join(lines)

    def _build_canonical_query_string(self, parameters: Optional[Dict[str, str]]) -> str:
        """
        Join the query parameters, sorted by key, as "k=v" pairs with "&".

        Keys and values are stripped of surrounding whitespace; a parameter
        whose value is None or empty contributes only its key.
        """
        if not parameters:
            return ""
        query_parts = []
        for key, value in sorted(parameters.items()):
            key = self._trim(key)
            if value is not None and value != "":
                query_parts.append(f"{key}={self._trim(value)}")
            else:
                query_parts.append(key)
        return "&".join(query_parts)

    def _build_sorted_signed_headers_map(
        self, headers: Optional[Dict[str, str]]
    ) -> OrderedDict:
        """
        Pick the headers listed in SIGNED_HEADERS (case-insensitively).

        Returns:
            OrderedDict of lower-cased header name -> stripped value,
            ordered by header name; empty when headers is None or empty
        """
        selected = {}
        for key, value in (headers or {}).items():
            lower_key = key.lower()
            if lower_key in self.SIGNED_HEADERS:
                selected[lower_key] = self._trim(value)
        return OrderedDict(sorted(selected.items()))

    @staticmethod
    def _to_utc(now: datetime) -> datetime:
        """
        Return the UTC wall-clock time of now as a naive datetime.

        Naive inputs are returned unchanged (they are taken to be UTC).
        """
        offset = now.utcoffset()
        if offset is None:
            return now
        return now.replace(tzinfo=None) - offset

    @staticmethod
    def _md5(raw: str) -> str:
        """Return the Base64-encoded MD5 digest of the UTF-8 encoded text."""
        try:
            md5_hash = hashlib.md5(raw.encode("utf-8")).digest()
            return base64.b64encode(md5_hash).decode("utf-8")
        except Exception as e:
            raise RuntimeError(f"Failed to calculate MD5: {e}") from e

    @staticmethod
    def _hmac_sha256(key: bytes, data: str) -> bytes:
        """Return the raw HMAC-SHA256 of the UTF-8 encoded data under key."""
        try:
            return hmac.new(key, data.encode("utf-8"), hashlib.sha256).digest()
        except Exception as e:
            raise RuntimeError(f"Failed to calculate HMAC-SHA256: {e}") from e

    @staticmethod
    def _sha256_hex(raw: str) -> str:
        """Return the lowercase hex SHA-256 digest of the UTF-8 encoded text."""
        try:
            return hashlib.sha256(raw.encode("utf-8")).hexdigest()
        except Exception as e:
            raise RuntimeError(f"Failed to calculate SHA256: {e}") from e

    @staticmethod
    def _hex_encode(raw: Optional[bytes]) -> Optional[str]:
        """Return the lowercase hex form of raw, or None when raw is None."""
        if raw is None:
            return None
        return raw.hex()

    @staticmethod
    def _trim(value: str) -> str:
        """Strip surrounding whitespace; None or empty input yields ""."""
        return value.strip() if value else ""
class DLFOpenApiSigner(DLFRequestSigner):
    """
    DLF OpenAPI signer using HMAC-SHA1 (Alibaba Cloud ROA v2 style).

    Used for public network access through dlfnext endpoints
    (e.g., dlfnext.cn-hangzhou.aliyuncs.com).

    The Authorization value has the shape "acs <access_key_id>:<signature>".
    """
    IDENTIFIER = "openapi"

    # Header constants
    DATE_HEADER = "Date"
    ACCEPT_HEADER = "Accept"
    CONTENT_MD5_HEADER = "Content-MD5"
    CONTENT_TYPE_HEADER = "Content-Type"
    HOST_HEADER = "Host"
    X_ACS_SIGNATURE_METHOD = "x-acs-signature-method"
    X_ACS_SIGNATURE_NONCE = "x-acs-signature-nonce"
    X_ACS_SIGNATURE_VERSION = "x-acs-signature-version"
    X_ACS_VERSION = "x-acs-version"
    X_ACS_SECURITY_TOKEN = "x-acs-security-token"

    # Values
    # Layout of the Date header. The header itself is rendered by
    # _format_http_date(), because strftime's %a / %b follow the process
    # locale and would emit non-English names under a non-English LC_TIME.
    DATE_FORMAT = "%a, %d %b %Y %H:%M:%S GMT"
    ACCEPT_VALUE = "application/json"
    CONTENT_TYPE_VALUE = "application/json"
    SIGNATURE_METHOD_VALUE = "HMAC-SHA1"
    SIGNATURE_VERSION_VALUE = "1.0"
    API_VERSION = "2026-01-18"
    HMAC_SHA1 = "sha1"

    # Fixed English names, indexed by datetime.weekday() and month - 1.
    _WEEKDAY_NAMES = ("Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun")
    _MONTH_NAMES = (
        "Jan", "Feb", "Mar", "Apr", "May", "Jun",
        "Jul", "Aug", "Sep", "Oct", "Nov", "Dec",
    )

    def sign_headers(
        self,
        body: Optional[str],
        now: datetime,
        security_token: Optional[str],
        host: str
    ) -> Dict[str, str]:
        """
        Build the Date / Accept / content / Host / x-acs-* headers.

        Args:
            body: request body; Content-MD5 and Content-Type are only added
                when it is neither None nor empty
            now: request timestamp. A timezone-aware value is converted to
                UTC; a naive value is formatted as-is (i.e. assumed UTC)
            security_token: security token; header omitted when None
            host: value for the Host header

        Returns:
            mapping of header names to values; includes a fresh random
            UUID as the signature nonce on every call

        Raises:
            RuntimeError: if the Content-MD5 of the body cannot be computed
        """
        headers = {}
        # Date header in RFC 1123 format
        headers[self.DATE_HEADER] = self._format_http_date(now)
        # Accept header
        headers[self.ACCEPT_HEADER] = self.ACCEPT_VALUE
        # Content-MD5 (if body exists)
        if body is not None and body != "":
            try:
                headers[self.CONTENT_MD5_HEADER] = self._md5_base64(body)
                headers[self.CONTENT_TYPE_HEADER] = self.CONTENT_TYPE_VALUE
            except Exception as e:
                raise RuntimeError(f"Failed to calculate Content-MD5: {e}") from e
        # Host header
        headers[self.HOST_HEADER] = host
        # x-acs-* headers
        headers[self.X_ACS_SIGNATURE_METHOD] = self.SIGNATURE_METHOD_VALUE
        headers[self.X_ACS_SIGNATURE_NONCE] = str(uuid.uuid4())
        headers[self.X_ACS_SIGNATURE_VERSION] = self.SIGNATURE_VERSION_VALUE
        headers[self.X_ACS_VERSION] = self.API_VERSION
        # Security token (if present)
        if security_token is not None:
            headers[self.X_ACS_SECURITY_TOKEN] = security_token
        return headers

    def authorization(
        self,
        rest_auth_parameter: RESTAuthParameter,
        token: DLFToken,
        host: str,
        sign_headers: Dict[str, str]
    ) -> str:
        """
        Compute the "acs <access_key_id>:<signature>" Authorization value.

        Args:
            rest_auth_parameter: request parameters (method, path, query)
            token: DLF token providing access_key_id and access_key_secret
            host: request host (not used here; it is carried in sign_headers)
            sign_headers: headers produced by sign_headers()

        Returns:
            Authorization header value

        Raises:
            RuntimeError: if any step of the signature computation fails
                (the original exception is attached as __cause__)
        """
        try:
            # Step 1: CanonicalizedHeaders (x-acs-* headers, sorted, lowercase)
            canonicalized_headers = self._build_canonicalized_headers(sign_headers)
            # Step 2: CanonicalizedResource (path + sorted query string)
            canonicalized_resource = self._build_canonicalized_resource(rest_auth_parameter)
            # Step 3: StringToSign
            string_to_sign = self._build_string_to_sign(
                rest_auth_parameter, sign_headers, canonicalized_headers, canonicalized_resource
            )
            # Step 4: signature
            signature = self._calculate_signature(string_to_sign, token.access_key_secret)
            # Step 5: Authorization header
            return f"acs {token.access_key_id}:{signature}"
        except Exception as e:
            raise RuntimeError(f"Failed to generate authorization: {e}") from e

    def identifier(self) -> str:
        """Return the identifier of this signer ("openapi")."""
        return self.IDENTIFIER

    @classmethod
    def _format_http_date(cls, now: datetime) -> str:
        """
        Render now as "Mon, 01 Jan 2024 00:00:00 GMT".

        Day and month names are taken from fixed English tables so the
        result does not depend on the process locale. Timezone-aware inputs
        are shifted to UTC; naive inputs are used unchanged.
        """
        offset = now.utcoffset()
        if offset is not None:
            now = now.replace(tzinfo=None) - offset
        return (
            f"{cls._WEEKDAY_NAMES[now.weekday()]}, {now.day:02d} "
            f"{cls._MONTH_NAMES[now.month - 1]} {now.year:04d} "
            f"{now.hour:02d}:{now.minute:02d}:{now.second:02d} GMT"
        )

    def _build_canonicalized_headers(self, headers: Dict[str, str]) -> str:
        """
        Return the "x-acs-*" headers as "key:value\\n" lines.

        Keys are lower-cased, values stripped, lines sorted by key.
        """
        acs_headers = {}
        for key, value in headers.items():
            lower_key = key.lower()
            if lower_key.startswith("x-acs-"):
                acs_headers[lower_key] = self._trim(value)
        return "".join(
            f"{key}:{value}\n" for key, value in sorted(acs_headers.items())
        )

    def _build_canonicalized_resource(self, rest_auth_parameter: RESTAuthParameter) -> str:
        """
        Return the URL-decoded path, followed by "?<sorted query>" if any.

        Query values are URL-decoded; a parameter whose value is None or
        empty contributes only its key.
        """
        # The signature is calculated over the unencoded path.
        path = unquote(rest_auth_parameter.path)
        params = rest_auth_parameter.parameters
        if not params:
            return path

        query_parts = []
        for key, value in sorted(params.items()):
            decoded_value = unquote(value) if value else ""
            if decoded_value:
                query_parts.append(f"{key}={decoded_value}")
            else:
                query_parts.append(key)
        return f"{path}?{'&'.join(query_parts)}"

    def _build_string_to_sign(
        self,
        rest_auth_parameter: RESTAuthParameter,
        headers: Dict[str, str],
        canonicalized_headers: str,
        canonicalized_resource: str
    ) -> str:
        """
        Assemble the string that gets signed.

        Layout: method, Accept, Content-MD5, Content-Type and Date, each
        followed by a newline (missing headers contribute an empty line),
        then the canonicalized headers, then the canonicalized resource.
        """
        parts = [
            rest_auth_parameter.method,
            headers.get(self.ACCEPT_HEADER, ""),
            headers.get(self.CONTENT_MD5_HEADER, ""),
            headers.get(self.CONTENT_TYPE_HEADER, ""),
            headers.get(self.DATE_HEADER, ""),
            canonicalized_headers,
        ]
        # canonicalized_headers already ends each of its lines with "\n",
        # so the resource is appended directly, without another separator.
        return "\n".join(parts) + canonicalized_resource

    def _calculate_signature(self, string_to_sign: str, access_key_secret: str) -> str:
        """
        Return the Base64-encoded HMAC-SHA1 of string_to_sign.

        Raises:
            RuntimeError: if the HMAC cannot be computed
        """
        try:
            mac = hmac.new(
                access_key_secret.encode("utf-8"),
                string_to_sign.encode("utf-8"),
                hashlib.sha1
            )
            return base64.b64encode(mac.digest()).decode("utf-8")
        except Exception as e:
            raise RuntimeError(f"Failed to calculate signature: {e}") from e

    @staticmethod
    def _md5_base64(data: str) -> str:
        """Return the Base64-encoded MD5 digest of the UTF-8 encoded text."""
        md5_hash = hashlib.md5(data.encode("utf-8")).digest()
        return base64.b64encode(md5_hash).decode("utf-8")

    @staticmethod
    def _trim(value: str) -> str:
        """Strip surrounding whitespace; None or empty input yields ""."""
        return value.strip() if value else ""