| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| """CLI entry point for the Fory IDL compiler.""" |
| |
| import argparse |
| import copy |
| import os |
| import sys |
| from pathlib import Path |
| from typing import Dict, List, Optional, Set, Tuple |
| |
| from fory_compiler.frontend.base import FrontendError |
| from fory_compiler.frontend.utils import parse_idl_file, resolve_import_path |
| from fory_compiler.ir.ast import Schema |
| from fory_compiler.ir.emitter import FDLEmitter |
| from fory_compiler.ir.validator import SchemaValidator |
| from fory_compiler.generators.base import GeneratorOptions |
| from fory_compiler.generators import GENERATORS |
| |
| |
| class ImportError(Exception): |
| """Error during import resolution.""" |
| |
| pass |
| |
| |
| GENERATED_MARKER = "This file is generated by Apache Fory compiler." |
| SKIP_DIR_NAMES = {"build", "target"} |
| |
| |
| def should_skip_dir(name: str) -> bool: |
| """Return True if directory should be skipped during scans.""" |
| return name in SKIP_DIR_NAMES or name.startswith(".") |
| |
| |
| def is_generated_file(path: Path) -> bool: |
| """Return True if file appears to be generated by Fory compiler.""" |
| try: |
| content = path.read_text(errors="ignore") |
| except OSError: |
| return False |
| return GENERATED_MARKER in content |
| |
| |
| def scan_generated_files(root: Path, relative: bool) -> List[Path]: |
| """Scan for generated files under root.""" |
| matches: List[Path] = [] |
| for dirpath, dirnames, filenames in os.walk(root): |
| dirnames[:] = [d for d in dirnames if not should_skip_dir(d)] |
| for filename in filenames: |
| file_path = Path(dirpath) / filename |
| if not file_path.is_file(): |
| continue |
| if is_generated_file(file_path): |
| matches.append(file_path) |
| display_path = file_path |
| if relative: |
| try: |
| display_path = file_path.relative_to(root) |
| except ValueError: |
| pass |
| print(f" Generated: {display_path}") |
| return matches |
| |
| |
| def resolve_imports( |
| file_path: Path, |
| import_paths: Optional[List[Path]] = None, |
| visited: Optional[Set[Path]] = None, |
| cache: Optional[Dict[Path, Schema]] = None, |
| ) -> Schema: |
| """ |
| Recursively resolve imports and merge all types into a single schema. |
| |
| Args: |
| file_path: Path to the FDL file to parse |
| import_paths: List of directories to search for imports |
| visited: Set of already visited files (for cycle detection) |
| cache: Cache of already parsed schemas |
| |
| Returns: |
| Schema with all imported types merged in |
| """ |
| if import_paths is None: |
| import_paths = [] |
| if visited is None: |
| visited = set() |
| if cache is None: |
| cache = {} |
| |
| # Normalize path |
| file_path = file_path.resolve() |
| |
| # Check for circular imports |
| if file_path in visited: |
| raise ImportError(f"Circular import detected: {file_path}") |
| |
| # Return cached schema if available |
| if file_path in cache: |
| return copy.deepcopy(cache[file_path]) |
| |
| visited.add(file_path) |
| |
| # Parse the file |
| schema = parse_idl_file(file_path) |
| |
| # Process imports |
| imported_enums = [] |
| imported_messages = [] |
| imported_unions = [] |
| |
| for imp in schema.imports: |
| # Resolve import path using search paths |
| import_path = resolve_import_path(imp.path, file_path, import_paths) |
| |
| if import_path is None: |
| # Build helpful error message with search locations |
| searched = [str(file_path.parent)] |
| searched.extend(str(p) for p in import_paths) |
| line = imp.location.line if imp.location else imp.line |
| column = imp.location.column if imp.location else imp.column |
| raise ImportError( |
| f"Import not found: {imp.path}\n" |
| f" at line {line}, column {column}\n" |
| f" Searched in: {', '.join(searched)}" |
| ) |
| |
| # Recursively resolve the imported file |
| imported_schema = resolve_imports( |
| import_path, import_paths, visited.copy(), cache |
| ) |
| |
| # Collect types from imported schema |
| imported_enums.extend(imported_schema.enums) |
| imported_messages.extend(imported_schema.messages) |
| imported_unions.extend(imported_schema.unions) |
| |
| # Create merged schema with imported types first (so they can be referenced) |
| merged_schema = Schema( |
| package=schema.package, |
| imports=schema.imports, |
| enums=imported_enums + schema.enums, |
| messages=imported_messages + schema.messages, |
| unions=imported_unions + schema.unions, |
| options=schema.options, |
| source_file=schema.source_file, |
| source_format=schema.source_format, |
| ) |
| |
| cache[file_path] = copy.deepcopy(merged_schema) |
| return merged_schema |
| |
| |
| def go_package_info(schema: Schema) -> Tuple[Optional[str], str]: |
| """Return (import_path, package_name) for Go.""" |
| go_package = schema.get_option("go_package") |
| if go_package: |
| if ";" in go_package: |
| import_path, package_name = go_package.split(";", 1) |
| return import_path, package_name |
| import_path = go_package |
| package_name = import_path.rstrip("/").split("/")[-1] |
| return import_path, package_name |
| if schema.package: |
| return None, schema.package.split(".")[-1] |
| return None, "" |
| |
| |
| def resolve_go_module_root(base_go_out: Path, schema: Schema) -> Path: |
| """Infer the Go module root for output layout.""" |
| _, package_name = go_package_info(schema) |
| if package_name and base_go_out.name == package_name: |
| return base_go_out.parent |
| return base_go_out |
| |
| |
| def resolve_go_output_dir(base_go_out: Path, schema: Schema) -> Path: |
| """Resolve Go output dir for a schema based on go_package/package.""" |
| import_path, package_name = go_package_info(schema) |
| if package_name and base_go_out.name == package_name: |
| return base_go_out |
| if import_path: |
| last_segment = import_path.rstrip("/").split("/")[-1] |
| if package_name and last_segment == package_name: |
| return base_go_out / package_name |
| return base_go_out |
| if package_name: |
| return base_go_out / package_name |
| return base_go_out |
| |
| |
| def parse_args(args: Optional[List[str]] = None) -> argparse.Namespace: |
| """Parse command line arguments.""" |
| parser = argparse.ArgumentParser( |
| prog="foryc", |
| description="Fory IDL compiler", |
| ) |
| |
| parser.add_argument( |
| "--scan-generated", |
| action="store_true", |
| help="Scan for generated files under the current directory", |
| ) |
| |
| parser.add_argument( |
| "files", |
| nargs="*", |
| type=Path, |
| metavar="FILE", |
| help="IDL files to compile", |
| ) |
| |
| parser.add_argument( |
| "--lang", |
| type=str, |
| default="all", |
| help="Comma-separated list of target languages (java,python,cpp,rust,go). Default: all", |
| ) |
| |
| parser.add_argument( |
| "--output", |
| "-o", |
| type=Path, |
| default=Path("./generated"), |
| help="Output directory. Default: ./generated", |
| ) |
| |
| parser.add_argument( |
| "--package", |
| type=str, |
| default=None, |
| help="Override package name from FDL file", |
| ) |
| |
| parser.add_argument( |
| "-I", |
| "--proto_path", |
| "--import_path", |
| dest="import_paths", |
| action="append", |
| type=Path, |
| default=[], |
| metavar="PATH", |
| help="Add a directory to the import search path. Can be specified multiple times.", |
| ) |
| |
| # Language-specific output directories (protoc-style) |
| parser.add_argument( |
| "--java_out", |
| type=Path, |
| default=None, |
| metavar="DST_DIR", |
| help="Generate Java code in DST_DIR", |
| ) |
| |
| parser.add_argument( |
| "--python_out", |
| type=Path, |
| default=None, |
| metavar="DST_DIR", |
| help="Generate Python code in DST_DIR", |
| ) |
| |
| parser.add_argument( |
| "--cpp_out", |
| type=Path, |
| default=None, |
| metavar="DST_DIR", |
| help="Generate C++ code in DST_DIR", |
| ) |
| |
| parser.add_argument( |
| "--go_out", |
| type=Path, |
| default=None, |
| metavar="DST_DIR", |
| help="Generate Go code in DST_DIR", |
| ) |
| |
| parser.add_argument( |
| "--rust_out", |
| type=Path, |
| default=None, |
| metavar="DST_DIR", |
| help="Generate Rust code in DST_DIR", |
| ) |
| |
| parser.add_argument( |
| "--go_nested_type_style", |
| type=str, |
| default=None, |
| choices=["camelcase", "underscore"], |
| help="Go nested type naming style: camelcase or underscore (default)", |
| ) |
| |
| parser.add_argument( |
| "--emit-fdl", |
| action="store_true", |
| help="Emit translated FDL (for non-FDL inputs) for debugging", |
| ) |
| |
| parser.add_argument( |
| "--emit-fdl-path", |
| type=Path, |
| default=None, |
| help="Write translated FDL to this path (file or directory)", |
| ) |
| |
| parser.add_argument( |
| "--root", |
| type=Path, |
| default=Path("."), |
| help="Root directory to scan (default: current directory)", |
| ) |
| parser.add_argument( |
| "--relative", |
| action="store_true", |
| help="Print paths relative to the scan root", |
| ) |
| delete_group = parser.add_mutually_exclusive_group() |
| delete_group.add_argument( |
| "--delete", |
| action="store_true", |
| help="Delete all scanned generated files", |
| ) |
| delete_group.add_argument( |
| "--dry-run", |
| action="store_true", |
| help="Scan and print without deleting files", |
| ) |
| |
| return parser.parse_args(args) |
| |
| |
| def normalize_args(args: Optional[List[str]]) -> List[str]: |
| """Normalize args so compile is the default command.""" |
| if args is None: |
| args = sys.argv[1:] |
| if not args: |
| return [] |
| if args[0] in {"-h", "--help"}: |
| return args |
| if args[0] == "--scan-generated": |
| return args |
| if args[0] == "scan-generated": |
| return ["--scan-generated", *args[1:]] |
| if args[0] == "compile": |
| return args[1:] |
| return args |
| |
| |
| def get_languages(lang_arg: str) -> List[str]: |
| """Parse the language argument into a list of languages.""" |
| if lang_arg == "all": |
| return list(GENERATORS.keys()) |
| |
| languages = [lang.strip().lower() for lang in lang_arg.split(",")] |
| |
| # Validate languages |
| invalid = [lang for lang in languages if lang not in GENERATORS] |
| if invalid: |
| print(f"Error: Unknown language(s): {', '.join(invalid)}", file=sys.stderr) |
| print(f"Available: {', '.join(GENERATORS.keys())}", file=sys.stderr) |
| sys.exit(1) |
| |
| return languages |
| |
| |
| def compile_file( |
| file_path: Path, |
| lang_output_dirs: Dict[str, Path], |
| package_override: Optional[str] = None, |
| import_paths: Optional[List[Path]] = None, |
| go_nested_type_style: Optional[str] = None, |
| emit_fdl: bool = False, |
| emit_fdl_path: Optional[Path] = None, |
| resolve_cache: Optional[Dict[Path, Schema]] = None, |
| ) -> bool: |
| """Compile a single IDL file with import resolution. |
| |
| Args: |
| file_path: Path to the IDL file |
| lang_output_dirs: Dictionary mapping language name to output directory |
| package_override: Optional package name override |
| import_paths: List of import search paths |
| """ |
| print(f"Compiling {file_path}...") |
| |
| # Parse and resolve imports |
| try: |
| schema = resolve_imports(file_path, import_paths, cache=resolve_cache) |
| except OSError as e: |
| print(f"Error reading {file_path}: {e}", file=sys.stderr) |
| return False |
| except (FrontendError, ValueError) as e: |
| print(f"Error: {e}", file=sys.stderr) |
| return False |
| except ImportError as e: |
| print(f"Import error: {e}", file=sys.stderr) |
| return False |
| |
| # Print import info |
| if schema.imports: |
| print(f" Resolved {len(schema.imports)} import(s)") |
| |
| if emit_fdl: |
| emitter = FDLEmitter(schema) |
| fdl_content = emitter.emit() |
| if emit_fdl_path: |
| target = emit_fdl_path |
| if target.exists() and target.is_dir(): |
| target = target / f"{file_path.stem}.fdl" |
| elif str(target).endswith("/") or str(target).endswith("\\"): |
| target.mkdir(parents=True, exist_ok=True) |
| target = target / f"{file_path.stem}.fdl" |
| target.parent.mkdir(parents=True, exist_ok=True) |
| target.write_text(fdl_content) |
| print(f" Emitted FDL: {target}") |
| else: |
| print("=== Translated FDL ===") |
| print(fdl_content.rstrip()) |
| print("======================") |
| |
| # Validate merged schema |
| validator = SchemaValidator(schema) |
| if not validator.validate(): |
| for error in validator.errors: |
| print(f"Error: {error}", file=sys.stderr) |
| return False |
| for warning in validator.warnings: |
| print(f"Warning: {warning}", file=sys.stderr) |
| |
| # Generate code for each language |
| for lang, lang_output in lang_output_dirs.items(): |
| options = GeneratorOptions( |
| output_dir=lang_output, |
| package_override=package_override, |
| go_nested_type_style=go_nested_type_style, |
| ) |
| |
| generator_class = GENERATORS[lang] |
| generator = generator_class(schema, options) |
| files = generator.generate() |
| generator.write_files(files) |
| |
| for f in files: |
| print(f" Generated: {lang_output / f.path}") |
| |
| return True |
| |
| |
| def compile_file_recursive( |
| file_path: Path, |
| lang_output_dirs: Dict[str, Path], |
| package_override: Optional[str], |
| import_paths: List[Path], |
| go_nested_type_style: Optional[str], |
| emit_fdl: bool, |
| emit_fdl_path: Optional[Path], |
| generated: Set[Path], |
| stack: Set[Path], |
| resolve_cache: Dict[Path, Schema], |
| go_module_root: Optional[Path], |
| ) -> bool: |
| file_path = file_path.resolve() |
| if file_path in generated: |
| return True |
| if file_path in stack: |
| raise ImportError(f"Circular import detected: {file_path}") |
| |
| stack.add(file_path) |
| try: |
| schema = parse_idl_file(file_path) |
| except OSError as e: |
| print(f"Error reading {file_path}: {e}", file=sys.stderr) |
| stack.remove(file_path) |
| return False |
| except (FrontendError, ValueError) as e: |
| print(f"Error: {e}", file=sys.stderr) |
| stack.remove(file_path) |
| return False |
| |
| if "go" in lang_output_dirs and go_module_root is None: |
| go_module_root = resolve_go_module_root(lang_output_dirs["go"], schema) |
| |
| effective_outputs = lang_output_dirs |
| if "go" in lang_output_dirs: |
| go_root = go_module_root or lang_output_dirs["go"] |
| go_out = resolve_go_output_dir(go_root, schema) |
| if go_out != lang_output_dirs["go"]: |
| effective_outputs = dict(lang_output_dirs) |
| effective_outputs["go"] = go_out |
| |
| for imp in schema.imports: |
| import_path = resolve_import_path(imp.path, file_path, import_paths) |
| if import_path is None: |
| searched = [str(file_path.parent)] |
| searched.extend(str(p) for p in import_paths) |
| line = imp.location.line if imp.location else imp.line |
| column = imp.location.column if imp.location else imp.column |
| print( |
| f"Import error: Import not found: {imp.path}\n" |
| f" at line {line}, column {column}\n" |
| f" Searched in: {', '.join(searched)}", |
| file=sys.stderr, |
| ) |
| stack.remove(file_path) |
| return False |
| if not compile_file_recursive( |
| import_path, |
| lang_output_dirs, |
| None, |
| import_paths, |
| go_nested_type_style, |
| emit_fdl, |
| emit_fdl_path, |
| generated, |
| stack, |
| resolve_cache, |
| go_module_root, |
| ): |
| stack.remove(file_path) |
| return False |
| |
| stack.remove(file_path) |
| ok = compile_file( |
| file_path, |
| effective_outputs, |
| package_override, |
| import_paths, |
| go_nested_type_style, |
| emit_fdl, |
| emit_fdl_path, |
| resolve_cache, |
| ) |
| if ok: |
| generated.add(file_path) |
| return ok |
| |
| |
| def cmd_compile(args: argparse.Namespace) -> int: |
| """Handle the compile command.""" |
| # Build language -> output directory mapping |
| # Language-specific --{lang}_out options take precedence |
| lang_specific_outputs = { |
| "java": args.java_out, |
| "python": args.python_out, |
| "cpp": args.cpp_out, |
| "go": args.go_out, |
| "rust": args.rust_out, |
| } |
| |
| # Determine which languages to generate |
| lang_output_dirs: Dict[str, Path] = {} |
| |
| # First, add languages specified via --{lang}_out (these use direct paths) |
| for lang, out_dir in lang_specific_outputs.items(): |
| if out_dir is not None: |
| lang_output_dirs[lang] = out_dir |
| |
| # Then, add languages from --lang that don't have specific output dirs |
| # These use output_dir/lang pattern |
| if args.lang != "all" or not lang_output_dirs: |
| # Only use --lang if no language-specific outputs are set, or if --lang is explicit |
| languages_from_arg = get_languages(args.lang) |
| for lang in languages_from_arg: |
| if lang not in lang_output_dirs: |
| lang_output_dirs[lang] = args.output / lang |
| |
| if not lang_output_dirs: |
| print("Error: No target languages specified.", file=sys.stderr) |
| print("Use --lang or --{lang}_out options.", file=sys.stderr) |
| return 1 |
| |
| # Validate that all languages are supported |
| invalid = [lang for lang in lang_output_dirs.keys() if lang not in GENERATORS] |
| if invalid: |
| print(f"Error: Unknown language(s): {', '.join(invalid)}", file=sys.stderr) |
| print(f"Available: {', '.join(GENERATORS.keys())}", file=sys.stderr) |
| return 1 |
| |
| # Resolve and validate import paths (support comma-separated paths) |
| import_paths = [] |
| for p in args.import_paths: |
| # Split by comma to support multiple paths in one option |
| for part in str(p).split(","): |
| part = part.strip() |
| if not part: |
| continue |
| resolved = Path(part).resolve() |
| if not resolved.is_dir(): |
| print( |
| f"Warning: Import path is not a directory: {part}", file=sys.stderr |
| ) |
| import_paths.append(resolved) |
| |
| # Create output directories |
| for out_dir in lang_output_dirs.values(): |
| out_dir.mkdir(parents=True, exist_ok=True) |
| |
| success = True |
| generated: Set[Path] = set() |
| resolve_cache: Dict[Path, Schema] = {} |
| for file_path in args.files: |
| if not file_path.exists(): |
| print(f"Error: File not found: {file_path}", file=sys.stderr) |
| success = False |
| continue |
| |
| try: |
| if not compile_file_recursive( |
| file_path, |
| lang_output_dirs, |
| args.package, |
| import_paths, |
| args.go_nested_type_style, |
| args.emit_fdl, |
| args.emit_fdl_path, |
| generated, |
| set(), |
| resolve_cache, |
| None, |
| ): |
| success = False |
| except ImportError as e: |
| print(f"Import error: {e}", file=sys.stderr) |
| success = False |
| |
| return 0 if success else 1 |
| |
| |
| def cmd_scan_generated(args: argparse.Namespace) -> int: |
| """Handle the scan-generated command.""" |
| root = args.root.resolve() |
| if not root.exists(): |
| print(f"Error: Root not found: {root}", file=sys.stderr) |
| return 1 |
| if not root.is_dir(): |
| print(f"Error: Root is not a directory: {root}", file=sys.stderr) |
| return 1 |
| |
| generated_files = scan_generated_files(root, args.relative) |
| if args.delete: |
| for file_path in generated_files: |
| try: |
| file_path.unlink() |
| except OSError as e: |
| print(f"Error deleting {file_path}: {e}", file=sys.stderr) |
| return 1 |
| return 0 |
| |
| |
| def main(args: Optional[List[str]] = None) -> int: |
| """Main entry point.""" |
| parsed = parse_args(normalize_args(args)) |
| |
| if parsed.scan_generated: |
| return cmd_scan_generated(parsed) |
| |
| if not parsed.files: |
| print("Usage: foryc [--scan-generated] [options] FILES...", file=sys.stderr) |
| print("Use 'foryc --help' for more information", file=sys.stderr) |
| return 1 |
| |
| return cmd_compile(parsed) |
| |
| |
| if __name__ == "__main__": |
| sys.exit(main()) |