blob: 6676d85c4141f49993b1b43be0df20b20e1ec7f8 [file] [log] [blame]
#!/usr/bin/env python3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Airflow Registry Metadata Extractor
Extracts provider metadata from:
1. provider.yaml files - Rich metadata (integrations, logos, categories)
2. pyproject.toml files - Dependencies, Python version constraints
3. PyPI API - Download statistics and release dates (optional, requires network)
Output: providers.json for the Astro static site generator
Module discovery (modules.json) is handled by extract_parameters.py, which uses
runtime inspection inside breeze for accurate class discovery.
"""
from __future__ import annotations
import datetime
import json
import re
import shutil
import urllib.request
import zlib
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any
import tomllib
import yaml
from registry_contract_models import validate_providers_catalog
# External endpoints used by metadata extraction.
# Each template is filled via str.format() with the keyword named in braces.

# Download statistics (pypistats.org) and release metadata (PyPI JSON API).
PYPISTATS_RECENT_URL = "https://pypistats.org/api/packages/{package_name}/recent"
PYPI_PACKAGE_JSON_URL = "https://pypi.org/pypi/{package_name}/json"

# Root of the docs bucket from which Sphinx objects.inv inventories are fetched.
S3_DOC_URL = "http://apache-airflow-docs.s3-website.eu-central-1.amazonaws.com"

# Links emitted into providers.json for each provider.
AIRFLOW_PROVIDER_DOCS_URL = "https://airflow.apache.org/docs/{package_name}/stable/"
AIRFLOW_PROVIDER_SOURCE_URL = (
    "https://github.com/apache/airflow/tree/"
    "providers-{provider_id}/{version}/providers/{provider_path}"
)
PYPI_PACKAGE_URL = "https://pypi.org/project/{package_name}/"
def fetch_pypi_downloads(package_name: str) -> dict[str, int]:
    """Return recent download counts for *package_name* from the pypistats.org API.

    The ``recent`` endpoint only reports last-week and last-month figures, so
    ``total`` is always 0. This is best-effort: any failure (network, HTTP,
    unexpected payload) prints a warning and yields all-zero counts.
    """
    try:
        endpoint = PYPISTATS_RECENT_URL.format(package_name=package_name)
        with urllib.request.urlopen(endpoint, timeout=5) as resp:
            recent = json.loads(resp.read().decode())["data"]
        weekly = recent.get("last_week", 0)
        monthly = recent.get("last_month", 0)
    except Exception as exc:
        print(f" Warning: Could not fetch PyPI stats for {package_name}: {exc}")
        return {"weekly": 0, "monthly": 0, "total": 0}
    return {"weekly": weekly, "monthly": monthly, "total": 0}
def fetch_pypi_dates(package_name: str) -> dict[str, str]:
    """Return the first and latest upload dates (``YYYY-MM-DD``) of *package_name* on PyPI.

    Every file of every release listed under ``releases`` in the PyPI JSON API
    is considered; its ``upload_time_iso_8601`` (or ``upload_time``) is cut to
    the date part, and the smallest/largest dates are reported. Missing data,
    or any failure while fetching/parsing, yields empty strings (with a
    printed warning on failure).
    """
    try:
        json_url = PYPI_PACKAGE_JSON_URL.format(package_name=package_name)
        with urllib.request.urlopen(json_url, timeout=10) as resp:
            payload = json.loads(resp.read().decode())
        # ISO dates compare correctly as plain strings.
        upload_days = [
            stamp[:10]
            for release_files in payload.get("releases", {}).values()
            for file_info in release_files
            if (stamp := file_info.get("upload_time_iso_8601") or file_info.get("upload_time"))
        ]
        return {
            "first_released": min(upload_days, default=""),
            "last_updated": max(upload_days, default=""),
        }
    except Exception as exc:
        print(f" Warning: Could not fetch PyPI dates for {package_name}: {exc}")
        return {"first_released": "", "last_updated": ""}
def _parse_inventory_lines(inv_path: Path) -> list[str]:
"""Read and decompress the body of a Sphinx objects.inv file."""
with inv_path.open("rb") as f:
for _ in range(4):
f.readline()
return zlib.decompress(f.read()).decode("utf-8").splitlines()
def read_inventory(inv_path: Path) -> dict[str, str]:
"""Parse a Sphinx objects.inv file and return {qualified_name: url_path} for py:class entries."""
result: dict[str, str] = {}
for line in _parse_inventory_lines(inv_path):
parts = line.split(None, 4)
if len(parts) != 5:
continue
name, domain_role, _prio, location, _dispname = parts
if domain_role == "py:class":
# "$" in location means "use the name as anchor"
result[name] = location.replace("$", name)
return result
def read_connection_urls(inv_path: Path) -> dict[str, str]:
    """Map connection types to their docs page (relative URL) using a Sphinx objects.inv.

    Two kinds of inventory entries are consulted:

    - ``std:label howto/connection:{conn_type}``: the label's page (anchor
      stripped) is used directly. Nested labels whose suffix still contains a
      ``:`` (sub-sections such as ``gcp:configuring_the_connection``) are ignored.
    - ``std:doc connections/{name}``: used only for connection types that no
      label covered; the ``connections/index`` page is ignored.
    """
    label_prefix = "howto/connection:"
    doc_prefix = "connections/"
    from_labels: dict[str, str] = {}
    from_docs: dict[str, str] = {}

    for line in _parse_inventory_lines(inv_path):
        columns = line.split(None, 4)
        if len(columns) != 5:
            continue
        name, domain_role, _prio, location, _dispname = columns
        if domain_role == "std:label" and name.startswith(label_prefix):
            conn_type = name[len(label_prefix) :]
            if ":" in conn_type:
                continue
            from_labels[conn_type] = location.split("#")[0]
        elif domain_role == "std:doc" and name.startswith(doc_prefix):
            doc_name = name[len(doc_prefix) :]
            if doc_name != "index":
                from_docs[doc_name] = location

    # Label-derived URLs win; doc pages only fill in the gaps.
    merged = dict(from_labels)
    for doc_name, doc_url in from_docs.items():
        merged.setdefault(doc_name, doc_url)
    return merged
INVENTORY_CACHE_DIR = Path(__file__).parent / ".inventory_cache"
INVENTORY_TTL = datetime.timedelta(hours=12)
def fetch_provider_inventory(package_name: str, cache_dir: Path = INVENTORY_CACHE_DIR) -> Path | None:
"""Download a provider's objects.inv from S3, caching locally with a 12-hour TTL.
Returns the local cache path on success, or None if the fetch fails
(e.g. provider not yet published).
"""
cache_path = cache_dir / package_name / "objects.inv"
if cache_path.exists():
age = datetime.datetime.now(tz=datetime.timezone.utc) - datetime.datetime.fromtimestamp(
cache_path.stat().st_mtime, tz=datetime.timezone.utc
)
if age < INVENTORY_TTL:
return cache_path
url = f"{S3_DOC_URL}/docs/{package_name}/stable/objects.inv"
try:
with urllib.request.urlopen(url, timeout=10) as response:
content = response.read()
# Validate it's a Sphinx inventory
if not content.startswith(b"# Sphinx inventory version"):
print(f" Warning: Invalid inventory header for {package_name}")
return None
cache_path.parent.mkdir(parents=True, exist_ok=True)
cache_path.write_bytes(content)
return cache_path
except Exception as e:
print(f" Warning: Could not fetch inventory for {package_name}: {e}")
# On refetch failure, serve stale cache rather than nothing
if cache_path.exists():
print(f" Using stale cache for {package_name}")
return cache_path
return None
def resolve_connection_docs_url(conn_type: str, conn_url_map: dict[str, str], base_docs_url: str) -> str:
    """Build the docs URL for *conn_type*, preferring the page found in the inventory map.

    When *conn_type* is a key of *conn_url_map*, its relative page is appended to
    *base_docs_url*; otherwise the generic ``connections/`` listing is returned.
    """
    if conn_type not in conn_url_map:
        return f"{base_docs_url}/connections/"
    relative_page = conn_url_map[conn_type]
    return f"{base_docs_url}/{relative_page}"
# Base paths. This script sits two directory levels below the repository root
# (dev/registry/), so the root is two parents up from SCRIPT_DIR.
SCRIPT_DIR = Path(__file__).parent
AIRFLOW_ROOT = SCRIPT_DIR.parent.parent
PROVIDERS_DIR = AIRFLOW_ROOT / "providers"
REGISTRY_DIR = AIRFLOW_ROOT / "registry"
OUTPUT_DIR = REGISTRY_DIR / "src" / "_data"
@dataclass
class Category:
    """A category within a provider, one per distinct ``integration-name`` in provider.yaml."""

    id: str  # slug derived from the integration name
    name: str  # integration name as written in provider.yaml
    module_count: int = field(default=0)
@dataclass
class Provider:
"""Provider metadata."""
id: str
name: str
package_name: str
description: str
lifecycle: str = "production" # AIP-95: incubation, production, mature, deprecated
logo: str | None = None
version: str = ""
versions: list[str] = field(default_factory=list)
airflow_versions: list[str] = field(default_factory=list)
pypi_downloads: dict[str, int] = field(default_factory=lambda: {"weekly": 0, "monthly": 0, "total": 0})
module_counts: dict[str, int] = field(
default_factory=lambda: {
"operator": 0,
"hook": 0,
"sensor": 0,
"trigger": 0,
"transfer": 0,
"notifier": 0,
"secret": 0,
"logging": 0,
"executor": 0,
"bundle": 0,
"decorator": 0,
}
)
categories: list[dict] = field(default_factory=list)
connection_types: list[dict] = field(default_factory=list) # {conn_type, hook_class, docs_url}
requires_python: str = "" # e.g., ">=3.10"
dependencies: list[str] = field(default_factory=list) # from pyproject.toml
optional_extras: dict[str, list[str]] = field(default_factory=dict) # {extra_name: [deps]}
dependents: list[str] = field(default_factory=list)
related_providers: list[str] = field(default_factory=list)
docs_url: str = ""
source_url: str = ""
pypi_url: str = ""
first_released: str = ""
last_updated: str = ""
def parse_provider_yaml(yaml_path: Path) -> dict[str, Any]:
    """Parse a provider.yaml file and return its top-level mapping.

    The file is decoded as UTF-8 explicitly, so the result does not depend on
    the platform's locale encoding.

    Raises:
        ValueError: if the document is empty or its top level is not a mapping.
            ``yaml.safe_load`` returns None for an empty file, which callers
            would otherwise only discover later as an AttributeError.
    """
    with open(yaml_path, encoding="utf-8") as f:
        data = yaml.safe_load(f)
    if not isinstance(data, dict):
        raise ValueError(f"expected a mapping at the top level of {yaml_path}, got {type(data).__name__}")
    return data
def parse_pyproject_toml(pyproject_path: Path) -> dict[str, Any]:
"""Parse pyproject.toml and extract requires-python, dependencies, and optional extras."""
result: dict[str, Any] = {"requires_python": "", "dependencies": [], "optional_extras": {}}
if not pyproject_path.exists():
return result
try:
with open(pyproject_path, "rb") as f:
data = tomllib.load(f)
project = data.get("project", {})
result["requires_python"] = project.get("requires-python", "")
result["dependencies"] = [d.strip() for d in project.get("dependencies", [])][:20]
optional_deps = project.get("optional-dependencies", {})
for extra_name, extra_deps in optional_deps.items():
clean = [d.strip() for d in extra_deps if d.strip()]
if clean:
result["optional_extras"][extra_name] = clean[:5]
except Exception as e:
print(f" Warning: Could not parse {pyproject_path}: {e}")
return result
def extract_integrations_as_categories(provider_yaml: dict[str, Any]) -> list[Category]:
    """Build one Category per distinct integration slug in a provider.yaml dict.

    The category ID is a slug of ``integration-name``: lowercased, spaces
    turned into hyphens, and every character outside ``[a-z0-9-]`` dropped,
    e.g. "Amazon Web Services (AWS)" -> "amazon-web-services-aws". The first
    integration producing a slug wins; later duplicates are ignored. Entries
    without a name are skipped, and ``module_count`` starts at 0.
    """
    categories: dict[str, Category] = {}
    # ``integrations:`` with no value parses as None; treat it as an empty list.
    for integration in provider_yaml.get("integrations") or []:
        name = integration.get("integration-name", "")
        if not name:
            continue
        cat_id = re.sub(r"[^a-z0-9-]", "", name.lower().replace(" ", "-"))
        if cat_id not in categories:
            categories[cat_id] = Category(id=cat_id, name=name, module_count=0)
    return list(categories.values())
def module_path_to_file_path(module_path: str, provider_path: Path) -> Path:
    """Map a dotted module path to its ``.py`` file under ``provider_path/src``.

    *provider_path* is the provider's real directory on disk (for example
    ``providers/microsoft/azure/``), not the dash-joined provider ID. For
    example, ``airflow.providers.amazon.operators.s3`` with
    ``providers/amazon`` gives
    ``providers/amazon/src/airflow/providers/amazon/operators/s3.py``.
    """
    relative_module = module_path.replace(".", "/")
    return (provider_path / "src" / relative_module).with_suffix(".py")
# Requirement naming apache-airflow itself (optionally with extras), followed by a
# version specifier or nothing. This does not match "apache-airflow-providers-...".
_AIRFLOW_REQUIREMENT_RE = re.compile(r"apache-airflow\s*(?:\[[^\]]*\])?\s*(?=[<>=!~]|$)", re.IGNORECASE)
# A ">=" clause with at least a major and minor version component.
_MIN_VERSION_RE = re.compile(r">=\s*(\d+)\.(\d+)")


def determine_airflow_versions(dependencies: list[str]) -> list[str]:
    """Derive the minimum supported Airflow version from pyproject.toml dependencies.

    The first ``apache-airflow`` requirement carrying a ``>=X.Y`` clause wins and
    yields ``["X.Y+"]``. Whitespace around the operator, extras
    (``apache-airflow[celery]``), additional clauses in any order
    (``apache-airflow<4,>=3.1``) and environment markers (ignored after ``;``)
    are all accepted. If no such requirement is found, ``["3.0+"]`` is returned.
    """
    for dep in dependencies:
        # Drop any environment marker so e.g. python_version>='3.10' is not mistaken for the Airflow bound.
        requirement = dep.split(";", 1)[0].strip()
        name_match = _AIRFLOW_REQUIREMENT_RE.match(requirement)
        if not name_match:
            continue
        version_match = _MIN_VERSION_RE.search(requirement, name_match.end())
        if version_match:
            major, minor = version_match.groups()
            return [f"{major}.{minor}+"]
    return ["3.0+"]
def find_related_providers(provider_id: str, all_provider_yamls: dict[str, dict]) -> list[str]:
"""Find related providers based on shared integrations or dependencies."""
current = all_provider_yamls.get(provider_id, {})
current_integrations = {i.get("integration-name") for i in current.get("integrations", [])}
related = []
for other_id, other_yaml in all_provider_yamls.items():
if other_id == provider_id:
continue
other_integrations = {i.get("integration-name") for i in other_yaml.get("integrations", [])}
overlap = current_integrations & other_integrations
if overlap:
related.append(other_id)
return related[:5] # Limit to 5 related providers
# Hand-picked logo filenames for well-known providers, tried before the generic
# name-based patterns.
_LOGO_PRIORITY_MAP: dict[str, list[str]] = {
    "amazon": ["AWS-Cloud-alt_light-bg@4x.png", "Amazon-Web-Services.png"],
    "google": ["Google-Cloud.png", "Google.png"],
    "microsoft-azure": ["Microsoft-Azure.png"],
    "snowflake": ["Snowflake.png"],
    "databricks": ["Databricks.png"],
}


def _clean_description(raw: str, max_length: int = 200) -> str:
    """Flatten an RST description from provider.yaml into a one-line plain-text summary.

    RST links such as `Text <url>`__ become just Text, backticks and bullet
    markers are removed, whitespace and punctuation spacing are normalized, and
    the result is cut to *max_length* characters (ending in "...").
    """
    description = re.sub(r"`([^<]+)\s*<[^>]+>`__", r"\1", raw)
    description = description.replace("`", "")
    # Bullet items ("- item" on their own line) become a comma-separated list.
    description = re.sub(r"\n\s*-\s*", ", ", description)
    description = re.sub(r"\s+", " ", description).strip()
    description = re.sub(r",\s*$", "", description)
    description = re.sub(r"\s+([),.])", r"\1", description)
    description = re.sub(r"including:\s*,", "including", description)
    description = re.sub(r"\s+\)", ")", description)
    description = re.sub(r"\(\s*\)", "", description)
    if len(description) > max_length:
        description = description[: max_length - 3] + "..."
    return description


def _copy_logo(source: Path, provider_id: str, logos_dest_dir: Path) -> str:
    """Copy *source* into *logos_dest_dir* as ``{provider_id}-{name}`` and return its site URL."""
    dest_name = f"{provider_id}-{source.name}"
    shutil.copy2(source, logos_dest_dir / dest_name)
    return f"/logos/{dest_name}"


def _find_provider_logo(
    provider_id: str, name: str, provider_yaml: dict, provider_path: Path, logos_dest_dir: Path
) -> str | None:
    """Pick the most representative logo from ``docs/integration-logos/``, copy it, and return its URL.

    Candidates are tried in this order: the hand-picked names for known providers;
    names derived from the provider name or ID; logos referenced by the
    provider.yaml integrations; and finally the alphabetically first PNG, then SVG,
    in the directory. Returns None when the directory is missing or contains no logo.
    """
    integration_logos_dir = provider_path / "docs" / "integration-logos"
    if not integration_logos_dir.exists():
        return None

    hyphenated = name.replace(" ", "-")
    candidate_names = [
        *_LOGO_PRIORITY_MAP.get(provider_id, []),
        f"{hyphenated}.png",  # e.g., "Google-Cloud.png"
        f"{hyphenated}.svg",
        f"{name}.png",
        f"{provider_id.replace('-', '_')}.png",
    ]
    for candidate_name in candidate_names:
        candidate = integration_logos_dir / candidate_name
        if candidate.is_file():
            return _copy_logo(candidate, provider_id, logos_dest_dir)

    for integration in provider_yaml.get("integrations") or []:
        logo_ref = integration.get("logo")
        if not logo_ref:
            continue
        candidate = integration_logos_dir / logo_ref.split("/")[-1]
        if candidate.is_file():
            return _copy_logo(candidate, provider_id, logos_dest_dir)

    # Sorted so the choice does not depend on filesystem listing order.
    fallbacks = sorted(integration_logos_dir.glob("*.png")) + sorted(integration_logos_dir.glob("*.svg"))
    if fallbacks:
        return _copy_logo(fallbacks[0], provider_id, logos_dest_dir)
    return None


def _extract_connection_types(provider_yaml: dict, package_name: str) -> list[dict[str, str]]:
    """Return ``{conn_type, hook_class, docs_url}`` dicts for the provider's connection types.

    Per-connection docs URLs come from the provider's Sphinx inventory when it
    can be fetched and read; otherwise each falls back to the ``connections/`` page.
    """
    base_docs_url = AIRFLOW_PROVIDER_DOCS_URL.format(package_name=package_name).rstrip("/")
    conn_url_map: dict[str, str] = {}
    inv_path = fetch_provider_inventory(package_name)
    if inv_path:
        try:
            conn_url_map = read_connection_urls(inv_path)
        except (OSError, zlib.error, UnicodeDecodeError) as e:
            # A corrupt cached inventory must not abort the whole extraction.
            print(f" Warning: Could not read inventory for {package_name}: {e}")

    connection_types = []
    for conn in provider_yaml.get("connection-types") or []:
        conn_type = conn.get("connection-type", "")
        if not conn_type:
            continue
        connection_types.append(
            {
                "conn_type": conn_type,
                "hook_class": conn.get("hook-class-name", ""),
                "docs_url": resolve_connection_docs_url(conn_type, conn_url_map, base_docs_url),
            }
        )
    return connection_types


def _merge_with_existing_catalog(new_providers: list[dict], updated_count: int) -> list[dict]:
    """Merge freshly extracted providers into the first readable existing providers.json.

    Existing entries keep their position and are replaced by a new entry with the
    same ``id``; new IDs are appended. An unreadable file is skipped in favour of
    the next location, so a corrupt copy cannot silently drop every other
    provider. Returns *new_providers* unchanged if no readable catalog exists.
    """
    for out_dir in (SCRIPT_DIR, OUTPUT_DIR):
        existing_path = out_dir / "providers.json"
        if not existing_path.exists():
            continue
        try:
            existing = json.loads(existing_path.read_text(encoding="utf-8"))
            new_by_id = {p["id"]: p for p in new_providers}
            merged = [new_by_id.pop(p["id"], p) for p in existing["providers"]]
        except (ValueError, KeyError, TypeError) as e:
            print(f" Warning: Ignoring unreadable {existing_path}: {e}")
            continue
        merged.extend(new_by_id.values())
        print(f"Merged {updated_count} updated + {len(merged) - updated_count} existing providers")
        return merged
    return new_providers


def main():
    """Extract metadata for all (or the requested) providers and write providers.json.

    With ``--provider`` (one or more whitespace-separated provider IDs), only
    those providers are re-extracted and then merged into an existing
    providers.json, so runs for different providers don't clobber each other.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Airflow Registry Metadata Extractor")
    parser.add_argument(
        "--provider",
        default=None,
        help=(
            "Extract only this provider ID (e.g. 'amazon'); several IDs may be given "
            "space-separated in one argument. Omit for full build."
        ),
    )
    args = parser.parse_args()

    print("Airflow Registry Metadata Extractor")
    print("=" * 50)

    # A whitespace-only --provider value means a full build.
    requested_providers: set[str] | None = set((args.provider or "").split()) or None
    if requested_providers:
        print(f"Incremental mode: extracting provider(s) {requested_providers}")

    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    all_providers: list[Provider] = []
    all_provider_yamls: dict[str, dict] = {}
    provider_yaml_paths: dict[str, Path] = {}

    # First pass: load every provider.yaml, including nested ones such as dbt/cloud and
    # microsoft/azure. Sorting makes the dict order (and therefore the truncated
    # related-provider lists computed later) independent of filesystem listing order.
    for yaml_path in sorted(PROVIDERS_DIR.rglob("provider.yaml")):
        # providers/amazon/provider.yaml -> "amazon"
        # providers/microsoft/azure/provider.yaml -> "microsoft-azure"
        provider_id = "-".join(yaml_path.relative_to(PROVIDERS_DIR).parts[:-1])
        try:
            all_provider_yamls[provider_id] = parse_provider_yaml(yaml_path)
            provider_yaml_paths[provider_id] = yaml_path.parent
        except Exception as e:
            print(f" Error parsing {yaml_path}: {e}")
    print(f"Found {len(all_provider_yamls)} providers with provider.yaml")

    if requested_providers:
        extraction_ids = {pid for pid in all_provider_yamls if pid in requested_providers}
        missing = requested_providers - extraction_ids
        if missing:
            print(f" Warning: provider(s) not found: {missing}")
    else:
        extraction_ids = set(all_provider_yamls)

    # Logos are written to dev/registry/logos/, which is mounted in breeze (unlike
    # registry/public/), and mirrored into registry/public/logos/ so a local
    # `pnpm dev` works without the extra CI copy step.
    logos_dest_dir = SCRIPT_DIR / "logos"
    logos_dest_dir.mkdir(parents=True, exist_ok=True)
    registry_logos_dir = REGISTRY_DIR / "public" / "logos"
    registry_logos_dir.mkdir(parents=True, exist_ok=True)

    # Second pass: full metadata for the selected providers, in a stable order.
    for provider_id in sorted(extraction_ids):
        provider_yaml = all_provider_yamls[provider_id]
        provider_path = provider_yaml_paths[provider_id]
        package_name = provider_yaml.get("package-name", f"apache-airflow-providers-{provider_id}")
        name = provider_yaml.get("name", provider_id.replace("-", " ").title())
        description = _clean_description(provider_yaml.get("description") or "")

        versions = provider_yaml.get("versions") or []
        version = versions[0] if versions else "0.0.0"

        categories = extract_integrations_as_categories(provider_yaml)

        logo = _find_provider_logo(provider_id, name, provider_yaml, provider_path, logos_dest_dir)
        if logo:
            logo_filename = logo.split("/")[-1]
            copied_logo = logos_dest_dir / logo_filename
            if copied_logo.exists():
                shutil.copy2(copied_logo, registry_logos_dir / logo_filename)

        connection_types = _extract_connection_types(provider_yaml, package_name)

        pypi_downloads = fetch_pypi_downloads(package_name)
        pypi_dates = fetch_pypi_dates(package_name)

        pyproject_data = parse_pyproject_toml(provider_path / "pyproject.toml")
        airflow_versions = determine_airflow_versions(pyproject_data["dependencies"])
        provider_source_path = provider_path.relative_to(PROVIDERS_DIR).as_posix()

        provider = Provider(
            id=provider_id,
            name=name,
            package_name=package_name,
            description=description,
            lifecycle=provider_yaml.get("lifecycle", "production"),
            logo=logo,
            version=version,
            versions=versions,
            airflow_versions=airflow_versions,
            pypi_downloads=pypi_downloads,
            categories=[asdict(c) for c in categories],
            connection_types=connection_types,
            requires_python=pyproject_data["requires_python"],
            dependencies=pyproject_data["dependencies"],
            optional_extras=pyproject_data.get("optional_extras", {}),
            docs_url=AIRFLOW_PROVIDER_DOCS_URL.format(package_name=package_name),
            source_url=AIRFLOW_PROVIDER_SOURCE_URL.format(
                provider_id=provider_id, version=version, provider_path=provider_source_path
            ),
            pypi_url=PYPI_PACKAGE_URL.format(package_name=package_name),
            first_released=pypi_dates["first_released"],
            last_updated=pypi_dates["last_updated"],
        )
        all_providers.append(provider)
        print(f" {provider_id}: {len(categories)} categories")

    # Related providers are computed against every loaded provider.yaml, not just
    # the extracted subset.
    for provider in all_providers:
        provider.related_providers = find_related_providers(provider.id, all_provider_yamls)

    new_providers = [asdict(p) for p in all_providers]
    if requested_providers:
        new_providers = _merge_with_existing_catalog(new_providers, len(all_providers))

    new_providers.sort(key=lambda p: p["name"].lower())
    providers_json = validate_providers_catalog({"providers": new_providers})

    # Inside breeze, registry/ is not mounted, so OUTPUT_DIR writes are lost there.
    # SCRIPT_DIR (dev/registry/) is always mounted, so the other extraction scripts
    # can pick up providers.json from it.
    for out_dir in (OUTPUT_DIR, SCRIPT_DIR):
        if not out_dir.parent.exists():
            continue
        out_dir.mkdir(parents=True, exist_ok=True)
        with open(out_dir / "providers.json", "w", encoding="utf-8") as f:
            json.dump(providers_json, f, indent=2)
        print(f"\nWrote {len(new_providers)} providers to {out_dir}")
    print("\nDone!")


if __name__ == "__main__":
    main()