| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| from __future__ import annotations |
| |
| from typing import Callable |
| |
| from airflow.decorators.base import DecoratedOperator, TaskDecorator, task_decorator_factory |
| from airflow.operators.python import ExternalPythonOperator |
| |
| |
class _PythonExternalDecoratedOperator(DecoratedOperator, ExternalPythonOperator):
    """
    Operator behind ``@task.external_python``: holds a callable plus the arguments to call it with.

    :param python: Absolute, file-system specific path to the Python binary of the
        virtualenv to use (located in the ``VENV/bin`` folder); typically begins with
        "/" or "X:/" depending on the filesystem/os.
    :param python_callable: Reference to the callable object being wrapped
    :param op_kwargs: Dictionary of keyword arguments unpacked into the callable (templated)
    :param op_args: List of positional arguments unpacked into the callable (templated)
    :param multiple_outputs: When True, the return value of the decorated function is unrolled
        into multiple XCom values; a dict is unrolled using its keys as XCom keys.
        Defaults to False.
    """

    custom_operator_name: str = "@task.external_python"

    def __init__(self, *, python_callable, op_args, op_kwargs, **kwargs) -> None:
        # The callable and its arguments are handed to the parent initializer twice:
        # bundled under ``kwargs_to_upstream`` and again as individual keyword arguments.
        upstream_kwargs = dict(
            python_callable=python_callable,
            op_args=op_args,
            op_kwargs=op_kwargs,
        )
        super().__init__(kwargs_to_upstream=upstream_kwargs, **upstream_kwargs, **kwargs)
| |
| |
def external_python_task(
    python: str | None = None,
    python_callable: Callable | None = None,
    multiple_outputs: bool | None = None,
    **kwargs,
) -> TaskDecorator:
    """Turn a callable into an Airflow operator that runs via a Python virtual environment.

    Any extra keyword arguments are passed on as operator kwargs. The decorator can be
    reused within a single DAG.

    This function is only used during type checking or auto-completion.

    :meta private:

    :param python: Absolute, file-system specific path to the Python binary of the
        virtualenv to use (located in the ``VENV/bin`` folder); typically begins with
        "/" or "X:/" depending on the filesystem/os.
    :param python_callable: Function to decorate
    :param multiple_outputs: When True, the return value of the decorated function is unrolled
        into multiple XCom values; a dict is unrolled using its keys as XCom keys.
        Defaults to False.
    """
    # ``python`` is placed first so the factory receives the extra keyword
    # arguments in the same order as a direct ``python=..., **kwargs`` call.
    operator_kwargs = {"python": python, **kwargs}
    decorator = task_decorator_factory(
        python_callable=python_callable,
        multiple_outputs=multiple_outputs,
        decorated_operator_class=_PythonExternalDecoratedOperator,
        **operator_kwargs,
    )
    return decorator