layout: global
displayTitle: Structured Streaming Programming Guide
title: Structured Streaming Programming Guide
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

  • Table of contents {:toc}

Real-time Mode

Real-time Mode is a streaming execution mode introduced in Spark 4.1.0. It targets ultra-low end-to-end latency while keeping the same API and the same processing guarantees as the existing Structured Streaming engine. It is intended for operational workloads that must react to data the moment it arrives, such as fraud detection, real-time alerting, and live personalization.

In this release, Real-time Mode in Apache Spark supports stateless queries only -- projections, filters and other map-like operations, unions, and stream-static joins. Stateful operations such as streaming aggregations, deduplication, stream-stream joins, and transformWithState are not yet supported, but support for them is planned starting in Spark 4.3. See Supported Queries for the full list.

The most important thing to know: the duration you pass to the trigger (default 5 minutes) is a checkpoint interval, not a latency target. Records are processed and emitted continuously rather than at batch boundaries, so the trigger duration does not set latency the way a micro-batch interval does. See Batch Duration Is a Checkpoint Interval.

You enable Real-time Mode by setting a Real-time trigger on the streaming write; the rest of your query is unchanged. See Enabling Real-time Mode.

How Real-time Mode Works

By default, Structured Streaming runs a query as a series of small batch jobs -- the micro-batch model. For each micro-batch, the driver plans the batch and launches a fresh set of short-lived tasks. Those tasks read and process a bounded slice of the input, and the driver commits progress before planning the next batch. The fixed per-batch planning and task-scheduling overhead places a floor on end-to-end latency.

Real-time Mode removes this per-batch overhead by launching long-running tasks -- one per input partition. These tasks stay alive for the duration of a (long) batch and process records continuously as they arrive. Because tasks are scheduled once per batch rather than once per slice of data, records flow through the operator pipeline (source -> transformations -> sink) without waiting for a batch boundary. End-to-end latency drops from the ~100 ms micro-batch floor to roughly the time needed to process and ship one record (often a few milliseconds).
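The difference between the two scheduling models can be illustrated with a toy model in plain Python. This is not Spark code and it is not a measurement: the function names, the 100 ms batch interval, the 50 ms per-batch overhead, and the 2 ms per-record cost are all assumptions chosen for illustration. The model also ignores queueing inside a partition. All times are integer milliseconds.

```python
def micro_batch_emit_times(arrivals_ms, batch_interval_ms, overhead_ms):
    """Toy model: a record is picked up at the next batch boundary at or after
    its arrival, then pays a fixed planning/scheduling overhead."""
    emits = []
    for t in arrivals_ms:
        # Ceiling division: first batch boundary at or after the arrival time.
        boundary = -(-t // batch_interval_ms) * batch_interval_ms
        emits.append(boundary + overhead_ms)
    return emits


def real_time_emit_times(arrivals_ms, per_record_ms):
    """Toy model: a long-running task handles each record as it arrives."""
    return [t + per_record_ms for t in arrivals_ms]


arrivals = [3, 120, 250, 251]
micro = micro_batch_emit_times(arrivals, batch_interval_ms=100, overhead_ms=50)
real = real_time_emit_times(arrivals, per_record_ms=2)

micro_latency = [e - a for e, a in zip(micro, arrivals)]
real_latency = [e - a for e, a in zip(real, arrivals)]
print(micro_latency)  # [147, 130, 100, 99]
print(real_latency)   # [2, 2, 2, 2]
```

In the micro-batch model no record can beat the per-batch overhead, and most records also wait for a boundary. In the long-running-task model, latency is only the per-record cost.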

Since records never wait for a batch boundary, the batch duration mainly controls how often the query checkpoints progress -- as the next section explains.

Batch Duration Is a Checkpoint Interval

In Real-time Mode, the batch duration is a checkpoint interval, not a latency interval. With the default 5-minute duration, the query still emits records within milliseconds; the 5 minutes only controls how often it commits progress and starts the next long-running batch. This is the opposite of the micro-batch engine, where a longer batch interval directly increases latency.

Do not confuse the 5-minute default trigger duration with the 5-second minimum allowed duration described under Requirements: the former is the checkpoint cadence used when you do not specify a duration, while the latter is the smallest duration you are allowed to set.

Choosing the batch duration is a trade-off:

  • A shorter batch duration checkpoints more often, giving finer-grained recovery (less work to re-process after a failure). However, the query does not process data while it commits progress and starts the next batch, so checkpointing too frequently adds more of these gaps, which can raise tail (p99) latency, in addition to incurring more planning and commit overhead.
  • A longer batch duration checkpoints less often, reducing that overhead and those gaps, at the cost of coarser-grained recovery (more data re-processed after a failure).
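The trade-off can be made concrete with some simple arithmetic. The sketch below is plain Python, not a Spark API, and the helper name `checkpoint_tradeoff` is hypothetical. It assumes that a failure just before a commit forces the query to re-process up to one full batch of input, which follows from offsets being committed at the end of each batch.

```python
def checkpoint_tradeoff(batch_duration_s, window_s=3600):
    """Return (commits_in_window, worst_case_replay_seconds)."""
    if batch_duration_s <= 0:
        raise ValueError("batch duration must be positive")
    commits = window_s // batch_duration_s
    # A failure just before a commit replays up to one whole batch of input.
    worst_case_replay_s = batch_duration_s
    return commits, worst_case_replay_s


for duration_s in (5, 30, 300):
    commits, replay_s = checkpoint_tradeoff(duration_s)
    print(f"{duration_s:>4}s batches: {commits} commits/hour, "
          f"up to {replay_s}s of input re-processed after a failure")
```

With the 5-minute default, the query commits 12 times per hour and may replay up to 5 minutes of input. At the 5-second minimum it commits 720 times per hour, and each of those commits is a short gap in processing.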

The duration is set on the Real-time trigger, as shown under Enabling Real-time Mode.

Comparison with Other Modes

The table below summarizes how Real-time Mode relates to the default micro-batch engine and to the experimental Continuous Processing mode. See How Real-time Mode Works for the mechanism and Supported Queries for the full list of supported operations.

| Mode | Latency | Processing Guarantees | Supported operations | When to use |
|------|---------|-----------------------|----------------------|-------------|
| Micro-batch (default) | ~100 ms | Exactly-once | All streaming operations, including stateful | Stateful or higher-throughput workloads, or queries Real-time Mode does not yet support |
| Real-time Mode | millisecond-scale | Exactly-once | Stateless today (map-like operations, unions, and stream-static joins); designed to support all query shapes, including stateful | Low-latency workloads |
| Continuous Processing (experimental) | ~1 ms | At-least-once | Map-like only (projections and selections); no stateful operations | Legacy; use Real-time Mode instead |

The Processing Guarantees column refers to processing semantics, defined under Fault Tolerance; end-to-end delivery additionally depends on the sink and is independent of the execution mode.

Real-time Mode and Continuous Processing both target millisecond-scale latency, but they differ substantially:

  • Continuous Processing (introduced in Spark 2.3) is, and remains, experimental. It supports only map-like operations -- projections and selections -- with no stateful operations such as aggregations or joins, and it provides at-least-once guarantees. Because it is stateless, the exactly-once processing guarantee discussed under Fault Tolerance does not apply to it. These constraints have limited its adoption.
  • Real-time Mode is designed to support all query shapes, including stateful operations, while reusing Spark's mature components such as state management, the Catalyst optimizer, and the existing SQL operators. It provides exactly-once processing semantics. It currently supports stateless queries, with stateful support planned starting in Spark 4.3.

For new low-latency workloads, prefer Real-time Mode over Continuous Processing.

Enabling Real-time Mode

To run a supported query in Real-time Mode, set a Real-time trigger on the streaming write. Everything else in the query stays the same. For example, the following query reads from a Kafka topic, applies a stateless transformation, and writes the result to another Kafka topic. Records flow through with low latency even though the trigger is 5 minutes.

{% highlight scala %}
import org.apache.spark.sql.streaming.Trigger

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode("update")
  .trigger(Trigger.RealTime("5 minutes")) // enable Real-time Mode
  .start()
{% endhighlight %}

{% highlight java %}
import org.apache.spark.sql.streaming.Trigger;

spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode("update")
  .trigger(Trigger.RealTime("5 minutes")) // enable Real-time Mode
  .start();
{% endhighlight %}

Trigger API

  • Scala and Java: the trigger is Trigger.RealTime(...), imported from org.apache.spark.sql.streaming.Trigger. Several forms are available:
    • Trigger.RealTime() -- uses the default batch duration of 5 minutes.
    • Trigger.RealTime("5 minutes") -- a duration string.
    • Trigger.RealTime(300000) -- the batch duration in milliseconds, as a long.
    • Trigger.RealTime(5, TimeUnit.MINUTES) -- a value together with a java.util.concurrent.TimeUnit.
    • Trigger.RealTime(Duration("10 seconds")) -- a Scala scala.concurrent.duration.Duration.
  • Python: pass the batch duration as a string to the realTime keyword argument of trigger(), for example .trigger(realTime="5 minutes"). The duration is required in Python.
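The Python form described in the list above might be used as follows. This is a minimal sketch, assuming a PySpark version that accepts the `realTime` keyword as this guide describes. The function name `start_real_time_copy` and its parameters are hypothetical, and `spark` is expected to be an existing SparkSession.

```python
def start_real_time_copy(spark, bootstrap_servers, input_topic, output_topic,
                         checkpoint_dir, batch_duration="5 minutes"):
    """Start a Kafka-to-Kafka passthrough query in Real-time Mode.

    `spark` is an existing SparkSession; the other arguments are placeholders
    for your own brokers, topics, and checkpoint path.
    """
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", input_topic)
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .writeStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("topic", output_topic)
        .option("checkpointLocation", checkpoint_dir)
        .outputMode("update")
        .trigger(realTime=batch_duration)  # enable Real-time Mode
        .start()
    )
```

A call such as `start_real_time_copy(spark, "host1:port1,host2:port2", "input-topic", "output-topic", "/path/to/checkpoint")` would return the started query handle. The duration is passed explicitly because Python requires it.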

Requirements

A query must satisfy all of the following before it can start in Real-time Mode; each is checked when the query starts:

  • The output mode must be update. Any other output mode fails to start with STREAMING_REAL_TIME_MODE.OUTPUT_MODE_NOT_SUPPORTED.
  • A checkpointLocation is required, as with any other Structured Streaming query.
  • The batch duration must be at least spark.sql.streaming.realTimeMode.minBatchDuration (5000 ms, i.e. 5 seconds, by default); a shorter interval fails to start with INVALID_STREAMING_REAL_TIME_MODE_TRIGGER_INTERVAL. The duration string must parse to a positive interval, and month-based intervals (for example, "1 month") are not accepted. (This 5-second minimum is distinct from the 5-minute default; see Batch Duration Is a Checkpoint Interval.)
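The three requirements above can be mirrored in a small pre-flight check that runs before a query is submitted. This is a sketch in plain Python, not Spark's own validation logic: the function `real_time_preflight` is hypothetical, and only the error names and the 5000 ms default are taken from this guide.

```python
MIN_BATCH_DURATION_MS = 5000  # documented default of minBatchDuration


def real_time_preflight(output_mode, checkpoint_location, batch_duration_ms,
                        min_batch_duration_ms=MIN_BATCH_DURATION_MS):
    """Return a list of problems; an empty list means the checks passed."""
    problems = []
    if output_mode != "update":
        problems.append("STREAMING_REAL_TIME_MODE.OUTPUT_MODE_NOT_SUPPORTED: "
                        f"output mode is {output_mode!r}, expected 'update'")
    if not checkpoint_location:
        problems.append("checkpointLocation is not set")
    if batch_duration_ms < min_batch_duration_ms:
        problems.append("INVALID_STREAMING_REAL_TIME_MODE_TRIGGER_INTERVAL: "
                        f"{batch_duration_ms} ms is below the "
                        f"{min_batch_duration_ms} ms minimum")
    return problems


print(real_time_preflight("update", "/path/to/checkpoint", 300_000))  # []
for problem in real_time_preflight("append", "", 1_000):
    print(problem)
```

Spark performs the authoritative checks when the query starts. A helper like this only surfaces the same mistakes earlier, for example in a deployment script.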

Supported Queries

Real-time Mode supports stateless queries only: map-like operations, unions, and stream-static joins.

The following operations, sources, and sinks are supported as of Spark 4.1.0:

  • Operations: stateless, map-like operations are supported:

    • Projections: select, selectExpr, withColumn, drop, and the typed map / flatMap Dataset operations.
    • Selections: where / filter.
    • Expressions that compile to a projection -- including functions such as from_json / to_json and scalar user-defined functions (UDFs).
    • Column generators such as explode.
    • union of two or more distinct streaming sources. Referencing the same source DataFrame more than once is not supported and fails with STREAMING_REAL_TIME_MODE.IDENTICAL_SOURCES_IN_UNION_NOT_SUPPORTED; create a separate DataFrame for each source instead.
    • Stream-static joins, where a streaming DataFrame is joined with a static DataFrame. The static side must be broadcast (use the broadcast(...) hint), because Real-time Mode does not support shuffles.
    • withWatermark (event-time watermark declaration) is allowed, although it has no effect because stateful operators are not supported. This lets queries that already declare a watermark run in Real-time Mode without modification.
  • Sources: the source must support Real-time Mode. In Apache Spark, the Kafka source supports Real-time Mode. An unsupported source fails with STREAMING_REAL_TIME_MODE.INPUT_STREAM_NOT_SUPPORTED. (The built-in rate source is not supported as a Real-time source.)

  • Sinks:

    • Kafka sink.
    • Foreach sink (via ForeachWriter), for writing to arbitrary external systems one record at a time. See Using Foreach. Note that foreachBatch is not supported, because it processes each batch as a whole rather than one record at a time.
    • Console and memory sinks, which are useful for development and debugging.

    Other sinks fail with STREAMING_REAL_TIME_MODE.SINK_NOT_SUPPORTED.

The operators and sinks used by a Real-time query are checked against an allowlist before the query starts; anything outside the allowlist fails with STREAMING_REAL_TIME_MODE.OPERATOR_OR_SINK_NOT_IN_ALLOWLIST.
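The guidance on unions, creating a separate DataFrame for each source, might look like the following in Python. This is a sketch: the helper names `kafka_stream` and `union_of_topics` are hypothetical, and it assumes each input is a different Kafka topic. This guide does not say whether two separately created DataFrames over the same topic count as identical sources, so the sketch avoids that case.

```python
def kafka_stream(spark, bootstrap_servers, topic):
    """Build a new streaming DataFrame that reads one Kafka topic."""
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", topic)
        .load()
    )


def union_of_topics(spark, bootstrap_servers, topics):
    """Union one independently created streaming DataFrame per topic."""
    if not topics:
        raise ValueError("at least one topic is required")
    # One readStream per topic, so no source DataFrame is referenced twice.
    frames = [kafka_stream(spark, bootstrap_servers, t) for t in topics]
    combined = frames[0]
    for frame in frames[1:]:
        combined = combined.union(frame)
    return combined
```

The result of `union_of_topics(spark, "host1:port1,host2:port2", ["topic-a", "topic-b"])` can then be transformed and written with a Real-time trigger like any other supported query.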

Not supported

Stateful operations of any kind are not supported in this release, and neither are operations that introduce a shuffle. Unsupported stateful operations include streaming aggregations, dropDuplicates / dropDuplicatesWithinWatermark, stream-stream joins, and stateful operators such as flatMapGroupsWithState and transformWithState. Unsupported shuffle operations include repartition. Support for stateful operations is planned starting in Spark 4.3. Asynchronous progress tracking is also not supported; enabling it fails with STREAMING_REAL_TIME_MODE.ASYNC_PROGRESS_TRACKING_NOT_SUPPORTED.

Fault Tolerance

Real-time Mode provides the same exactly-once processing guarantees as the default micro-batch engine. Two distinct guarantees are worth separating:

  • Exactly-once processing means every input record's effect on the state the engine manages (for example, aggregation counts) is applied effectively once, even across failures and restarts.
  • Delivery semantics describe whether a record may be written to the external system more than once. This is a property of the sink, not the execution mode.

Real-time Mode is exactly-once with respect to processing. End-to-end delivery depends on the sink: a sink that performs idempotent or transactional writes can deliver exactly-once, while other sinks deliver at-least-once (duplicates are possible after a failure). The built-in Kafka sink provides at-least-once delivery, with or without Real-time Mode. Real-time Mode does not yet ship an exactly-once sink, though one can be implemented.

Internally, offsets are committed at the end of each batch, after the corresponding records have already been written to the sink. If a query fails partway through a batch, it resumes from the last committed offsets on restart and may re-write records emitted before the failure. Design sinks to tolerate duplicates -- for example, with idempotent writes -- where exactly-once output matters.
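One way to tolerate duplicates is to make each write an upsert keyed on something that uniquely identifies the input record. The sketch below uses the Kafka source's `topic`, `partition`, and `offset` columns as that key, so the query must keep those columns rather than selecting only `key` and `value`. The class `IdempotentWriter` is hypothetical, and the dict is a stand-in for an external key-value store with upsert semantics. The `open` / `process` / `close` shape follows what PySpark's `foreach` accepts; whether the Python foreach sink is available in Real-time Mode is an assumption this guide does not confirm.

```python
class IdempotentWriter:
    """Foreach-style writer that upserts by Kafka coordinates.

    `store` is a plain dict standing in for an external key-value store with
    upsert semantics; a real writer would open a client connection in open().
    """

    def __init__(self, store):
        self.store = store

    def open(self, partition_id, epoch_id):
        return True  # always process this partition

    def process(self, row):
        # (topic, partition, offset) uniquely identifies a Kafka record, so a
        # replayed record overwrites its earlier copy instead of duplicating it.
        key = (row["topic"], row["partition"], row["offset"])
        self.store[key] = row["value"]

    def close(self, error):
        pass  # nothing to release in this in-memory sketch


store = {}
writer = IdempotentWriter(store)
record = {"topic": "input-topic", "partition": 0, "offset": 42, "value": "a"}
writer.open(0, 0)
writer.process(record)
writer.process(record)  # simulated replay after a restart
writer.close(None)
print(len(store))  # 1
```

Because the second write lands on the same key, the replay leaves the store unchanged. The same idea applies to any sink that supports keyed upserts or transactional writes.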

Examples

The following examples read from Kafka and assume a running Kafka cluster. Each example shows the same query in Python, Scala, and Java.

Stream-static join

Enrich a stream by joining it with a static reference dataset. The static side is wrapped in broadcast(...) so the join is executed as a broadcast (map-side) join, which avoids a shuffle.

{% highlight python %}
from pyspark.sql.functions import broadcast

# Static reference data, read once as a batch DataFrame.
reference = spark.read.format("parquet").load("/path/to/reference")

(
    spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("subscribe", "input-topic")
    .load()
    .selectExpr("CAST(key AS STRING) AS joinKey", "CAST(value AS STRING) AS value")
    .join(broadcast(reference), "joinKey")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
    .option("topic", "output-topic")
    .option("checkpointLocation", "/path/to/checkpoint")
    .outputMode("update")
    .trigger(realTime="5 minutes")
    .start()
)
{% endhighlight %}

{% highlight scala %}
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.streaming.Trigger

// Static reference data, read once as a batch DataFrame.
val reference = spark.read.format("parquet").load("/path/to/reference")

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(key AS STRING) AS joinKey", "CAST(value AS STRING) AS value")
  .join(broadcast(reference), "joinKey")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode("update")
  .trigger(Trigger.RealTime("5 minutes"))
  .start()
{% endhighlight %}

{% highlight java %}
import static org.apache.spark.sql.functions.broadcast;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.Trigger;

// Static reference data, read once as a batch DataFrame.
Dataset<Row> reference = spark.read().format("parquet").load("/path/to/reference");

spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(key AS STRING) AS joinKey", "CAST(value AS STRING) AS value")
  .join(broadcast(reference), "joinKey")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "output-topic")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode("update")
  .trigger(Trigger.RealTime("5 minutes"))
  .start();
{% endhighlight %}

Writing to the console for development

The console sink prints output to the driver's standard output and is handy while developing a query. Note that the console sink buffers each batch's rows and prints them when the batch commits, so its output appears once per batch -- here, every 30 seconds -- rather than continuously. This makes it useful for inspecting results, but it does not reflect Real-time Mode's true per-record latency; to observe that, use a row-by-row sink such as Kafka. A shorter batch duration simply makes the console refresh more often.

{% highlight scala %}
import org.apache.spark.sql.streaming.Trigger

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .where("value IS NOT NULL")
  .writeStream
  .format("console")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode("update")
  .trigger(Trigger.RealTime("30 seconds"))
  .start()
{% endhighlight %}

{% highlight java %}
import org.apache.spark.sql.streaming.Trigger;

spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .where("value IS NOT NULL")
  .writeStream()
  .format("console")
  .option("checkpointLocation", "/path/to/checkpoint")
  .outputMode("update")
  .trigger(Trigger.RealTime("30 seconds"))
  .start();
{% endhighlight %}
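A Python counterpart of the console query could be written as below. This is a sketch, assuming the `realTime` keyword described under Trigger API; the function name `start_console_preview` and its parameters are hypothetical, and `spark` is an existing SparkSession.

```python
def start_console_preview(spark, bootstrap_servers, input_topic, checkpoint_dir):
    """Start a Real-time Mode query that prints non-null values to the console."""
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("subscribe", input_topic)
        .load()
        .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
        .where("value IS NOT NULL")
        .writeStream
        .format("console")
        .option("checkpointLocation", checkpoint_dir)
        .outputMode("update")
        .trigger(realTime="30 seconds")  # console output refreshes once per batch
        .start()
    )
```

As with the Scala and Java versions, the 30-second duration only controls how often the console prints, not how quickly records are processed.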

Configuration

| Configuration | Default | Meaning |
|---------------|---------|---------|
| spark.sql.streaming.realTimeMode.minBatchDuration | 5000 (ms, 5 seconds) | The minimum batch duration, in milliseconds, allowed for a Real-time trigger. See the batch-duration requirement under Requirements. |
| spark.sql.streaming.realTimeMode.allowlistCheck | true | Whether to verify that all operators and sinks used by a Real-time query are in the supported allowlist. Disabling this check (not recommended) lets unsupported operators and sinks run at your own risk. |

Best Practices

  • Real-time Mode launches long-running tasks -- one per input partition -- that continuously read, process, and write data. The number of tasks a query needs depends on how many partitions it reads from its sources in parallel. Before starting a Real-time query, ensure the cluster has enough cores to run all of these tasks simultaneously and continuously. For example, reading from a Kafka topic with 10 partitions requires at least 10 cores for the query to make progress. Real-time Mode uses a fixed 1:1 mapping between Kafka topic partitions and reader tasks; the minPartitions option is not supported in Real-time Mode.
  • Run a single Real-time query per cluster. Because Real-time Mode holds its task slots for the entire batch duration, any other queries sharing the cluster compete for the same slots, which can starve the Real-time query of resources and increase its latency.
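The core-count guidance above reduces to a simple capacity check. The sketch below is plain Python with hypothetical helper names. It assumes one long-running task per input partition, as this guide describes, and treats `cores_per_task` as the number of cores Spark reserves per task (the `spark.task.cpus` setting, 1 by default).

```python
def required_task_slots(partitions_per_source):
    """Total long-running tasks: one per input partition across all sources."""
    return sum(partitions_per_source.values())


def has_enough_cores(partitions_per_source, total_executor_cores, cores_per_task=1):
    """True if every long-running task can hold a slot at the same time."""
    available_slots = total_executor_cores // cores_per_task
    return required_task_slots(partitions_per_source) <= available_slots


sources = {"input-topic": 10}
print(required_task_slots(sources))   # 10
print(has_enough_cores(sources, 8))   # False
print(has_enough_cores(sources, 16))  # True
```

For a union of several topics, add one entry per topic: the query needs a slot for every partition of every source at once.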

Caveats

  • Real-time Mode provides exactly-once processing semantics, but sinks may receive duplicate records after a failure. See Fault Tolerance for how to design sinks that tolerate duplicates.
  • Adaptive Query Execution (AQE) is not supported for Real-time Mode queries.