ADR-0002: Workload Execution — Language-Specific Task Execution

Status

Accepted

Context

Airflow's standard task runner executes Python callables. To support tasks written in other languages, the pipeline needs an extension point where a language-specific coordinator can intercept the execution, delegate to an external runtime process, and bridge the Task SDK protocol so the external process can access Airflow services (connections, variables, XCom) during execution.

This ADR details the task execution side of the coordinator architecture described in ADR-0001. It starts with the generic model — the abstract contracts and expected behavior that any language must implement — then walks through Java as a concrete example.

The Python-side BaseCoordinator interface, CoordinatorManager, and the supervisor changes needed to support them are implemented in the Task SDK alongside the Java coordinator.

Decision

Extension Point: BaseCoordinator

BaseCoordinator (in task-sdk/src/airflow/sdk/execution_time/coordinator.py) exposes a single execute_task method. Subclasses implement this to start the language-specific subprocess and block until it completes, returning an ExecutionResult (exit code + final task state string).

There is no lower-level task_execution_cmd hook; each coordinator implementation is free to start its subprocess however it likes. For details on the built-in Python coordinator and how the Java coordinator reuses ActivitySubprocess infrastructure, see ADR-0001 — Coordinator Interface and Code Reuse.

Coordinators contribute to the airflow.sdk.coordinators namespace package and are activated through [sdk] coordinators in airflow.cfg — there is no provider.yaml involvement.

Registration and Discovery

Coordinators are registered in [sdk] coordinators and routed via [sdk] queue_to_coordinator in airflow.cfg. See ADR-0001 — Java Coordinator Configuration for a configuration example, and ADR-0005 for the full configuration schema and rationale.

CoordinatorManager.for_queue(ti.queue) resolves the queue to a coordinator instance (or falls back to _PythonCoordinator) and returns it to the supervisor, which then calls coordinator.execute_task(...). See ADR-0001 — The Coordinator Layer for how CoordinatorManager loads and caches coordinator instances.

Expected E2E Flow

Airflow Executor (dispatches task)
  │
  ▼
supervise_task()                      ← supervisor process entry point
  │
  ├─ coordinator = CoordinatorManager.for_queue(ti.queue)
  │   └─ returns JavaCoordinator (or _PythonCoordinator as fallback)
  │
  ▼
coordinator.execute_task(what, dag_rel_path, bundle_info, client, ...)
  │
  │ [Python path: _PythonCoordinator]
  ├─ ActivitySubprocess.start()
  │   ├─ create UNIX domain socketpairs (requests on fd 0, stdout/stderr on fd 1/2)
  │   ├─ fork child Python process
  │   ├─ child runs task_runner main function
  │   └─ supervisor drives event loop (heartbeats, API proxying)
  │
  │ [Java path: JavaCoordinator]
  └─ _JavaActivitySubprocess.start()
      ├─ create TCP servers on 127.0.0.1:random (comm + logs)
      ├─ spawn Java bundle process via subprocess.Popen
      ├─ accept TCP connections from Java process
      ├─ send StartupDetails to Java process over comm socket
      └─ supervisor drives the same event loop as the Python path

For the transport details (UNIX socketpairs for Python, TCP loopback for Java) and why they differ, see ADR-0001 — Supervisor–Subprocess Communication.

Expected Message Sequence

Task execution is a multi-round conversation. The supervisor and the language runtime exchange msgpack-framed messages directly over their shared channel (a UNIX socket for Python, a TCP socket for Java):

Airflow Supervisor                              Language Runtime
      │                                                │
      ├── StartupDetails ────────────────────────────►│
      │                                                │
      │                                                ├── Look up task from bundle
      │                                                │
      │                          ┌────────────────────┤
      │◄── GetConnection(conn_id)┤  Task code runs    │
      ├── ConnectionResult ─────►│  and may request:  │
      │◄── GetVariable(key) ─────┤                    │
      ├── VariableResult ───────►│                    │
      │◄── GetXCom(key, ...) ────┤                    │
      ├── XComResult ───────────►│                    │
      │◄── SetXCom(key, value..) ┤                    │
      ├── (empty response) ─────►│                    │
      │                          └────────────────────┤
      │                                                │
      │◄── SucceedTask / TaskState ───────────────────┤
      │   (terminal — no response)                    │
      │                                                └── exit(0)

Task SDK Protocol Messages

The language runtime exchanges these message types with the Airflow supervisor:

Runtime → Supervisor (requests):

MessageFieldsPurpose
GetConnectionconn_idFetch an Airflow connection by ID
GetVariablekeyFetch an Airflow variable by key
GetXComkey, dag_id, task_id, run_id, map_index?, include_prior_dates?Fetch an XCom value
SetXComkey, value, dag_id, task_id, run_id, map_index, mapped_length?Store an XCom value
SucceedTaskend_date, task_outlets?, outlet_events?Terminal: task succeeded
TaskStatestate ("failed", "removed", "skipped"), end_dateTerminal: task ended non-successfully

Supervisor → Runtime (responses):

MessageFieldsIn response to
ConnectionResultconn_id, conn_type, host, schema, login, password, port, extraGetConnection
VariableResultkey, valueGetVariable
XComResultkey, valueGetXCom
(empty)SetXCom
ErrorResponseerror, detailAny request that failed server-side

Framing: Every message is a length-prefixed msgpack frame. Requests are [id, body] (2-element array); responses are [id, body, error] (3-element array). The id field correlates request/response pairs.

Request/Response Semantics

The task execution follows a synchronous request/response pattern from the runtime's perspective:

  1. The runtime sends a request frame (e.g., GetVariable) with an incrementing id
  2. The supervisor reads the frame, fulfills the request (e.g., calls the Execution API), and sends back a response with the same id
  3. The runtime blocks until it receives the response
  4. This repeats for each Airflow service call the task code makes
  5. When the task finishes, the runtime sends a terminal message (SucceedTask or TaskState) — no response is expected, and the process exits

IPC Forward-Compatibility Contract

The supervisor-to-runtime IPC schema (the messages enumerated above plus StartupDetails) is shared between Airflow Core (Python) and every language SDK. A formal AIP for this protocol is expected as follow-up work; until then, this section pins down the rules that the Java SDK assumes and that any future SDK (Go, Rust, …) must follow.

Codec rule (load-bearing). Every SDK MUST configure its decoder to ignore unknown fields:

  • Python side: msgspec / Pydantic models are forward-compatible by default.
  • Java side: TaskSdkFrames.kt configures the Jackson ObjectMapper with FAIL_ON_UNKNOWN_PROPERTIES = false. A short comment at that call site documents that this is contract, not preference — flipping it back to the Jackson default would break forward compatibility with Core.
  • Any new SDK: pick a codec configuration that mirrors this (silent drop of unknown fields).

This rule is what makes additive Core changes safe to ship without bumping a version on every SDK. The analogous trap — generated clients that emit their own allowlist check before the configured mapper sees the bytes — has bitten downstream Java consumers in unrelated systems; flagging the contract here makes it visible to future SDK authors.

Change classification.

Change to a messageStatusRequired action
Add a new optional fieldNon-breaking. Decoders ignore it; old SDKs unaffected.None. Just ship it.
Add a new required fieldBreaking.Deprecation cycle: ship as optional first, populate from Core, wait for SDKs to consume it, then tighten.
Rename a fieldBreaking.Deprecation cycle: emit both names from Core during transition.
Change a field's typeBreaking.Deprecation cycle, typically via a new field name + parallel emission.
Remove a required fieldBreaking. Especially dangerous in Java: lateinit var properties on StartupDetails deserialize silently and only throw UninitializedPropertyAccessException on first access, so the failure surfaces inside user task code rather than at the protocol boundary.Deprecation cycle. Prefer making the field optional first, then remove after a release in which all SDKs have absorbed the change.

Recommended testing. A small contract test on the SDK side should feed the decoder synthetic frames that exercise the rules above — an unknown field, a missing optional field, a null in an optional position — so that a future codec-config regression is caught before it reaches users. Such IPC-envelope tests are currently in the follow-up bucket.

Runtime Lifecycle and Worker Capability

The language runtime is ephemeral and one-process-per-task:

  • Each task instance launches its own java -classpath <bundle>/* <MainClass> --comm=… --logs=… (or the equivalent for another language). The lifetime of that process is the lifetime of the task. There is no pooling or warm-pool reuse.
  • Parallelism on a single worker therefore equals the number of concurrently running task processes. Five concurrent Java tasks on one worker means five JVMs.
  • DAG parsing has the same shape: each DagFileProcessorProcess child handles one parse request and exits. The language runtime spawned underneath it inherits that ephemerality.

Worker capability is opt-in. A worker can run a non-Python task only if the Task SDK (which includes the language coordinator module) is installed, the matching coordinator instance is declared in [sdk] coordinators, and the language toolchain (e.g., a JRE) is on the host. There is no requirement that every worker support every language. Routing relies on:

LayerMechanism
Author intent@task.stub declares queue="java" (or any custom queue)
Worker selectionThe executor (Celery, Kubernetes, etc.) routes the task to a worker that consumes that queue, exactly as it does for Python tasks today
Runtime selectionInside the task runner, [sdk] queue_to_coordinator maps the queue name to a coordinator instance name; that name is resolved against [sdk] coordinators to obtain the configured class and its kwargs; CoordinatorManager.for_queue instantiates the coordinator and execute_task is called

The deployment model is the same one that already applies to Python providers: install what your DAGs need, on the hosts they run on. Multi-language workers are possible (install both providers and both toolchains) but not required.

JAR / artifact version compatibility. The Java SDK embeds its version in the bundle JAR via the Airflow-Java-SDK-Version manifest attribute. Validating that a bundle's SDK version matches the installed JavaCoordinator version at execution time is planned but not yet wired in; this is a follow-up to add before promoting the SDK out of preview.

StartupDetails

The first message the runtime receives is StartupDetails, which provides full context for the task:

FieldTypeDescription
tiTaskInstanceid, task_id, dag_id, run_id, try_number, dag_version_id, map_index, context_carrier
dag_rel_pathstringRelative path to the DAG file / bundle
bundle_infoBundleInfoname, version
start_datedatetimeWhen this task attempt started
ti_contextTIRunContextDAG run context (logical date, data interval, etc.)
sentry_integrationstringSentry DSN for error reporting (optional)

What a Language SDK Must Implement

For task execution, a new language SDK needs:

  1. A BaseCoordinator subclass with:

    • An __init__ that accepts the kwargs declared in [sdk] coordinators (e.g., interpreter path, language-specific runtime flags)
    • execute_task(...) — starts the language-specific subprocess, drives the ActivitySubprocess event loop, and returns when the task finishes
  2. A runtime process that:

    • Accepts --comm=host:port and --logs=host:port CLI arguments
    • Connects to both TCP addresses
    • Reads a StartupDetails msgpack frame from the comm channel
    • Looks up the task to execute from its bundle using ti.dag_id and ti.task_id
    • Executes the task, making GetConnection/GetVariable/GetXCom/SetXCom requests as needed
    • Sends SucceedTask on success or TaskState("failed") on failure
    • Exits
  3. A task interface that user code implements (analogous to Python's @task decorator or BaseOperator)

  4. A client API that wraps the socket protocol behind a simple interface (get_connection, get_variable, get_xcom, set_xcom) so task authors don't deal with framing

  5. Distribution under airflow.sdk.coordinators.<lang> — currently shipped as part of the Task SDK; a standalone distribution is possible in the future without changing the import path

Java as a Concrete Example

JavaCoordinator (Python side):

See ADR-0001 — Coordinator Interface and Code Reuse for how JavaCoordinator implements execute_task, why it uses _JavaActivitySubprocess, and how the Python and Java paths share ActivitySubprocess infrastructure. For configuration parameters and an airflow.cfg example, see ADR-0001 — Java Coordinator Configuration.

Java SDK Task Interface:

User task code implements a single-method interface:

// sdk: org.apache.airflow.sdk.Task
public interface Task {
    void execute(Client client) throws Exception;
}

The Client provides access to Airflow services:

// sdk: org.apache.airflow.sdk.Client
public class Client {
    // Access task metadata
    public StartupDetails getDetails();

    // Airflow services
    public Connection getConnection(String id);
    public Object getVariable(String key);
    public Object getXCom(String key, String dagId, String taskId, String runId, ...);
    public void setXCom(String key, Object value);  // defaults: key="return_value", dagId/taskId/runId from current task
}

Java SDK Task Execution Flow:

When the bundle process receives StartupDetails:

CoordinatorComm.handleIncoming(frame)
  │
  ├── frame.body is StartupDetails
  │     ti: TaskInstance (id, dagId, taskId, runId, tryNumber, ...)
  │     dagRelPath, bundleInfo, startDate, tiContext
  │
  ▼
TaskRunner.run(bundle, request, comm)
  │
  ├── Create Client(request, CoordinatorClient(comm))
  │     CoordinatorClient wraps the comm channel behind the Client interface
  │
  ├── Look up task class:
  │     bundle.dags[request.ti.dagId]?.tasks[request.ti.taskId]
  │     └── if not found → return TaskState("removed")
  │
  ├── Instantiate task:
  │     task.getDeclaredConstructor().newInstance()
  │
  ├── Execute:
  │     try {
  │       instance.execute(client)  ← USER TASK CODE RUNS HERE
  │       return SucceedTask()
  │     } catch (Exception e) {
  │       return TaskState("failed")
  │     }
  │
  ▼
sendMessage(frame.id, result)  ← sends SucceedTask or TaskState back
shutDownRequested = true       ← one-shot, process will exit

Java SDK Airflow Service Access:

When user task code calls client.getVariable("my_key"), the call chain is:

client.getVariable("my_key")                          // Client.kt (public SDK)
  │
  └── impl.getVariable("my_key")                      // CoordinatorClient (execution)
        │
        └── runBlocking {                              // blocks the calling thread
              comm.communicate<VariableResponse>(       // CoordinatorComm
                GetVariable(key = "my_key")
              )
            }
              │
              ├── sendMessage(nextId++, GetVariable)   // encode + write to comm socket
              │     ├── encode: [id, {"type": "GetVariable", "key": "my_key"}]
              │     └── write: [4-byte len][msgpack]
              │
              ├── processOnce(::handle)                // block until response arrives
              │     ├── read 4-byte length prefix
              │     ├── read payload
              │     └── decode: [id, {"type": "VariableResult", ...}, null]
              │
              └── return response.value                // unwrap VariableResponse

This is fully synchronous from the task code's perspective — getVariable() blocks until the supervisor responds.

Java SDK Example Task Implementation:

public static class Extract implements Task {
    public void execute(Client client) throws Exception {
        // Read XCom from a Python task in the same DAG
        var pythonXcom = client.getXCom("python_task_1");

        // Access Airflow connections
        var connection = client.getConnection("test_http");

        // Do work...
        Thread.sleep(6000);

        // Push XCom for downstream tasks (Java or Python)
        client.setXCom(new Date().getTime());
    }
}

public static class Transform implements Task {
    public void execute(Client client) {
        // Read XCom from upstream Java task
        var extractXcom = client.getXCom("extract");

        // Access Airflow variables
        var variable = client.getVariable("my_variable");

        // Push XCom (readable by downstream Python tasks)
        client.setXCom(new Date().getTime());
    }
}

public static class Load implements Task {
    public void execute(Client client) {
        var xcom = client.getXCom("transform");
        throw new RuntimeException("I failed");
        // Exception → TaskRunner catches → sends TaskState("failed")
    }
}

Java SDK Complete Bundle Entry Point:

See ADR-0001 — Writing a Non-Python Task for the full BundleBuilder / Server.create(args).serve(bundle) pattern. From the task execution perspective, main() is the JVM entry point the coordinator launches; StartupDetails is the first message received, which triggers runTask(), and the process exits after the terminal SucceedTask/TaskState response.

Consequences

  • Task execution for any language reuses the same coordinator pattern, keeping the extension surface small.
  • The multi-round protocol (GetConnection, GetVariable, etc.) means the language runtime has full access to Airflow services without reimplementing them — they stay in Python.
  • The synchronous request/response model is simple for language SDK authors but adds a round-trip per service call.
  • Task authors interact with a simple Client interface, completely abstracted from the underlying socket protocol.