layout: global displayTitle: Structured Streaming Programming Guide title: Structured Streaming Programming Guide license: | Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Real-time Mode is a new streaming execution mode introduced in Spark 4.1.0 that targets ultra-low end-to-end latency with the same API and the same processing guarantees as the existing Structured Streaming engine. It is intended for operational workloads that must react to data the moment it arrives, such as fraud detection, real-time alerting, and live personalization.
In this release, Real-time Mode in Apache Spark supports stateless queries only -- projections, filters and other map-like operations, unions, and stream-static joins. Stateful operations such as streaming aggregations, deduplication, stream-stream joins, and transformWithState are not yet supported, but support for them is planned starting in Spark 4.3. See Supported Queries for the full list.
The most important thing to know: the duration you pass to the trigger (default 5 minutes) is a checkpoint interval, not a latency target. Records are processed and emitted continuously rather than at batch boundaries, so the trigger duration does not set latency the way a micro-batch interval does. See Batch Duration Is a Checkpoint Interval.
You enable Real-time Mode by setting a Real-time trigger on the streaming write; the rest of your query is unchanged. See Enabling Real-time Mode.
By default, Structured Streaming runs a query as a series of small batch jobs -- the micro-batch model. For each micro-batch, the driver plans the batch and launches a fresh set of short-lived tasks. Those tasks read and process a bounded slice of the input, and the driver commits progress before planning the next batch. The fixed per-batch planning and task-scheduling overhead places a floor on end-to-end latency.
Real-time Mode removes this per-batch overhead by launching long-running tasks -- one per input partition. These tasks stay alive for the duration of a (long) batch and process records continuously as they arrive. Because tasks are scheduled once per batch rather than once per slice of data, records flow through the operator pipeline (source -> transformations -> sink) without waiting for a batch boundary. End-to-end latency drops from the ~100 ms micro-batch floor to roughly the time needed to process and ship one record (often a few milliseconds).
Since records never wait for a batch boundary, the batch duration mainly controls how often the query checkpoints progress -- as the next section explains.
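The difference between the two scheduling models can be illustrated with a toy calculation. The sketch below is not Spark code: the 100 ms batch interval, the 50 ms per-batch overhead, and the 3 ms per-record cost are illustrative assumptions, and both function names are hypothetical.

```python
def micro_batch_emit_time(arrival_ms, batch_interval_ms, overhead_ms):
    """Toy micro-batch model: a record waits for the next batch boundary,
    then pays a fixed planning and task-scheduling overhead."""
    # Round the arrival time up to the next multiple of the batch interval.
    next_boundary = -(-arrival_ms // batch_interval_ms) * batch_interval_ms
    return next_boundary + overhead_ms


def real_time_emit_time(arrival_ms, per_record_ms):
    """Toy long-running-task model: the record is processed on arrival."""
    return arrival_ms + per_record_ms


arrivals_ms = [5, 120, 260, 999]

micro_latencies = [micro_batch_emit_time(t, 100, 50) - t for t in arrivals_ms]
real_time_latencies = [real_time_emit_time(t, 3) - t for t in arrivals_ms]

print("micro-batch latency (ms):", micro_latencies)
print("real-time latency (ms):  ", real_time_latencies)
```

In this model the micro-batch latency depends on where a record lands relative to the next boundary, while the long-running-task latency is just the per-record cost. Real numbers depend on the cluster, the source, and the sink.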
In Real-time Mode, the batch duration is a checkpoint interval, not a latency interval. With the default 5-minute duration, the query still emits records within milliseconds; the 5 minutes only controls how often it commits progress and starts the next long-running batch. This is the opposite of the micro-batch engine, where a longer batch interval directly increases latency.
Do not confuse the 5-minute default trigger duration with the 5-second minimum allowed duration described under Requirements: the former is the checkpoint cadence used when you do not specify a duration, while the latter is the smallest duration you are allowed to set.
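Choosing the batch duration is a trade-off: a shorter duration commits progress more often, so a restart after a failure has less data to reprocess, but each commit and batch restart adds overhead; a longer duration reduces that overhead at the cost of more reprocessing on recovery.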
The duration is set on the Real-time trigger, as shown under Enabling Real-time Mode.
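One rough way to reason about the batch duration is to compare how often the query checkpoints with how much input a restart might replay. The sketch below assumes a steady input rate and that a failure replays at most one batch duration of input, which matches the recovery behavior described under Fault Tolerance; `checkpoint_tradeoff` is a hypothetical helper, not a Spark API.

```python
def checkpoint_tradeoff(batch_duration_s, records_per_s):
    """Return (checkpoints per hour, worst-case records replayed on restart).

    Assumes a constant input rate and that a failure just before a commit
    replays one full batch duration of input.
    """
    checkpoints_per_hour = 3600 / batch_duration_s
    worst_case_replay = batch_duration_s * records_per_s
    return checkpoints_per_hour, worst_case_replay


for duration_s in (5, 30, 300):
    per_hour, replay = checkpoint_tradeoff(duration_s, records_per_s=1000)
    print(f"{duration_s:>3} s batch: {per_hour:.0f} checkpoints/hour, "
          f"up to {replay} records replayed")
```

Neither number affects per-record latency; they only describe the checkpoint cadence and the recovery cost.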
The table below summarizes how Real-time Mode relates to the default micro-batch engine and to the experimental Continuous Processing mode. See How Real-time Mode Works for the mechanism and Supported Queries for the full list of supported operations.
| Mode | Latency | Processing Guarantees | Supported operations | When to use |
|---|---|---|---|---|
| Micro-batch (default) | ~100 ms | Exactly-once | All streaming operations, including stateful | Stateful or higher-throughput workloads, or queries Real-time Mode does not yet support |
| Real-time Mode | millisecond-scale | Exactly-once | Stateless today (map-like operations, unions, and stream-static joins); designed to support all query shapes, including stateful | Low-latency workloads |
| Continuous Processing (experimental) | ~1 ms | At-least-once | Map-like only (projections and selections); no stateful operations | Legacy; use Real-time Mode instead |
The Processing Guarantees column refers to processing semantics, defined under Fault Tolerance; end-to-end delivery additionally depends on the sink and is independent of the execution mode.
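Real-time Mode and Continuous Processing both target millisecond-scale latency, but they differ substantially: Real-time Mode provides exactly-once processing and is designed to support all query shapes, including stateful ones, whereas Continuous Processing provides at-least-once processing and is limited to map-like operations.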
For new low-latency workloads, prefer Real-time Mode over Continuous Processing.
To run a supported query in Real-time Mode, set a Real-time trigger on the streaming write. Everything else in the query stays the same. For example, the following query reads from a Kafka topic, applies a stateless transformation, and writes the result to another Kafka topic. Records flow through with low latency even though the trigger is 5 minutes.
{% highlight scala %}
import org.apache.spark.sql.streaming.Trigger

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode("update")
  .trigger(Trigger.RealTime("5 minutes"))  // enable Real-time Mode
  .start()
{% endhighlight %}
{% highlight java %}
import org.apache.spark.sql.streaming.Trigger;

spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode("update")
  .trigger(Trigger.RealTime("5 minutes"))  // enable Real-time Mode
  .start();
{% endhighlight %}
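For comparison, here is a sketch of the same Kafka-to-Kafka query in Python, using the `realTime` keyword argument of `trigger()` that this guide describes. It assumes a PySpark release with Real-time Mode, the Kafka connector on the classpath, and a reachable Kafka cluster. The broker addresses, topic names, and checkpoint path are the same placeholders as in the examples above, and `start_real_time_query` is a hypothetical wrapper name.

```python
def start_real_time_query(spark):
    """Start the Kafka-to-Kafka query in Real-time Mode.

    `spark` is an existing SparkSession. Returns whatever `.start()`
    returns (a StreamingQuery when run against a real session).
    """
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
        .option("subscribe", "input-topic")
        .load()
        # Stateless, map-like transformation only.
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
        .option("topic", "output-topic")
        .option("checkpointLocation", "/path/to/checkpoint")
        .outputMode("update")
        # The duration is a checkpoint interval, not a latency target.
        .trigger(realTime="5 minutes")
        .start()
    )
```

Taking the session as a parameter keeps the sketch free of cluster-specific setup; in an application you would call `start_real_time_query(spark)` and then `awaitTermination()` on the result.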
In Scala and Java, set the trigger with Trigger.RealTime(...), imported from org.apache.spark.sql.streaming.Trigger. Several forms are available:

- Trigger.RealTime() -- uses the default batch duration of 5 minutes.
- Trigger.RealTime("5 minutes") -- a duration string.
- Trigger.RealTime(300000) -- the batch duration in milliseconds, as a long.
- Trigger.RealTime(5, TimeUnit.MINUTES) -- a value together with a java.util.concurrent.TimeUnit.
- Trigger.RealTime(Duration("10 seconds")) -- a Scala scala.concurrent.duration.Duration.

In Python, use the realTime keyword argument of trigger(), for example .trigger(realTime="5 minutes"). The duration is required in Python.

A query must satisfy all of the following before it can start in Real-time Mode; each is checked when the query starts:

- The output mode must be update. Any other output mode fails to start with STREAMING_REAL_TIME_MODE.OUTPUT_MODE_NOT_SUPPORTED.
- A checkpointLocation is required, as with any other Structured Streaming query.
- The batch duration must be at least spark.sql.streaming.realTimeMode.minBatchDuration (5000 ms, i.e. 5 seconds, by default); a shorter interval fails to start with INVALID_STREAMING_REAL_TIME_MODE_TRIGGER_INTERVAL. The duration string must parse to a positive interval, and month-based intervals (for example, "1 month") are not accepted. (This 5-second minimum is distinct from the 5-minute default; see Batch Duration Is a Checkpoint Interval.)

Real-time Mode supports stateless, map-like queries only.
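The start-up requirements (update output mode, a checkpoint location, and a batch duration no shorter than the configured minimum) can be mirrored in a small pre-flight check. This is a sketch, not how Spark implements the checks: `parse_duration_ms` and `check_requirements` are hypothetical helpers, the duration parser handles only simple "<number> <unit>" strings, and only the error names are taken from this guide.

```python
# Default of spark.sql.streaming.realTimeMode.minBatchDuration, in milliseconds.
MIN_BATCH_DURATION_MS = 5000

_UNIT_TO_MS = {
    "millisecond": 1,
    "second": 1000,
    "minute": 60 * 1000,
    "hour": 60 * 60 * 1000,
}


def parse_duration_ms(text):
    """Parse strings such as '5 minutes' or '30 seconds' into milliseconds.

    Far simpler than Spark's interval parser; unknown units raise KeyError.
    """
    value, unit = text.split()
    return int(value) * _UNIT_TO_MS[unit.rstrip("s")]


def check_requirements(output_mode, checkpoint_location, duration):
    """Return a list of problems; an empty list means the three checks pass."""
    problems = []
    if output_mode != "update":
        problems.append("STREAMING_REAL_TIME_MODE.OUTPUT_MODE_NOT_SUPPORTED")
    if not checkpoint_location:
        problems.append("checkpointLocation is required")
    if parse_duration_ms(duration) < MIN_BATCH_DURATION_MS:
        problems.append("INVALID_STREAMING_REAL_TIME_MODE_TRIGGER_INTERVAL")
    return problems


print(check_requirements("update", "/path/to/checkpoint", "5 minutes"))
print(check_requirements("append", "", "1 second"))
```

A check like this can catch configuration mistakes in application code before a query is submitted, but Spark's own checks at query start remain authoritative.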
The following operations, sources, and sinks are supported as of Spark 4.1.0:
Operations: stateless, map-like operations are supported:
- select, selectExpr, withColumn, drop, and the typed map / flatMap Dataset operations.
- where / filter.
- from_json / to_json and scalar user-defined functions (UDFs).
- explode.
- union of two or more distinct streaming sources. Referencing the same source DataFrame more than once is not supported and fails with STREAMING_REAL_TIME_MODE.IDENTICAL_SOURCES_IN_UNION_NOT_SUPPORTED; create a separate DataFrame for each source instead.
- Stream-static joins executed as broadcast joins (for example, with the broadcast(...) hint), because Real-time Mode does not support shuffles.
- withWatermark (event-time watermark declaration) is allowed, although it has no effect because stateful operators are not supported. This lets queries that already declare a watermark run in Real-time Mode without modification.

Sources: the source must support Real-time Mode. In Apache Spark, the Kafka source supports Real-time Mode. An unsupported source fails with STREAMING_REAL_TIME_MODE.INPUT_STREAM_NOT_SUPPORTED. (The built-in rate source is not supported as a Real-time source.)
Sinks:

- Kafka sink.
- Console sink, for development and debugging.
- Foreach sink (ForeachWriter), for writing to arbitrary external systems one record at a time. See Using Foreach. Note that foreachBatch is not supported, because it processes each batch as a whole rather than one record at a time.

Other sinks fail with STREAMING_REAL_TIME_MODE.SINK_NOT_SUPPORTED.
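To make the record-at-a-time contract of the foreach sink concrete, the sketch below shows a writer object with the `open` / `process` / `close` shape that PySpark's `DataStreamWriter.foreach` accepts. The class name `ListWriter` and the in-memory list are illustrative stand-ins for a real external system, and whether a given release accepts Python foreach writers in Real-time Mode is an assumption to verify against that release.

```python
class ListWriter:
    """Row-at-a-time writer with the open/process/close shape used by foreach.

    Appends rows to an in-memory list as a stand-in for an external system.
    On a real cluster the writer runs on executors, so a driver-side list
    would not see the rows; a real writer would hold a client connection.
    """

    def __init__(self):
        self.written = []
        self.is_open = False

    def open(self, partition_id, epoch_id):
        # Acquire per-partition resources here (for example, a connection).
        self.is_open = True
        return True  # True means "process the rows of this partition"

    def process(self, row):
        # Called once per output record.
        self.written.append(row)

    def close(self, error):
        # Release resources; `error` is None when processing succeeded.
        self.is_open = False


# Driving the writer by hand, the way the engine would for one partition.
writer = ListWriter()
if writer.open(partition_id=0, epoch_id=0):
    for row in [{"key": "k1", "value": "v1"}, {"key": "k2", "value": "v2"}]:
        writer.process(row)
writer.close(None)
print(writer.written)
```

With a real streaming DataFrame, such an object would be passed as `.writeStream.foreach(ListWriter())`.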
The operators and sinks used by a Real-time query are checked against an allowlist before the query starts; anything outside the allowlist fails with STREAMING_REAL_TIME_MODE.OPERATOR_OR_SINK_NOT_IN_ALLOWLIST.
Stateful operations of any kind are not supported in this release. This includes streaming aggregations, dropDuplicates / dropDuplicatesWithinWatermark, stream-stream joins, repartition and other operations that introduce a shuffle, and stateful operators such as flatMapGroupsWithState and transformWithState. Support for stateful operations is planned starting in Spark 4.3. Asynchronous progress tracking is also not supported; enabling it fails with STREAMING_REAL_TIME_MODE.ASYNC_PROGRESS_TRACKING_NOT_SUPPORTED.
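Real-time Mode provides the same exactly-once processing guarantees as the default micro-batch engine. Two distinct guarantees are worth separating: the processing guarantee, which concerns how the engine processes each input record, and the end-to-end delivery guarantee, which additionally depends on the sink.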
Real-time Mode is exactly-once with respect to processing. End-to-end delivery depends on the sink: a sink that performs idempotent or transactional writes can deliver exactly-once, while other sinks deliver at-least-once (duplicates are possible after a failure). The built-in Kafka sink provides at-least-once delivery, with or without Real-time Mode. Real-time Mode does not yet ship an exactly-once sink, though one can be implemented.
Internally, offsets are committed at the end of each batch, after the corresponding records have already been written to the sink. If a query fails partway through a batch, it resumes from the last committed offsets on restart and may re-write records emitted before the failure. Design sinks to tolerate duplicates -- for example, with idempotent writes -- where exactly-once output matters.
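The effect of a mid-batch failure on the two kinds of sink can be simulated in plain Python. The sketch below assumes each record carries a stable identity, here a hypothetical (topic, partition, offset) tuple, and compares an append-only sink with a keyed, idempotent one; `IdempotentStore` is an illustrative class, not a Spark or Kafka API.

```python
class IdempotentStore:
    """Keyed store: writing the same record twice leaves a single row."""

    def __init__(self):
        self.rows = {}

    def write(self, topic, partition, offset, value):
        # The key identifies the input record, so a replay overwrites in place.
        self.rows[(topic, partition, offset)] = value


records = [("input-topic", 0, offset, f"value-{offset}") for offset in range(5)]

append_log = []            # at-least-once style sink: every write is appended
store = IdempotentStore()  # idempotent sink

# First attempt: the query fails after emitting three records,
# before the batch's offsets are committed.
for record in records[:3]:
    append_log.append(record)
    store.write(*record)

# Restart: the query resumes from the last committed offsets, which is the
# start of the batch, and re-emits everything.
for record in records:
    append_log.append(record)
    store.write(*record)

print("append-only sink rows:", len(append_log))
print("idempotent sink rows: ", len(store.rows))
```

The append-only sink ends up with the three replayed records duplicated, while the keyed store holds exactly one row per input record.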
The following examples read from Kafka and assume a running Kafka cluster. Each example shows the same query in Python, Scala, and Java.
Enrich a stream by joining it with a static reference dataset. The static side is wrapped in broadcast(...) so the join is executed as a broadcast (map-side) join, which avoids a shuffle.
{% highlight python %}
from pyspark.sql.functions import broadcast

# Static reference data, read once as a batch DataFrame.
reference = spark.read.format("parquet").load("/path/to/reference")

query = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "input-topic")
    .load()
    .selectExpr("CAST(key AS STRING) AS joinKey", "CAST(value AS STRING) AS value")
    .join(broadcast(reference), "joinKey")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "output-topic")
    .option("checkpointLocation", "/path/to/checkpoint")
    .outputMode("update")
    .trigger(realTime="5 minutes")
    .start()
)
{% endhighlight %}
{% highlight scala %}
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.streaming.Trigger

// Static reference data, read once as a batch DataFrame.
val reference = spark.read.format("parquet").load("/path/to/reference")

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(key AS STRING) AS joinKey", "CAST(value AS STRING) AS value")
  .join(broadcast(reference), "joinKey")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode("update")
  .trigger(Trigger.RealTime("5 minutes"))
  .start()
{% endhighlight %}
{% highlight java %}
import static org.apache.spark.sql.functions.broadcast;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.Trigger;

// Static reference data, read once as a batch DataFrame.
Dataset<Row> reference = spark.read().format("parquet").load("/path/to/reference");

spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(key AS STRING) AS joinKey", "CAST(value AS STRING) AS value")
  .join(broadcast(reference), "joinKey")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode("update")
  .trigger(Trigger.RealTime("5 minutes"))
  .start();
{% endhighlight %}
The console sink prints output to the driver's standard output and is handy while developing a query. Note that the console sink buffers each batch's rows and prints them when the batch commits, so its output appears once per batch -- here, every 30 seconds -- rather than continuously. This makes it useful for inspecting results, but it does not reflect Real-time Mode's true per-record latency; to observe that, use a row-by-row sink such as Kafka. A shorter batch duration simply makes the console refresh more often.
{% highlight scala %}
import org.apache.spark.sql.streaming.Trigger

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .where("value IS NOT NULL")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode("update")
  .trigger(Trigger.RealTime("30 seconds"))
  .start()
{% endhighlight %}
{% highlight java %}
import org.apache.spark.sql.streaming.Trigger;

spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .where("value IS NOT NULL")
  .writeStream()
  .format("console")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode("update")
  .trigger(Trigger.RealTime("30 seconds"))
  .start();
{% endhighlight %}
| Configuration | Default | Meaning |
|---|---|---|
| spark.sql.streaming.realTimeMode.minBatchDuration | 5000 (ms, 5 seconds) | The minimum batch duration, in milliseconds, allowed for a Real-time trigger. See the batch-duration requirement under Requirements. |
| spark.sql.streaming.realTimeMode.allowlistCheck | true | Whether to verify that all operators and sinks used by a Real-time query are in the supported allowlist. Disabling this check (not recommended) lets unsupported operators and sinks run at your own risk. |
The Kafka source's minPartitions option is not supported in Real-time Mode.