| #!/usr/bin/env python3 |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # /// script |
| # requires-python = ">=3.10" |
| # dependencies = [ |
| # "rich>=13.6.0", |
| # "PyYAML>=6.0", |
| # ] |
| # /// |
| """Generate mprocs configuration dynamically based on environment variables.""" |
| |
| from __future__ import annotations |
| |
| import os |
| import sys |
| import tempfile |
| |
| import yaml |
| from rich.console import Console |
| from rich.panel import Panel |
| from rich.syntax import Syntax |
| |
| |
| def get_env_bool(var_name: str, default: str = "false") -> bool: |
| """Get environment variable as boolean.""" |
| return os.environ.get(var_name, default).lower() == "true" |
| |
| |
| def get_env(var_name: str, default: str = "") -> str: |
| """Get environment variable with default.""" |
| return os.environ.get(var_name, default) |
| |
| |
| def generate_mprocs_config() -> str: |
| """Generate mprocs YAML configuration based on environment variables.""" |
| procs = {} |
| |
| # Scheduler |
| scheduler_cmd = "airflow scheduler" |
| if get_env_bool("BREEZE_DEBUG_SCHEDULER"): |
| port = get_env("BREEZE_DEBUG_SCHEDULER_PORT", "5678") |
| scheduler_cmd = f"debugpy --listen 0.0.0.0:{port} --wait-for-client -m airflow scheduler" |
| |
| procs["scheduler"] = { |
| "shell": scheduler_cmd, |
| "restart": "always", |
| "scrollback": 100000, |
| } |
| |
| # API Server or Webserver (depending on Airflow version) |
| use_airflow_version = get_env("USE_AIRFLOW_VERSION", "") |
| if not use_airflow_version.startswith("2."): |
| # API Server (Airflow 3.x+) |
| # Bind dual-stack (IPv6 + IPv4) so http://localhost:28080 works from browsers that |
| # resolve `localhost` to ::1 first (e.g. Chrome/Safari on macOS with OrbStack, which |
| # forwards both IPv4 and IPv6 host ports into the container). The default `0.0.0.0` |
| # binds IPv4 only, causing the IPv6 attempt to land inside the container with no |
| # listener and TCP RST → opaque chrome-error://chromewebdata/ page. |
| |
| # If host is empty or None, uvicorn listens on all available (IPv4 and IPv6) interfaces. |
| api_host_arg = "--host ''" |
| if get_env_bool("BREEZE_DEBUG_APISERVER"): |
| port = get_env("BREEZE_DEBUG_APISERVER_PORT", "5679") |
| api_cmd = ( |
| f"debugpy --listen 0.0.0.0:{port} --wait-for-client -m airflow api-server {api_host_arg} -d" |
| ) |
| else: |
| dev_mode = get_env_bool("DEV_MODE") |
| api_cmd = ( |
| f"airflow api-server {api_host_arg} -d" if dev_mode else f"airflow api-server {api_host_arg}" |
| ) |
| procs["api_server"] = {"shell": api_cmd, "restart": "always", "scrollback": 100000} |
| else: |
| # Webserver (Airflow 2.x) |
| if get_env_bool("BREEZE_DEBUG_WEBSERVER"): |
| port = get_env("BREEZE_DEBUG_WEBSERVER_PORT", "5680") |
| web_cmd = f"debugpy --listen 0.0.0.0:{port} --wait-for-client -m airflow webserver" |
| else: |
| dev_mode = get_env_bool("DEV_MODE") |
| web_cmd = "airflow webserver -d" if dev_mode else "airflow webserver" |
| |
| procs["webserver"] = { |
| "shell": web_cmd, |
| "restart": "always", |
| "scrollback": 100000, |
| } |
| |
| # Triggerer |
| triggerer_cmd = "airflow triggerer" |
| if get_env_bool("BREEZE_DEBUG_TRIGGERER"): |
| port = get_env("BREEZE_DEBUG_TRIGGERER_PORT", "5681") |
| triggerer_cmd = f"debugpy --listen 0.0.0.0:{port} --wait-for-client -m airflow triggerer" |
| |
| procs["triggerer"] = { |
| "shell": triggerer_cmd, |
| "restart": "always", |
| "scrollback": 100000, |
| } |
| |
| # Celery Worker (conditional) |
| if get_env_bool("INTEGRATION_CELERY"): |
| if get_env_bool("BREEZE_DEBUG_CELERY_WORKER"): |
| port = get_env("BREEZE_DEBUG_CELERY_WORKER_PORT", "5682") |
| celery_cmd = f"debugpy --listen 0.0.0.0:{port} --wait-for-client -m airflow celery worker" |
| else: |
| celery_cmd = "airflow celery worker" |
| |
| procs["celery_worker"] = { |
| "shell": celery_cmd, |
| "restart": "always", |
| "scrollback": 100000, |
| } |
| |
| if get_env("GO_WORKER") != "": |
| env = {} |
| env["AIRFLOW__EDGE__API_URL"] = "http://localhost:8080" |
| env["AIRFLOW__BUNDLES__FOLDER"] = "./bin" |
| env["AIRFLOW__LOGGING__BASE_LOG_FOLDER"] = "./logs" |
| env["AIRFLOW__EXECUTION__API_URL"] = "http://localhost:8080/execution/" |
| |
| # Build command with environment cleanup |
| go_worker_cmd_parts = [ |
| "export AIRFLOW__API_AUTH__SECRET_KEY=${AIRFLOW__API_AUTH__JWT_SECRET}", |
| "export AIRFLOW__LOGGING__SECRET_KEY=${AIRFLOW__API_AUTH__JWT_SECRET}", |
| "unset AIRFLOW__API_AUTH__JWT_SECRET || true", |
| "unset AIRFLOW__DATABASE__SQL_ALCHEMY_CONN || true", |
| "unset AIRFLOW__CELERY__RESULT_BACKEND || true", |
| "unset POSTGRES_HOST_PORT || true", |
| "unset BACKEND || true", |
| "unset POSTGRES_VERSION || true", |
| "export AIRFLOW__LOGGING__BASE_LOG_FOLDER=edge_logs", |
| "go run ./cmd/airflow-go-edge-worker/main.go run", |
| ] |
| go_worker_cmd = " && ".join(go_worker_cmd_parts) |
| |
| procs["go_worker"] = { |
| "shell": go_worker_cmd, |
| "cwd": "go-sdk", |
| "env": env, |
| } |
| |
| # Flower (conditional) |
| if get_env_bool("INTEGRATION_CELERY") and get_env_bool("CELERY_FLOWER"): |
| if get_env_bool("BREEZE_DEBUG_FLOWER"): |
| port = get_env("BREEZE_DEBUG_FLOWER_PORT", "5683") |
| flower_cmd = f"debugpy --listen 0.0.0.0:{port} --wait-for-client -m airflow celery flower" |
| else: |
| flower_cmd = "airflow celery flower" |
| |
| procs["flower"] = { |
| "shell": flower_cmd, |
| "restart": "always", |
| "scrollback": 100000, |
| } |
| |
| # Edge Worker (conditional) |
| executor = get_env("AIRFLOW__CORE__EXECUTOR", "") |
| if executor == "airflow.providers.edge3.executors.edge_executor.EdgeExecutor": |
| if get_env_bool("BREEZE_DEBUG_EDGE"): |
| port = get_env("BREEZE_DEBUG_EDGE_PORT", "5684") |
| edge_cmd = f"debugpy --listen 0.0.0.0:{port} --wait-for-client -m airflow edge worker --edge-hostname breeze --queues default" |
| else: |
| # Build command with environment cleanup |
| edge_cmd_parts = [ |
| "unset AIRFLOW__DATABASE__SQL_ALCHEMY_CONN || true", |
| "unset AIRFLOW__CELERY__RESULT_BACKEND || true", |
| "unset POSTGRES_HOST_PORT || true", |
| "unset BACKEND || true", |
| "unset POSTGRES_VERSION || true", |
| "export AIRFLOW__LOGGING__BASE_LOG_FOLDER=edge_logs", |
| "airflow edge worker --edge-hostname breeze --queues default", |
| ] |
| edge_cmd = " && ".join(edge_cmd_parts) |
| |
| procs["edge_worker"] = {"shell": edge_cmd, "restart": "always", "scrollback": 100000} |
| |
| # Dag Processor (conditional) |
| if get_env_bool("STANDALONE_DAG_PROCESSOR"): |
| if get_env_bool("BREEZE_DEBUG_DAG_PROCESSOR"): |
| port = get_env("BREEZE_DEBUG_DAG_PROCESSOR_PORT", "5685") |
| dag_proc_cmd = f"debugpy --listen 0.0.0.0:{port} --wait-for-client -m airflow dag-processor" |
| else: |
| dag_proc_cmd = "airflow dag-processor" |
| |
| procs["dag_processor"] = { |
| "shell": dag_proc_cmd, |
| "restart": "always", |
| "scrollback": 100000, |
| } |
| |
| procs["shell"] = { |
| "shell": "bash", |
| "restart": "always", |
| "scrollback": 100000, |
| } |
| |
| # Generate YAML output |
| config_dict = {"procs": procs} |
| return yaml.dump(config_dict, default_flow_style=False, sort_keys=False) |
| |
| |
| def main(): |
| # Set LocalExecutor if not set and backend is not sqlite |
| backend = get_env("BACKEND", "") |
| if backend != "sqlite" and not get_env("AIRFLOW__CORE__EXECUTOR"): |
| os.environ["AIRFLOW__CORE__EXECUTOR"] = "LocalExecutor" |
| |
| # Generate and print configuration |
| config = generate_mprocs_config() |
| |
| # Determine output path |
| if len(sys.argv) > 1: |
| output_path = sys.argv[1] |
| else: |
| temp_dir = tempfile.gettempdir() |
| output_path = os.path.join(temp_dir, "mprocs.yaml") |
| |
| with open(output_path, "w") as f: |
| f.write(config) |
| |
| if os.environ.get("VERBOSE", "false") == "true": |
| # Use rich console for pretty output |
| console = Console() |
| |
| console.print( |
| f"\n[bold green]✓[/bold green] Generated mprocs configuration at: [cyan]{output_path}[/cyan]" |
| ) |
| |
| # Display configuration with syntax highlighting |
| syntax = Syntax(config, "yaml", theme="monokai", line_numbers=False) |
| panel = Panel( |
| syntax, |
| title="[bold yellow]Configuration Preview[/bold yellow]", |
| border_style="blue", |
| expand=False, |
| ) |
| console.print(panel) |
| |
| |
| if __name__ == "__main__": |
| main() |