Status: Accepted
Airflow's current execution model is Python-only: DAGs are Python files, tasks are Python callables, and the supervisor communicates with the forked task process via UNIX domain socketpairs. To support tasks authored in other languages (starting with Java), we need an architecture that:
… @task.stub.

The only missing piece is a way for the task runner to hand off execution to a foreign-language process while still driving the same API-call lifecycle.
There are two ways to author a Java task, both producing a task class the SDK runtime can discover and execute.
Implement the Task interface with execute(Context context, Client client). Context provides static run-time data (logical date, run ID, etc.), and Client provides access to Airflow services (connections, variables, XCom):
```java
import java.util.Date;
// Plus imports for the SDK's Task, Context, Client and Dag types.

public class InterfaceExampleBuilder {
  public static class Extract implements Task {
    @Override
    public void execute(Context context, Client client) throws Exception {
      var connection = client.getConnection("test_http");
      // execute() returns nothing; XCom is pushed explicitly.
      client.setXCom(new Date().getTime());
    }
  }

  public static class Transform implements Task {
    @Override
    public void execute(Context context, Client client) {
      // Pull the XCom pushed by the "extract" task.
      var extractXcom = client.getXCom("extract");
      client.setXCom(new Date().getTime());
    }
  }

  public static Dag build() {
    var dag = new Dag("java_interface_example");
    dag.addTask("extract", Extract.class);
    dag.addTask("transform", Transform.class);
    return dag;
  }
}
```
Use @Builder.Dag, @Builder.Task, and @Builder.XCom annotations on a class and its methods. The SDK's annotation processor generates a <ClassName>Builder class with Task implementations at build time. XCom inputs declared via @Builder.XCom are fetched automatically; non-void return values are pushed as XCom.
```java
import java.util.Date;

@Builder.Dag(id = "java_annotation_example")
public class AnnotationExample {
  @Builder.Task(id = "extract")
  public long extractValue(Client client) throws InterruptedException {
    var connection = client.getConnection("test_http");
    Thread.sleep(6000);
    return new Date().getTime(); // automatically pushed as XCom
  }

  @Builder.Task(id = "transform")
  public long transformValue(Client client, @Builder.XCom(task = "extract") long extracted) {
    // `extracted` is pulled from the "extract" task XCom automatically
    return new Date().getTime();
  }

  @Builder.Task
  public void load(@Builder.XCom(task = "transform") long transformed) {
    throw new RuntimeException("I failed");
  }
}
```
The generated AnnotationExampleBuilder.build() returns a fully configured Dag. The annotation-based interface is generally preferred for new code because it eliminates boilerplate and makes XCom data-flow explicit in method signatures.
Both approaches produce Dag objects that are returned from BundleBuilder.getDags() and served by the same Server entry point:
```java
import java.util.List;

public class ExampleBundleBuilder implements BundleBuilder {
  @Override
  public Iterable<Dag> getDags() {
    // Both authoring styles contribute Dag objects to the same bundle.
    return List.of(InterfaceExampleBuilder.build(), AnnotationExampleBuilder.build());
  }

  public static void main(String[] args) {
    var bundle = new ExampleBundleBuilder().build();
    Server.create(args).serve(bundle);
  }
}
```
@task.stub

DAG authors declare a non-Python task in a Python DAG file using @task.stub and specify a queue. Python and Java tasks coexist in the same pipeline; the DAG remains defined in Python:
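```python
from airflow.sdk import dag, task


@task()
def python_task_1():
    return "value_from_python_task_1"


# Stub tasks carry no Python body; the "java" queue routes them to a coordinator.
@task.stub(queue="java")
def extract(): ...


@task.stub(queue="java")
def transform(): ...


@task()
def python_task_2(transformed):
    print(transformed)


@dag(dag_id="java_interface_example")
def simple_dag():
    transformed = transform()
    python_task_1() >> extract() >> transformed
    # Passing the XComArg adds the dependency and pulls transform's XCom.
    python_task_2(transformed)


simple_dag()
```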
The @task.stub declarations carry no Python implementation — execution is delegated to the coordinator identified by the task's queue.
Client and Context

The Java task interface is void execute(Context context, Client client). Two design choices warrant explanation.
Why both Context and Client? The Java SDK exposes two objects, mirroring the Go SDK:
| Object | Holds | Lifecycle |
|---|---|---|
| Context | Static run-time data (ds, ti, logical date, run ID, etc.) | Populated once from StartupDetails; read-only during execution |
| Client | Active accessors that perform Execution API calls (connections, variables, XCom) | Each method call is a synchronous request/response over the comm channel |
In Python, magic objects on the context (e.g., outlet_events) can perform Execution API calls transparently because of the language's flexibility. Java is more rigid; making Context itself perform background API calls would require significantly more wiring without much user-visible benefit. Splitting the two surfaces makes the API call boundary explicit at the type level.
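The split can be mirrored in a few lines of Python for readers coming from the supervisor side. This is an analog, not the Java SDK's API. The field names, the get_variable/set_xcom methods, the message dicts and the injected send callable (standing in for the comm channel) are all assumptions. Context is a frozen value object, so it cannot change after start-up, and each Client method maps to exactly one request.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Context:
    """Static run-time data: populated once at start-up, read-only afterwards."""

    dag_id: str
    task_id: str
    run_id: str
    logical_date: str


class Client:
    """Active accessor: each method is one synchronous request/response."""

    def __init__(self, send: Callable[[dict], Any]) -> None:
        # Hypothetical transport: sends one request dict and returns the reply.
        self._send = send

    def get_variable(self, key: str) -> Any:
        return self._send({"type": "GetVariable", "key": key})

    def set_xcom(self, value: Any, key: str = "return_value") -> None:
        self._send({"type": "SetXCom", "key": key, "value": value})
```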
Why is execute void? Returning a value from execute would imply an automatic XCom push. Java’s static type system has no clean equivalent of Python’s “return any object, get a default-keyed XCom” pattern, and explicit client.setXCom(...) calls keep the wire-level behavior obvious. (The annotation-based interface infers XCom pushes from non-void return types, providing the same convenience without losing type clarity.)
BaseCoordinator, defined in task-sdk/src/airflow/sdk/execution_time/coordinator.py, exposes a single execute_task method. Subclasses implement this to start the language-specific subprocess and return when it finishes (with exit code and final task state).
The Task SDK provides two coordinator implementations alongside BaseCoordinator:
_PythonCoordinator (built-in, not user-configurable) — implements execute_task by calling ActivitySubprocess.start(), which creates UNIX domain socketpairs and forks a child Python process. The child inherits the request socket on fd 0 and uses the existing task-runner main function. This is the path taken for all Python tasks today.
JavaCoordinator (in airflow.sdk.coordinators.java) — implements execute_task by creating two TCP server sockets on 127.0.0.1, spawning the JVM bundle process via subprocess.Popen, and waiting for the Java process to connect back to those servers. It uses _JavaActivitySubprocess, a subclass of ActivitySubprocess, so the request handling, heartbeating, and state management logic is fully shared with the Python path.
The key design benefit: ActivitySubprocess owns the supervisor-side event loop (heartbeating, API request proxying, state management). Both _PythonCoordinator and JavaCoordinator delegate supervision of the child process to an ActivitySubprocess instance (JavaCoordinator through its _JavaActivitySubprocess subclass). Only subprocess start-up and socket establishment differ. Adding a third language requires implementing execute_task in a new BaseCoordinator subclass, with no changes to Airflow Core.
The client parameter passed to execute_task is the already-authenticated Execution API client. It is passed through to ActivitySubprocess, which uses it to forward the subprocess's API requests (getVariable, getConnection, setXCom, etc.) to the API server.
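A minimal sketch of the coordinator shape follows. It assumes a simplified execute_task(client) signature; the real hook in coordinator.py may take additional arguments. TaskOutcome and CommandCoordinator are hypothetical names. The sketch leaves out the ActivitySubprocess wiring entirely, so it only shows where the per-language start-up logic lives.

```python
import abc
import subprocess
import sys
from dataclasses import dataclass
from typing import Any, Sequence


@dataclass
class TaskOutcome:
    # Hypothetical result type: the exit code plus the final task state.
    exit_code: int
    final_state: str


class BaseCoordinator(abc.ABC):
    """Sketch of the single-hook base class."""

    @abc.abstractmethod
    def execute_task(self, client: Any) -> TaskOutcome:
        """Start the language-specific subprocess and block until it exits."""


class CommandCoordinator(BaseCoordinator):
    """Toy subclass: only the start-up command is language-specific."""

    def __init__(self, argv: Sequence[str]) -> None:
        self.argv = list(argv)

    def execute_task(self, client: Any) -> TaskOutcome:
        # A real coordinator hands `client` to an ActivitySubprocess so it can
        # proxy the child's API requests; this sketch only runs the child.
        proc = subprocess.run(self.argv)
        state = "success" if proc.returncode == 0 else "failed"
        return TaskOutcome(proc.returncode, state)
```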
Python tasks and Java tasks use different channels between the supervisor and the task subprocess:
Python tasks — the supervisor creates UNIX domain socketpairs before forking. The child process inherits the request socket on fd 0 (and stdout/stderr on fd 1/2 via separate socketpairs). No network stack is involved.
Java tasks — the coordinator creates two TCP server sockets on 127.0.0.1 (one for the msgpack comm channel, one for structured logs), then spawns the JVM process via subprocess.Popen. The Java process connects back to those servers. From that point on, the supervisor drives the same msgpack-framed request/response exchange as with Python tasks, over TCP instead of UNIX sockets.
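The connect-back handshake can be sketched with the standard library. Two assumptions apply. First, a child Python process stands in for the JVM. Second, the two port numbers are passed as positional command-line arguments; how JavaCoordinator actually tells the JVM where to connect is not specified here. Binding to port 0 lets the OS choose free ephemeral ports, and each listener is closed once it has accepted its single connection.

```python
import socket
import subprocess
import sys

# Stand-in for the JVM bundle: connects back to both servers, sends, exits.
CHILD = """
import socket, sys
comm = socket.create_connection(("127.0.0.1", int(sys.argv[1])))
logs = socket.create_connection(("127.0.0.1", int(sys.argv[2])))
logs.sendall(b"log: started")
comm.sendall(b"hello")
comm.close()
logs.close()
"""


def listen_loopback() -> socket.socket:
    # Port 0 lets the OS pick a free ephemeral port on the loopback interface.
    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    srv.bind(("127.0.0.1", 0))
    srv.listen(1)
    return srv


def start_and_accept(child_code: str, timeout: float = 10.0):
    """Open comm and log listeners, spawn the child, accept its two connections."""
    comm_srv, logs_srv = listen_loopback(), listen_loopback()
    try:
        comm_port = comm_srv.getsockname()[1]
        logs_port = logs_srv.getsockname()[1]
        # Hypothetical convention: ports are passed as positional arguments.
        proc = subprocess.Popen(
            [sys.executable, "-c", child_code, str(comm_port), str(logs_port)]
        )
        comm_srv.settimeout(timeout)
        logs_srv.settimeout(timeout)
        comm, _ = comm_srv.accept()
        logs, _ = logs_srv.accept()
    finally:
        # Each listener serves exactly one connection, so close it right away.
        comm_srv.close()
        logs_srv.close()
    return proc, comm, logs
```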
The Java SDK process is agnostic to what sits on the other end of the connection. It sees a TCP socket carrying msgpack frames and behaves identically whether the peer is the Python supervisor or any other implementation of the same protocol.
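The sketch below illustrates request/response framing over such a socket. It assumes each frame is a 4-byte big-endian length prefix followed by an encoded [id, body] pair, where the id lets the supervisor match a response to its request. The real frame layout is defined by the SDK's comms code, not by this sketch. JSON replaces msgpack only so the example needs nothing outside the standard library.

```python
import io
import json
import struct
from typing import Any, BinaryIO, Tuple

# Assumed layout: 4-byte big-endian length, then the encoded [id, body] pair.
HEADER = struct.Struct(">I")


def write_frame(stream: BinaryIO, frame_id: int, body: Any) -> None:
    # JSON stands in for msgpack so the sketch needs only the standard library.
    payload = json.dumps([frame_id, body]).encode("utf-8")
    stream.write(HEADER.pack(len(payload)) + payload)


def _read_exact(stream: BinaryIO, n: int) -> bytes:
    # Buffered streams return fewer than n bytes only when the peer has closed.
    data = stream.read(n)
    if len(data) != n:
        raise EOFError("stream closed before a full frame arrived")
    return data


def read_frame(stream: BinaryIO) -> Tuple[int, Any]:
    (length,) = HEADER.unpack(_read_exact(stream, HEADER.size))
    frame_id, body = json.loads(_read_exact(stream, length))
    return frame_id, body


if __name__ == "__main__":
    # A request and its response share the same id.
    buf = io.BytesIO()
    write_frame(buf, 1, {"type": "GetVariable", "key": "my_var"})
    write_frame(buf, 1, {"value": "v1"})
    buf.seek(0)
    print(read_frame(buf))
    print(read_frame(buf))
```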
When a task is dispatched, CoordinatorManager (in the same module as BaseCoordinator) resolves the task's queue to a registered coordinator instance and calls execute_task.
The Java coordinator ships as part of the Task SDK and is importable as airflow.sdk.coordinators.java.JavaCoordinator. The airflow.sdk.coordinators namespace package is structured to allow future separation into standalone distributions without changing import paths. For packaging and registration details, see ADR-0005.
```
Airflow Backend                      Language Runtime Subprocess (Java in this example)
───────────────                      ──────────────────────────────────────────────────
┌──────────────────────────────┐
│ DAG File (Python)            │
│                              │
│ @task.stub(queue="java")     │
│ def my_java_task():          │
│     ...                      │
└──────────────┬───────────────┘
               │ (standard Python parsing)
┌──────────────▼───────────────┐
│ Metadata DB                  │
│                              │
│ task_instance.queue = "java" │
└──────────────┬───────────────┘
               │
┌──────────────▼───────────────┐
│ Scheduler                    │
│                              │
│ Reads queue from TI          │
│ ──► ExecuteTask workload     │
│     (includes queue)         │
└──────────────┬───────────────┘
               │
┌──────────────▼───────────────┐     ┌──────────────────────────────┐
│ Execution API                │     │ Runtime Subprocess (Java)    │
│                              │     │                              │
│ TI.queue ──► Startup         │     │ execute_task() starts JVM    │
│              Details         │     │ process, accepts TCP conn    │
└──────────────┬───────────────┘     │                              │
               │                     └──────────────▲───────────────┘
┌──────────────▼───────────────┐                    │ TCP
│ Supervisor                   │                    │
│                              │                    │
│ CoordinatorManager           │                    │
│ resolves queue via           │                    │
│ [sdk] queue_to_coordinator   ┼────────────────────┘
│ → JavaCoordinator            │
└──────────────────────────────┘
```
JavaCoordinator (in task-sdk/src/airflow/sdk/coordinators/java/coordinator.py) accepts three configuration parameters via kwargs:
| Parameter | Default | Description |
|---|---|---|
| java_executable | "java" | Path to the java binary |
| jvm_args | [] | Extra JVM arguments (e.g. ["-Xmx1024m"]) |
| jars_root | [] | Directories scanned for the bundle JAR; the JAR's Main-Class manifest entry is the entry point |
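The jars_root scan and the resulting command line might look like the following. This rests on several assumptions. The bundle is taken to be the first JAR (in sorted order) whose manifest declares Main-Class. The JVM is launched with java -jar, so the manifest's Main-Class runs. The --comm-port/--logs-port flags are hypothetical placeholders for however the real coordinator passes its socket addresses.

```python
import zipfile
from pathlib import Path
from typing import List, Optional, Sequence


def find_bundle_jar(jars_root: Sequence[str]) -> Path:
    """Return the first JAR whose manifest declares a Main-Class."""
    for root in jars_root:
        for jar in sorted(Path(root).glob("*.jar")):
            with zipfile.ZipFile(jar) as zf:
                try:
                    manifest = zf.read("META-INF/MANIFEST.MF").decode("utf-8")
                except KeyError:
                    continue  # not every JAR in the directory is a bundle
            if any(line.startswith("Main-Class:") for line in manifest.splitlines()):
                return jar
    raise FileNotFoundError(f"no JAR with a Main-Class entry under {list(jars_root)}")


def build_command(jar: Path, comm_port: int, logs_port: int,
                  java_executable: str = "java",
                  jvm_args: Optional[Sequence[str]] = None) -> List[str]:
    # `java -jar` runs the manifest's Main-Class; the port flags are hypothetical.
    return [java_executable, *(jvm_args or []), "-jar", str(jar),
            f"--comm-port={comm_port}", f"--logs-port={logs_port}"]
```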
1. Decorator — DAG Author Interface
DAG authors declare a non-Python task using @task.stub and specify a queue:
```python
@task.stub(queue="java")
def my_java_task(): ...
```
2. Execution API — Task Queues Routed to the Worker
[sdk] coordinators is a JSON object keyed by coordinator name. Each entry supplies a classpath (resolved via import_string) and free-form kwargs passed to the class constructor. [sdk] queue_to_coordinator maps queue names to those keys:
```ini
[sdk]
coordinators = {
    "jdk-17": {
      "classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
      "kwargs": {"java_executable": "java", "jars_root": ["/opt/airflow/jars"]}
    }
  }
queue_to_coordinator = {"java": "jdk-17"}
```
Tasks on the java queue are routed to the entry named jdk-17. Multiple entries with the same classpath (e.g. jdk-11 and jdk-17) are independent instances with different kwargs; no subclassing is needed for per-runtime variants.
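The resolution of these two options can be sketched with importlib and json. Here import_string is a minimal stand-in for Airflow's helper of the same name, and build_coordinators is a hypothetical function. The point is that each named entry becomes its own instance. Two entries that share a classpath (such as jdk-11 and jdk-17) are therefore separate objects, while several queues mapped to the same entry share one instance.

```python
import importlib
import json
from typing import Any, Dict


def import_string(dotted_path: str) -> Any:
    """Minimal stand-in for Airflow's import_string helper."""
    module_path, _, attr = dotted_path.rpartition(".")
    return getattr(importlib.import_module(module_path), attr)


def build_coordinators(coordinators_json: str, queue_map_json: str) -> Dict[str, Any]:
    """Instantiate one object per named entry, then key the instances by queue."""
    entries = json.loads(coordinators_json)
    instances = {
        name: import_string(spec["classpath"])(**spec.get("kwargs", {}))
        for name, spec in entries.items()
    }
    return {queue: instances[name] for queue, name in json.loads(queue_map_json).items()}
```

Pointing classpath at types.SimpleNamespace is a convenient way to exercise this without Airflow installed.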
For the full configuration schema and multi-JDK examples, see ADR-0005.
The user-facing API surface (Task, Client, Context, Dag, BundleBuilder) is published as Java types and is the contract bundle authors program against. The SDK implementation — CoordinatorComm, Server, Task.kt, Frame.kt — is written in Kotlin.
Kotlin compiles to the same JVM bytecode as Java and is fully interoperable, so this choice is invisible to bundle authors at runtime. The practical reasons for using Kotlin internally:
… Client.getVariable() and friends.

Because the user-facing API is Java, “Java SDK” remains the accurate name from a DAG-author perspective. A future rename to “JVM SDK” has been floated but is not adopted here; it can be revisited if/when Scala or other JVM-language bindings are proposed.
| Component | New Interface | Change Type |
|---|---|---|
| BaseCoordinator | Abstract base with a single execute_task hook, defined in the Task SDK | New class |
| airflow.sdk.coordinators | Namespace package for language coordinator modules | New namespace |
| @task.stub decorator | queue: str \| None parameter | Additive |
| [sdk] coordinators | Airflow configuration: JSON object of named coordinator entries | New option |
| [sdk] queue_to_coordinator | Airflow configuration mapping queue name → coordinator entry key | New option |
| CoordinatorManager.for_queue | Resolves queue → coordinator, falls back to _PythonCoordinator | New code path |
… BaseCoordinator subclass and a corresponding entry in [sdk] coordinators, with no changes to Airflow Core and no provider plumbing.

The ActivitySubprocess infrastructure (heartbeating, state management, API request proxying) is reused for all language runtimes.