| #!/usr/bin/env python3 |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| """ |
| Apache Artifacts Verification Script |
| |
| Comprehensive verification tool for Apache release artifacts. |
| Checks signatures, checksums, licenses, and archive integrity. |
| |
| Usage: |
| # List contents of an artifact |
| python scripts/verify_apache_artifacts.py list-contents dist/apache-burr-0.41.0.tar.gz |
| |
| # Verify signatures and checksums |
| python scripts/verify_apache_artifacts.py signatures |
| |
| # Verify licenses with Apache RAT |
| python scripts/verify_apache_artifacts.py licenses --rat-jar path/to/apache-rat.jar |
| |
| # Verify everything |
| python scripts/verify_apache_artifacts.py all --rat-jar path/to/apache-rat.jar |
| |
| # Specify custom artifacts directory |
| python scripts/verify_apache_artifacts.py signatures --artifacts-dir /path/to/dist |
| """ |
| |
| import argparse |
| import hashlib |
| import os |
| import shutil |
| import subprocess |
| import sys |
| import tarfile |
| import tempfile |
| import xml.etree.ElementTree as ET |
| import zipfile |
| |
# Configuration
# Short name of the project whose release artifacts are verified.
# NOTE(review): not referenced anywhere else in the visible script — confirm
# whether an external caller relies on it before removing.
PROJECT_SHORT_NAME = "burr"
| |
| |
def _fail(message: str) -> None:
    """Report a fatal error on stdout and terminate the process.

    Args:
        message: Human-readable description of what went wrong.

    Raises:
        SystemExit: Always, with exit status 1.
    """
    print(f"\n❌ {message}")
    raise SystemExit(1)
| |
| |
def _print_section(title: str) -> None:
    """Print ``title`` framed by two 80-character rules.

    A blank line is emitted before the first rule and after the second one.
    """
    rule = "=" * 80
    print("\n" + rule, f" {title}", rule + "\n", sep="\n")
| |
| |
| # ============================================================================ |
| # Signature and Checksum Verification |
| # ============================================================================ |
| |
| |
def _verify_artifact_signature(artifact_path: str, signature_path: str) -> bool:
    """Verify the detached GPG signature of an artifact.

    Runs ``gpg --verify <signature> <artifact>`` and treats exit status 0 as
    a valid signature.

    Args:
        artifact_path: Path to the signed artifact.
        signature_path: Path to the detached signature (``.asc``) file.

    Returns:
        True when gpg exits with status 0. False when the signature file is
        missing, gpg cannot be started, or gpg reports a failure.
    """
    print(f" Verifying GPG signature: {os.path.basename(signature_path)}")

    if not os.path.exists(signature_path):
        print(" ✗ Signature file not found")
        return False

    try:
        result = subprocess.run(
            ["gpg", "--verify", signature_path, artifact_path],
            capture_output=True,
            check=False,
        )
    except OSError as e:
        # With check=False, CalledProcessError is never raised; what can
        # actually go wrong here is that gpg is not installed or cannot be
        # executed, which surfaces as an OSError (e.g. FileNotFoundError).
        print(" ✗ Error running GPG")
        print(f" Error: {e}")
        return False

    if result.returncode == 0:
        print(" ✓ GPG signature is valid")
        return True

    print(" ✗ GPG signature verification failed")
    if result.stderr:
        # gpg output is not guaranteed to be valid UTF-8; never let the
        # diagnostic itself raise.
        print(f" Error: {result.stderr.decode(errors='replace')}")
    return False
| |
| |
def _verify_artifact_checksum(artifact_path: str, checksum_path: str) -> bool:
    """Verify the SHA512 checksum of an artifact.

    The expected digest is the first whitespace-separated token of the
    checksum file (the layout produced by ``sha512sum``). Hex digits are
    compared case-insensitively.

    Args:
        artifact_path: Path to the artifact to hash.
        checksum_path: Path to the ``.sha512`` file.

    Returns:
        True when the computed digest matches the expected one. False when
        the checksum file is missing, empty, or does not match.
    """
    print(f" Verifying SHA512 checksum: {os.path.basename(checksum_path)}")

    if not os.path.exists(checksum_path):
        print(" ✗ Checksum file not found")
        return False

    # Read expected checksum
    with open(checksum_path, "r", encoding="utf-8") as f:
        fields = f.read().split()

    if not fields:
        # An empty/whitespace-only file would otherwise raise IndexError.
        print(" ✗ Checksum file is empty")
        return False

    # hexdigest() is lowercase; normalise so an uppercase digest still matches.
    expected_checksum = fields[0].lower()

    # Calculate actual checksum in chunks to keep memory use flat
    sha512_hash = hashlib.sha512()
    with open(artifact_path, "rb") as f:
        while chunk := f.read(65536):
            sha512_hash.update(chunk)

    actual_checksum = sha512_hash.hexdigest()

    if actual_checksum == expected_checksum:
        print(" ✓ SHA512 checksum is valid")
        return True

    print(" ✗ SHA512 checksum mismatch!")
    print(f" Expected: {expected_checksum}")
    print(f" Actual: {actual_checksum}")
    return False
| |
| |
def _verify_tar_gz_readable(artifact_path: str) -> bool:
    """Check that a ``.tar.gz`` archive opens and has at least one member.

    Args:
        artifact_path: Path to the gzip-compressed tar archive.

    Returns:
        True when the archive can be read and lists one or more members,
        False when it is empty or any error occurs while reading it.
    """
    print(f" Checking archive readability: {os.path.basename(artifact_path)}")

    try:
        with tarfile.open(artifact_path, "r:gz") as archive:
            entry_count = len(archive.getmembers())

        if entry_count:
            print(f" ✓ Archive is readable and contains {entry_count} files")
        else:
            print(" ✗ Archive is empty (no files)")
        return entry_count > 0
    except tarfile.TarError as e:
        print(f" ✗ Archive is corrupted or unreadable: {e}")
        return False
    except Exception as e:
        # Anything that is not a tar-format problem (e.g. missing file).
        print(f" ✗ Error reading archive: {e}")
        return False
| |
| |
def _verify_wheel_readable(wheel_path: str) -> bool:
    """Check that a wheel opens as a ZIP and carries its required metadata.

    A wheel is accepted only when a top-level ``*.dist-info`` directory
    contains both ``METADATA`` and ``WHEEL``. A substring match on member
    names is not sufficient: it would accept a wheel that has only one of the
    two files, or any unrelated file whose path merely contains those words.

    Args:
        wheel_path: Path to the ``.whl`` file.

    Returns:
        True when the wheel is readable, non-empty and has both metadata
        files; False otherwise.
    """
    print(f" Checking wheel readability: {os.path.basename(wheel_path)}")

    try:
        with zipfile.ZipFile(wheel_path, "r") as whl:
            file_list = whl.namelist()

        if not file_list:
            print(" ✗ Wheel is empty (no files)")
            return False

        # Collect names of files sitting directly inside a top-level
        # "<name>-<version>.dist-info/" directory.
        dist_info_files = set()
        for name in file_list:
            parent, _, base = name.partition("/")
            if parent.endswith(".dist-info") and base:
                dist_info_files.add(base)

        missing = [req for req in ("METADATA", "WHEEL") if req not in dist_info_files]
        if missing:
            print(" ✗ Wheel missing required metadata files")
            print(f" Missing from .dist-info: {', '.join(missing)}")
            return False

        print(f" ✓ Wheel is readable and contains {len(file_list)} files")
        return True
    except zipfile.BadZipFile:
        print(" ✗ Wheel is corrupted or not a valid ZIP file")
        return False
    except Exception as e:
        print(f" ✗ Error reading wheel: {e}")
        return False
| |
| |
def _verify_artifact_exists(artifact_path: str, min_size: int = 1000) -> bool:
    """Check that an artifact is present and not implausibly small.

    Args:
        artifact_path: Path to the artifact.
        min_size: Smallest acceptable size in bytes.

    Returns:
        True when the path exists and its size is at least ``min_size``.
    """
    label = os.path.basename(artifact_path)

    if not os.path.exists(artifact_path):
        print(f" ✗ Artifact not found: {label}")
        return False

    byte_count = os.path.getsize(artifact_path)
    if byte_count >= min_size:
        print(f" ✓ Artifact exists: {label} ({byte_count:,} bytes)")
        return True

    print(f" ✗ Artifact is suspiciously small ({byte_count} bytes): {label}")
    return False
| |
| |
def verify_signatures(artifacts_dir: str) -> bool:
    """Verify signatures, checksums and archive structure of every artifact.

    Every regular file in ``artifacts_dir`` is treated as an artifact, except
    ``.asc`` / ``.sha512`` companions and the ``rat-report-*`` files that the
    license check of this script writes into ``dist`` (which is also the
    default artifacts directory). Without that exclusion, a second run would
    flag the reports as unsigned artifacts.

    Args:
        artifacts_dir: Directory containing the release artifacts.

    Returns:
        True when every artifact passes every check; False when no artifact
        is found or any check fails. Exits the process (via ``_fail``) when
        ``artifacts_dir`` is not a directory.
    """
    _print_section("Verifying Signatures and Checksums")

    if not os.path.isdir(artifacts_dir):
        _fail(f"Artifacts directory not found: {artifacts_dir}")

    # Find all artifacts (exclude .asc / .sha512 companions and RAT reports)
    all_files = [
        f for f in os.listdir(artifacts_dir) if os.path.isfile(os.path.join(artifacts_dir, f))
    ]
    # Sorted so the report order is stable; os.listdir order is arbitrary.
    artifacts = sorted(
        f
        for f in all_files
        if not f.endswith((".asc", ".sha512")) and not f.startswith("rat-report-")
    )

    if not artifacts:
        print(f"⚠️ No artifacts found in {artifacts_dir}")
        return False

    print(f"Found {len(artifacts)} artifact(s) to verify:\n")

    all_valid = True
    for artifact_name in artifacts:
        artifact_path = os.path.join(artifacts_dir, artifact_name)

        print(f"Verifying: {artifact_name}")
        print("-" * 80)

        # Check existence and size; the remaining checks are pointless
        # for a missing or truncated file.
        if not _verify_artifact_exists(artifact_path):
            all_valid = False
            continue

        # Verify signature
        signature_path = f"{artifact_path}.asc"
        if not _verify_artifact_signature(artifact_path, signature_path):
            all_valid = False

        # Verify checksum
        checksum_path = f"{artifact_path}.sha512"
        if not _verify_artifact_checksum(artifact_path, checksum_path):
            all_valid = False

        # Verify archive/wheel structure
        if artifact_name.endswith(".tar.gz"):
            if not _verify_tar_gz_readable(artifact_path):
                all_valid = False
        elif artifact_name.endswith(".whl"):
            if not _verify_wheel_readable(artifact_path):
                all_valid = False

        print()

    return all_valid
| |
| |
| # ============================================================================ |
| # License Verification (Apache RAT) |
| # ============================================================================ |
| |
| |
def _build_rat_command(rat_jar_path: str, target_dir: str, excludes, xml_output: bool) -> list:
    """Assemble the ``java -jar <rat>`` command line for one RAT run."""
    command = ["java", "-jar", rat_jar_path]
    if xml_output:
        command.append("-x")  # XML output
    command.extend(["-d", target_dir])
    if excludes:
        command.extend(["-E", excludes])
    return command


def _run_rat(command: list, report_path: str):
    """Run RAT, writing its stdout to ``report_path``; return the completed process."""
    with open(report_path, "w", encoding="utf-8") as report_file:
        return subprocess.run(
            command,
            stdout=report_file,
            stderr=subprocess.PIPE,
            text=True,
            check=False,
        )


def _parse_rat_report(rat_report_xml: str) -> tuple:
    """Parse a RAT XML report into ``(total_files, unknown, unapproved)`` name lists."""
    root = ET.parse(rat_report_xml).getroot()
    resources = root.findall(".//resource")

    unknown_licenses = []
    unapproved_licenses = []
    for resource in resources:
        name = resource.get("name", "unknown")

        # Approval and family are carried by child elements; a resource
        # lacking them is treated as approved with no family.
        approval_elem = resource.find("license-approval")
        family_elem = resource.find("license-family")
        approval = approval_elem.get("name", "true") if approval_elem is not None else "true"
        family = family_elem.get("name", "") if family_elem is not None else ""

        if approval != "false" and family != "Unknown license":
            continue
        if family == "Unknown license" or not family:
            unknown_licenses.append(name)
        else:
            unapproved_licenses.append(name)

    return len(resources), unknown_licenses, unapproved_licenses


def _print_license_issue_group(heading: str, files: list, limit: int = 10) -> None:
    """Print up to ``limit`` file names under ``heading``; no output for an empty list."""
    if not files:
        return
    print(f"\n {heading} ({len(files)} files):")
    for file in files[:limit]:
        print(f" - {file}")
    if len(files) > limit:
        print(f" ... and {len(files) - limit} more")


def _print_rat_report_paths(rat_report_xml: str, rat_report_txt: str) -> None:
    """Tell the user where the two RAT reports were written."""
    print("\n 📄 Reports saved:")
    print(f" - {rat_report_xml} (structured)")
    print(f" - {rat_report_txt} (human-readable)")


def _check_licenses_with_rat(
    artifact_path: str,
    rat_jar_path: str,
    report_name: str,
    report_only: bool = False,
) -> bool:
    """Run the Apache RAT license checker on one ``.tar.gz`` artifact.

    The archive is extracted to a temporary directory, RAT is run twice
    (XML for parsing, plain text for review) and the XML report is parsed for
    files whose license is unknown or unapproved. Reports are written to
    ``dist/rat-report-<report_name>.{xml,txt}`` relative to the current
    working directory; a ``.rat-excludes`` file in the working directory is
    passed to RAT when present.

    Args:
        artifact_path: Path to the ``.tar.gz`` artifact.
        rat_jar_path: Path to the Apache RAT JAR.
        report_name: Suffix used in the report file names.
        report_only: When True, license issues and report parse errors do not
            fail the check.

    Returns:
        True when no license issue is found (or ``report_only`` is set and
        the reports could be produced); False on extraction failure, failure
        to run the XML pass, or license issues in strict mode.
    """
    print(f"\nRunning Apache RAT on: {os.path.basename(artifact_path)}")
    print("-" * 80)

    # Create reports directory
    report_dir = "dist"
    os.makedirs(report_dir, exist_ok=True)

    rat_report_xml = os.path.join(report_dir, f"rat-report-{report_name}.xml")
    rat_report_txt = os.path.join(report_dir, f"rat-report-{report_name}.txt")

    # Extract archive to temp directory
    with tempfile.TemporaryDirectory() as temp_dir:
        extract_dir = os.path.join(temp_dir, "extracted")
        os.makedirs(extract_dir)

        print(" Extracting archive...")
        try:
            with tarfile.open(artifact_path, "r:gz") as tar:
                # Use data filter for Python 3.12+ to avoid deprecation warning
                tar.extractall(extract_dir, filter="data")
            print(" ✓ Extracted to temp directory")
        except Exception as e:
            print(f" ✗ Error extracting archive: {e}")
            return False

        # Locate .rat-excludes file
        rat_excludes = ".rat-excludes"
        if not os.path.exists(rat_excludes):
            print(f" ⚠️ Warning: {rat_excludes} not found, running without excludes")
            rat_excludes = None

        # Run RAT with XML output
        print(" Running Apache RAT (XML format for parsing)...")
        try:
            result = _run_rat(
                _build_rat_command(rat_jar_path, extract_dir, rat_excludes, xml_output=True),
                rat_report_xml,
            )
            if result.returncode != 0:
                print(f" ⚠️ RAT exited with code {result.returncode}")
                # Surface RAT's own diagnostics instead of discarding them;
                # otherwise the only symptom is a cryptic XML parse error.
                if result.stderr and result.stderr.strip():
                    print(f" RAT stderr: {result.stderr.strip()}")
            print(f" ✓ RAT XML report: {rat_report_xml}")
        except Exception as e:
            print(f" ✗ Error running RAT (XML): {e}")
            return False

        # Run RAT with plain text output (best effort: failure only warns)
        print(" Running Apache RAT (text format for review)...")
        try:
            _run_rat(
                _build_rat_command(rat_jar_path, extract_dir, rat_excludes, xml_output=False),
                rat_report_txt,
            )
            print(f" ✓ RAT text report: {rat_report_txt}")
        except Exception as e:
            print(f" ⚠️ Warning: Could not generate text report: {e}")

        # Parse XML report
        print(" Parsing RAT report...")
        try:
            total_files, unknown_licenses, unapproved_licenses = _parse_rat_report(rat_report_xml)
        except Exception as e:
            print(f" ✗ Error parsing RAT report: {e}")
            if report_only:
                print(" ℹ️ Report-only mode: continuing despite parse error")
                return True
            return False

        # Report findings
        issues_count = len(unapproved_licenses) + len(unknown_licenses)
        print(f" ✓ Scanned {total_files} files")
        print(f" ✓ Found {issues_count} files with license issues")

        if issues_count == 0:
            print(" ✅ All files have approved licenses")
            _print_rat_report_paths(rat_report_xml, rat_report_txt)
            return True

        print("\n ⚠️ License Issues Found:")
        _print_license_issue_group("Unknown/Missing Licenses", unknown_licenses)
        _print_license_issue_group("Unapproved Licenses", unapproved_licenses)
        _print_rat_report_paths(rat_report_xml, rat_report_txt)

        if report_only:
            print("\n ℹ️ Report-only mode: continuing despite license issues")
            return True

        print("\n ❌ License check failed!")
        return False
| |
| |
def verify_licenses(artifacts_dir: str, rat_jar_path: str, report_only: bool = False) -> bool:
    """Run the Apache RAT license check on every ``.tar.gz`` in a directory.

    Wheels are not checked. The process exits (via ``_fail``) when the
    artifacts directory, the RAT JAR or a ``java`` executable is missing.

    Args:
        artifacts_dir: Directory containing the release artifacts.
        rat_jar_path: Path to the Apache RAT JAR.
        report_only: Forwarded to the per-artifact check.

    Returns:
        True when every ``.tar.gz`` passes; False when none is found or any
        check fails.
    """
    _print_section("Verifying Licenses with Apache RAT")

    if not os.path.exists(artifacts_dir):
        _fail(f"Artifacts directory not found: {artifacts_dir}")

    if not (rat_jar_path and os.path.exists(rat_jar_path)):
        _fail(
            f"Apache RAT JAR not found: {rat_jar_path}\nDownload from: https://creadur.apache.org/rat/download_rat.cgi"
        )

    # RAT is a Java tool, so a JVM must be on PATH
    if shutil.which("java") is None:
        _fail("Java not found. Required for Apache RAT.")

    # Source tarballs only (not wheels)
    tar_artifacts = [
        entry
        for entry in os.listdir(artifacts_dir)
        if os.path.isfile(os.path.join(artifacts_dir, entry)) and entry.endswith(".tar.gz")
    ]

    if not tar_artifacts:
        print(f"⚠️ No tar.gz artifacts found in {artifacts_dir}")
        return False

    print(f"Found {len(tar_artifacts)} tar.gz artifact(s) to check:\n")

    # Collect every outcome first so all artifacts are checked even after
    # one of them has failed.
    outcomes = []
    for tarball in tar_artifacts:
        # Report name is the artifact name without extension, dots dashed
        report_name = tarball.replace(".tar.gz", "").replace(".", "-")
        tarball_path = os.path.join(artifacts_dir, tarball)
        outcomes.append(
            _check_licenses_with_rat(tarball_path, rat_jar_path, report_name, report_only)
        )

    return all(outcomes)
| |
| |
| # ============================================================================ |
| # List Contents |
| # ============================================================================ |
| |
| |
def _list_tar_gz_contents(artifact_path: str) -> None:
    """Print a summary and a per-member listing of a ``.tar.gz`` archive.

    Links with a target are shown as ``name -> target``; every other member
    is shown with its size (regular files) or a ``<dir>`` marker. Any error
    while reading the archive is reported on stdout rather than raised.
    """
    print(f"\nContents of: {os.path.basename(artifact_path)}")
    print("=" * 80)

    try:
        with tarfile.open(artifact_path, "r:gz") as archive:
            entries = archive.getmembers()

        print(f"Total files: {len(entries)}\n")

        # Tally members by type
        file_count = sum(1 for entry in entries if entry.isfile())
        dir_count = sum(1 for entry in entries if entry.isdir())
        link_count = sum(1 for entry in entries if entry.issym() or entry.islnk())

        print(f"Files: {file_count}, Directories: {dir_count}, Symlinks: {link_count}\n")

        # Full listing, in archive order
        print("Files:\n")

        for entry in entries:
            is_link = entry.issym() or entry.islnk()
            if is_link and entry.linkname:
                print(f"→ {entry.name} -> {entry.linkname}")
                continue

            marker = "→ " if is_link else " "
            if entry.isfile():
                size_text = f"{entry.size:>12,}"
            else:
                size_text = " <dir>"
            print(f"{marker}{entry.name:<70} {size_text}")

    except Exception as e:
        print(f"Error reading archive: {e}")
| |
| |
def _list_wheel_contents(wheel_path: str) -> None:
    """Print a top-level summary and a sorted member listing of a wheel.

    Members are grouped by the first path component for the summary. Any
    error while reading the wheel is reported on stdout rather than raised.
    """
    print(f"\nContents of: {os.path.basename(wheel_path)}")
    print("=" * 80)

    try:
        with zipfile.ZipFile(wheel_path, "r") as archive:
            names = archive.namelist()

            print(f"Total files: {len(names)}\n")

            # Count members per first path component
            per_top_level = {}
            for entry in names:
                top = entry.partition("/")[0]
                per_top_level.setdefault(top, 0)
                per_top_level[top] += 1

            print("Top-level structure:")
            for top in sorted(per_top_level):
                print(f" {top:<50} ({per_top_level[top]} files)")

            # Full listing, alphabetical
            print("\nFiles:\n")

            for entry in sorted(names):
                if entry.endswith("/"):
                    size_text = " <dir>"
                else:
                    size_text = f"{archive.getinfo(entry).file_size:>12,}"
                print(f" {entry:<70} {size_text}")

    except Exception as e:
        print(f"Error reading wheel: {e}")
| |
| |
def list_contents(artifact_path: str) -> None:
    """Print the contents of one artifact, chosen by file extension.

    Supports ``.tar.gz`` and ``.whl``. Exits the process (via ``_fail``) when
    the path does not exist or has any other extension.
    """
    _print_section("Listing Artifact Contents")

    if not os.path.exists(artifact_path):
        _fail(f"Artifact not found: {artifact_path}")

    # Extension -> lister, tried in order
    listers = (
        (".tar.gz", _list_tar_gz_contents),
        (".whl", _list_wheel_contents),
    )
    for suffix, lister in listers:
        if artifact_path.endswith(suffix):
            lister(artifact_path)
            return

    _fail(f"Unsupported file type: {artifact_path}\nSupported: .tar.gz, .whl")
| |
| |
| # ============================================================================ |
| # Command Handlers |
| # ============================================================================ |
| |
| |
def cmd_signatures(args) -> bool:
    """Handle the ``signatures`` subcommand.

    Args:
        args: Parsed CLI namespace; only ``artifacts_dir`` is read.

    Returns:
        The result of the signature/checksum verification.
    """
    artifacts_dir = args.artifacts_dir
    return verify_signatures(artifacts_dir)
| |
| |
def cmd_licenses(args) -> bool:
    """Handle the ``licenses`` subcommand.

    Args:
        args: Parsed CLI namespace; reads ``rat_jar``, ``artifacts_dir`` and
            ``report_only``.

    Returns:
        The result of the license verification. Exits the process (via
        ``_fail``) when no RAT JAR was supplied.
    """
    jar_path = args.rat_jar
    if not jar_path:
        _fail("--rat-jar is required for license verification")

    return verify_licenses(args.artifacts_dir, jar_path, args.report_only)
| |
| |
def cmd_all(args) -> bool:
    """Handle the ``all`` subcommand: signatures, checksums and licenses.

    The license step runs only when ``args.rat_jar`` is set; otherwise it is
    skipped, counts as passing for the overall result, and is shown as
    SKIPPED (not PASS) in the summary.

    Args:
        args: Parsed CLI namespace; reads ``artifacts_dir``, ``rat_jar`` and
            ``report_only``.

    Returns:
        True when the signature step passed and the license step passed or
        was skipped.
    """
    _print_section("Complete Apache Artifacts Verification")

    # Step 1: Verify signatures
    print("\n[1/2] Verifying signatures and checksums...")
    signatures_ok = verify_signatures(args.artifacts_dir)

    # Step 2: Verify licenses
    licenses_skipped = not args.rat_jar
    if licenses_skipped:
        print("\n[2/2] Skipping license verification (no --rat-jar provided)")
        licenses_ok = True
    else:
        print("\n[2/2] Verifying licenses with Apache RAT...")
        licenses_ok = verify_licenses(args.artifacts_dir, args.rat_jar, args.report_only)

    # Summary
    _print_section("Verification Summary")

    # Test "skipped" first: licenses_ok is forced to True when the step is
    # skipped, so checking it first would always report PASS.
    if licenses_skipped:
        license_status = "⊘ SKIPPED"
    elif licenses_ok:
        license_status = "✅ PASS"
    else:
        license_status = "❌ FAIL"

    print("Results:")
    print(f" Signatures & Checksums: {'✅ PASS' if signatures_ok else '❌ FAIL'}")
    print(f" License Compliance: {license_status}")

    return signatures_ok and licenses_ok
| |
| |
def cmd_list_contents(args) -> None:
    """Handle the ``list-contents`` subcommand.

    Args:
        args: Parsed CLI namespace; only ``artifact`` is read.
    """
    target = args.artifact
    list_contents(target)
| |
| |
| # ============================================================================ |
| # CLI Entry Point |
| # ============================================================================ |
| |
| |
def main():
    """Parse the command line, run the chosen subcommand and exit.

    Exit status: 0 on success (always for ``list-contents``), 1 on
    verification failure or unexpected error, 130 on Ctrl-C.
    """
    usage_examples = """
Examples:
# List contents of a specific artifact
python scripts/verify_apache_artifacts.py list-contents dist/apache-burr-0.41.0.tar.gz
python scripts/verify_apache_artifacts.py list-contents dist/apache_burr-0.41.0-py3-none-any.whl

# Verify signatures and checksums only
python scripts/verify_apache_artifacts.py signatures

# Verify licenses with Apache RAT
python scripts/verify_apache_artifacts.py licenses --rat-jar /path/to/apache-rat.jar

# Verify everything
python scripts/verify_apache_artifacts.py all --rat-jar /path/to/apache-rat.jar

# Report-only mode (don't fail on license issues)
python scripts/verify_apache_artifacts.py licenses --rat-jar /path/to/apache-rat.jar --report-only

# Custom artifacts directory
python scripts/verify_apache_artifacts.py all --artifacts-dir /path/to/artifacts --rat-jar /path/to/rat.jar
"""
    parser = argparse.ArgumentParser(
        description="Apache Artifacts Verification Tool",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    subparsers = parser.add_subparsers(dest="command", required=True)

    # Help text shared by every subcommand that takes --artifacts-dir
    artifacts_dir_help = "Directory containing artifacts (default: dist)"

    # list-contents subcommand
    contents_cmd = subparsers.add_parser(
        "list-contents", help="List contents of a specific artifact"
    )
    contents_cmd.add_argument("artifact", help="Path to artifact file (.tar.gz or .whl)")

    # signatures subcommand
    signatures_cmd = subparsers.add_parser(
        "signatures", help="Verify GPG signatures and SHA512 checksums"
    )
    signatures_cmd.add_argument("--artifacts-dir", default="dist", help=artifacts_dir_help)

    # licenses subcommand
    licenses_cmd = subparsers.add_parser("licenses", help="Verify licenses with Apache RAT")
    licenses_cmd.add_argument("--artifacts-dir", default="dist", help=artifacts_dir_help)
    licenses_cmd.add_argument("--rat-jar", required=True, help="Path to Apache RAT JAR file")
    licenses_cmd.add_argument(
        "--report-only", action="store_true", help="Generate report but don't fail on issues"
    )

    # all subcommand
    all_cmd = subparsers.add_parser("all", help="Verify everything (signatures + licenses)")
    all_cmd.add_argument("--artifacts-dir", default="dist", help=artifacts_dir_help)
    all_cmd.add_argument(
        "--rat-jar", help="Path to Apache RAT JAR file (optional for signatures-only)"
    )
    all_cmd.add_argument(
        "--report-only",
        action="store_true",
        help="Generate report but don't fail on license issues",
    )

    args = parser.parse_args()

    # Subcommands that report a pass/fail result
    verifiers = {
        "signatures": cmd_signatures,
        "licenses": cmd_licenses,
        "all": cmd_all,
    }

    success = False
    try:
        if args.command == "list-contents":
            # Listing has no pass/fail outcome; exit cleanly right away.
            cmd_list_contents(args)
            sys.exit(0)

        handler = verifiers.get(args.command)
        if handler is None:
            _fail(f"Unknown command: {args.command}")
        else:
            success = handler(args)
    except KeyboardInterrupt:
        print("\n\n⚠️ Interrupted by user")
        sys.exit(130)
    except Exception as e:
        print(f"\n❌ Unexpected error: {e}")
        import traceback

        traceback.print_exc()
        sys.exit(1)

    print(
        "\n✅ Verification completed successfully!" if success else "\n❌ Verification failed."
    )
    sys.exit(0 if success else 1)
| |
| |
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()