| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| # |
| # PEP 723 compliant inline script metadata (not yet widely supported) |
| # /// script |
| # requires-python = ">=3.10" |
| # dependencies = [ |
| # "apache-airflow-client", |
| # "rich>=13.6.0", |
| # ] |
| # /// |
| |
| from __future__ import annotations |
| |
| import sys |
| import time |
| import uuid |
| |
| import airflow_client.client |
| import pytest |
| |
| from tests_common.test_utils.api_client_helpers import generate_access_token |
| |
| try: |
| # If you have rich installed, you will have nice colored output of the API responses |
| from rich import print |
| except ImportError: |
| print("Output will not be colored. Please install rich to get colored output: `pip install rich`") |
| pass |
| from airflow_client.client.api import config_api, dag_api, dag_run_api, task_api |
| from airflow_client.client.models.trigger_dag_run_post_body import TriggerDAGRunPostBody |
| |
# The client must use the authentication and authorization parameters
# in accordance with the API server security policy.
# Examples for each auth method are provided below; use the example that
# satisfies your auth use case.
#
# The example below uses the default FabAuthManager. If your Airflow API server uses a different
# auth manager, for instance AwsAuthManagerUser or SimpleAuthManager, make sure to generate the token
# with the appropriate auth manager.
# The auth manager is defined in the `[api]` section of your `airflow.cfg`, for example:
#
# auth_manager = airflow.api_fastapi.auth.managers.simple.simple_auth_manager.SimpleAuthManager
#
# Make sure that your username and password are configured properly, using a user that has admin
# privileges in Airflow.
| |
# Create the access token the client authenticates with. The arguments are presumably
# username, password and API server host - confirm against `generate_access_token`.
access_token = generate_access_token("admin", "admin", "localhost:8080")

# Client configuration shared by every API call in this module.
configuration = airflow_client.client.Configuration(
    host="http://localhost:8080",
    access_token=access_token,
)

# Make sure that the `load_examples` option in the [core] section of your airflow.cfg is set to True,
# or that the AIRFLOW__CORE__LOAD_EXAMPLES environment variable is set to True.
DAG_ID = "example_simplest_dag"
| |
| |
| # Enter a context with an instance of the API client |
@pytest.mark.execution_timeout(400)
def test_python_client():
    """Exercise the Airflow Python client against the API server in ``configuration``.

    Four steps run in sequence: list the DAGs (retried on failure), list the tasks of
    ``DAG_ID``, trigger a run of ``DAG_ID`` and fetch the server configuration. A failing
    step is printed and recorded, and the remaining steps still run.

    Raises:
        SystemExit: with exit code 1 if at least one step ultimately failed.
    """
    with airflow_client.client.ApiClient(configuration) as api_client:
        errors = False

        print("[blue]Getting DAG list")
        dag_api_instance = dag_api.DAGApi(api_client)
        max_retries = 10
        # The first call is retried, presumably to give the API server time to become ready.
        # Only exhausting every attempt counts as an error: a failed attempt that is followed
        # by a successful retry must not fail the whole run.
        for _ in range(max_retries):
            try:
                api_response = dag_api_instance.get_dags()
            except airflow_client.client.OpenApiException as e:
                print(f"[red]Exception when calling DagAPI->get_dags: {e}\n")
                time.sleep(6)
            else:
                print(api_response)
                print("[green]Getting DAG list successful")
                break
        else:
            # The loop finished without `break`, i.e. no attempt succeeded.
            errors = True

        print("[blue]Getting Tasks for a DAG")
        try:
            task_api_instance = task_api.TaskApi(api_client)
            api_response = task_api_instance.get_tasks(DAG_ID)
            print(api_response)
        except airflow_client.client.OpenApiException as e:
            print(f"[red]Exception when calling TaskApi->get_tasks: {e}\n")
            errors = True
        else:
            print("[green]Getting Tasks successful")

        print("[blue]Triggering a DAG run")
        dag_run_api_instance = dag_run_api.DagRunApi(api_client)
        try:
            # Create a DAGRun object (no dag_id should be specified because it is read-only property
            # of DAGRun). The dag_run id is generated randomly to allow multiple executions of the
            # script.
            dag_run = TriggerDAGRunPostBody(
                dag_run_id="some_test_run_" + uuid.uuid4().hex,
                logical_date=None,
            )
            api_response = dag_run_api_instance.trigger_dag_run(DAG_ID, dag_run)
            print(api_response)
        except airflow_client.client.OpenApiException as e:
            print(f"[red]Exception when calling DagRunApi->trigger_dag_run: {e}\n")
            errors = True
        else:
            print("[green]Posting DAG Run successful")

        # Get current configuration. Note, this is disabled by default with most installation.
        # You need to set `expose_config = True` in Airflow configuration in order to retrieve
        # configuration.
        conf_api_instance = config_api.ConfigApi(api_client)
        try:
            api_response = conf_api_instance.get_config()
            print(api_response)
        except airflow_client.client.OpenApiException as e:
            if "Your Airflow administrator chose" in str(e):
                # The server refused to expose its configuration; this is not counted as an error.
                print(
                    "[yellow]You need to set `expose_config = True` in Airflow configuration"
                    " in order to retrieve configuration."
                )
                print("[bright_blue]This is OK. Exposing config is disabled by default.")
            else:
                print(f"[red]Exception when calling ConfigApi->get_config: {e}\n")
                errors = True
        else:
            print("[green]Config retrieved successfully")

        if errors:
            print("\n[red]There were errors while running the script - see above for details")
            sys.exit(1)
        else:
            print("\n[green]Everything went well")
| |
| |
# Run the client checks when this file is executed directly as a script rather than imported.
if __name__ == "__main__":
    test_python_client()