blob: 15731dfb36a7aa013eef17e210156da8804f00fd [file] [view]
---
layout: global
displayTitle: Structured Streaming Programming Guide
title: Structured Streaming Programming Guide
license: |
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
---
* Table of contents
{:toc}
# Real-time Mode
**Real-time Mode** is a new streaming execution mode introduced in Spark 4.1.0 that
targets ultra-low end-to-end latency with the exact same API and processing guarantees / semantics as the current structured streaming engine.
It is intended for operational workloads
that must react to data the moment it arrives, such as fraud detection, real-time alerting, and live
personalization.
In this release, Real-time Mode in Apache Spark supports **stateless queries** only -- projections,
filters and other map-like operations, unions, and stream-static joins. Stateful operations such as
streaming aggregations, deduplication, stream-stream joins, and `transformWithState` are not yet
supported, but support for them is planned starting in Spark 4.3. See
[Supported Queries](#supported-queries) for the full list.
The most important thing to know: **the duration you pass to the trigger (default 5 minutes) is a
checkpoint interval, not a latency target.** Records are processed and emitted continuously rather
than at batch boundaries, so the trigger duration does not set latency the way a micro-batch interval does. See
[Batch Duration Is a Checkpoint Interval](#batch-duration-is-a-checkpoint-interval).
You enable Real-time Mode by setting a Real-time trigger on the streaming write; the rest of your
query is unchanged. See [Enabling Real-time Mode](#enabling-real-time-mode).
## How Real-time Mode Works
By default, Structured Streaming runs a query as a series of small batch jobs -- the *micro-batch*
model. For each micro-batch, the driver plans the batch and launches a fresh set of short-lived
tasks. Those tasks read and process a bounded slice of the input, and the driver commits progress
before planning the next batch. The fixed per-batch planning and task-scheduling overhead places a
floor on end-to-end latency.
Real-time Mode removes this per-batch overhead by launching **long-running tasks** -- one per input
partition. These tasks stay alive for the duration of a (long) batch and process records
continuously as they arrive. Because tasks are scheduled once per batch rather than once per slice
of data, records flow through the operator pipeline (source -> transformations -> sink) without
waiting for a batch boundary. End-to-end latency drops from the ~100 ms micro-batch floor to roughly
the time needed to process and ship one record (often a few milliseconds).
Since records never wait for a batch boundary, the batch duration mainly controls how often the
query checkpoints progress -- as the next section explains.
## Batch Duration Is a Checkpoint Interval
In Real-time Mode, the batch duration is a **checkpoint interval, not a latency interval.** With the
default 5-minute duration, the query still emits records within milliseconds; the 5 minutes only
controls how often it commits progress and starts the next long-running batch. This is the opposite
of the micro-batch engine, where a longer batch interval directly increases latency.
Do not confuse the 5-minute default trigger duration with the 5-second minimum allowed duration
described under [Requirements](#requirements): the former is the checkpoint cadence used when you do
not specify a duration, while the latter is the smallest duration you are allowed to set.
Choosing the batch duration is a trade-off:
- A *shorter* batch duration checkpoints more often, giving finer-grained recovery (less work to
re-process after a failure). However, the query does not process data while it commits progress
and starts the next batch, so checkpointing too frequently adds more of these gaps, which can
raise tail (p99) latency, in addition to incurring more planning and commit overhead.
- A *longer* batch duration checkpoints less often, reducing that overhead and those gaps, at the
cost of coarser-grained recovery (more data re-processed after a failure).
The duration is set on the Real-time trigger, as shown under
[Enabling Real-time Mode](#enabling-real-time-mode).
## Comparison with Other Modes
The table below summarizes how Real-time Mode relates to the default micro-batch engine and to the
experimental [Continuous Processing](./performance-tips.html#continuous-processing) mode. See
[How Real-time Mode Works](#how-real-time-mode-works) for the mechanism and
[Supported Queries](#supported-queries) for the full list of supported operations.
| Mode | Latency | Processing Guarantees | Supported operations | When to use |
|---|---|---|---|---|
| Micro-batch (default) | ~100 ms | Exactly-once | All streaming operations, including stateful | Stateful or higher-throughput workloads, or queries Real-time Mode does not yet support |
| Real-time Mode | millisecond-scale | Exactly-once | Stateless today (map-like operations, unions, and stream-static joins); designed to support all query shapes, including stateful | Low-latency workloads |
| Continuous Processing (experimental) | ~1 ms | At-least-once | Map-like only (projections and selections); no stateful operations | Legacy; use Real-time Mode instead |
The **Processing Guarantees** column refers to processing semantics, defined under
[Fault Tolerance](#fault-tolerance); end-to-end delivery additionally depends on the sink and is
independent of the execution mode.
Real-time Mode and Continuous Processing both target millisecond-scale latency, but they differ
substantially:
- **Continuous Processing** (introduced in Spark 2.3) is, and remains, experimental. It supports
only map-like operations -- projections and selections -- with no stateful operations such as
aggregations or joins, and it provides at-least-once guarantees. Because it is stateless, the
exactly-once *processing* guarantee discussed under [Fault Tolerance](#fault-tolerance) does not
apply to it. These constraints have limited its adoption.
- **Real-time Mode** is designed to support all query shapes, including stateful operations, while
reusing Spark's mature components such as state management, the Catalyst optimizer, and the
existing SQL operators. It provides exactly-once processing semantics. It currently supports
stateless queries, with stateful support planned starting in Spark 4.3.
For new low-latency workloads, prefer Real-time Mode over Continuous Processing.
## Enabling Real-time Mode
To run a supported query in Real-time Mode, set a **Real-time trigger** on the streaming write.
Everything else in the query stays the same. For example, the following query reads from a Kafka
topic, applies a stateless transformation, and writes the result to another Kafka topic. Records
flow through with low latency even though the trigger is 5 minutes.
<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}
spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "input-topic") \
.load() \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "output-topic") \
.option("checkpointLocation", "/path/to/checkpoint") \
.outputMode("update") \
.trigger(realTime="5 minutes") \
.start()
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.spark.sql.streaming.Trigger
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "input-topic")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "output-topic")
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("update")
.trigger(Trigger.RealTime("5 minutes")) // enable Real-time Mode
.start()
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
import org.apache.spark.sql.streaming.Trigger;
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "input-topic")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "output-topic")
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("update")
.trigger(Trigger.RealTime("5 minutes")) // enable Real-time Mode
.start();
{% endhighlight %}
</div>
</div>
### Trigger API
- *Scala and Java*: the trigger is `Trigger.RealTime(...)`, imported from
`org.apache.spark.sql.streaming.Trigger`. Several forms are available:
+ `Trigger.RealTime()` -- uses the default batch duration of 5 minutes.
+ `Trigger.RealTime("5 minutes")` -- a duration string.
+ `Trigger.RealTime(300000)` -- the batch duration in milliseconds, as a `long`.
+ `Trigger.RealTime(5, TimeUnit.MINUTES)` -- a value together with a
`java.util.concurrent.TimeUnit`.
+ `Trigger.RealTime(Duration("10 seconds"))` -- a Scala `scala.concurrent.duration.Duration`.
- *Python*: pass the batch duration as a string to the `realTime` keyword argument of `trigger()`,
for example `.trigger(realTime="5 minutes")`. The duration is required in Python.
### Requirements
A query must satisfy all of the following before it can start in Real-time Mode; each is checked when
the query starts:
- The output mode must be `update`. Any other output mode fails to start with
`STREAMING_REAL_TIME_MODE.OUTPUT_MODE_NOT_SUPPORTED`.
- A `checkpointLocation` is required, as with any other Structured Streaming query.
- The batch duration must be at least `spark.sql.streaming.realTimeMode.minBatchDuration`
(5000 ms, i.e. 5 seconds, by default); a shorter interval fails to start with
`INVALID_STREAMING_REAL_TIME_MODE_TRIGGER_INTERVAL`. The duration string must parse to a positive
interval, and month-based intervals (for example, `"1 month"`) are not accepted. (This 5-second
minimum is distinct from the 5-minute default; see
[Batch Duration Is a Checkpoint Interval](#batch-duration-is-a-checkpoint-interval).)
## Supported Queries
Real-time Mode supports stateless, map-like queries only.
The following operations, sources, and sinks are supported as of Spark 4.1.0:
- *Operations*: stateless, map-like operations are supported:
+ Projections: `select`, `selectExpr`, `withColumn`, `drop`, and the typed `map` / `flatMap` Dataset operations.
+ Selections: `where` / `filter`.
+ Expressions that compile to a projection -- including functions such as `from_json` / `to_json`
and scalar user-defined functions (UDFs).
+ Column generators such as `explode`.
+ `union` of two or more *distinct* streaming sources. Referencing the same source DataFrame more
than once is not supported and fails with
`STREAMING_REAL_TIME_MODE.IDENTICAL_SOURCES_IN_UNION_NOT_SUPPORTED`; create a separate DataFrame
for each source instead.
+ Stream-static joins, where a streaming DataFrame is joined with a static DataFrame. The static
side must be broadcast (use the `broadcast(...)` hint), because Real-time Mode does not support
shuffles.
+ `withWatermark` (event-time watermark declaration) is allowed, although it has no effect because
stateful operators are not supported. This lets queries that already declare a watermark run in
Real-time Mode without modification.
- *Sources*: the source must support Real-time Mode. In Apache Spark, the **Kafka** source supports
Real-time Mode. An unsupported source fails with
`STREAMING_REAL_TIME_MODE.INPUT_STREAM_NOT_SUPPORTED`. (The built-in `rate` source is not supported
as a Real-time source.)
- *Sinks*:
+ **Kafka** sink.
+ **Foreach** sink (via `ForeachWriter`), for writing to arbitrary external systems one record at
a time. See [Using Foreach](./apis-on-dataframes-and-datasets.html#using-foreach-and-foreachbatch).
Note that `foreachBatch` is *not* supported, because it processes each batch as a whole rather
than one record at a time.
+ **Console** and **memory** sinks, which are useful for development and debugging.
Other sinks fail with `STREAMING_REAL_TIME_MODE.SINK_NOT_SUPPORTED`.
The operators and sinks used by a Real-time query are checked against an allowlist before the query
starts; anything outside the allowlist fails with
`STREAMING_REAL_TIME_MODE.OPERATOR_OR_SINK_NOT_IN_ALLOWLIST`.
### Not supported
Stateful operations of any kind are not supported in this release. This includes streaming
aggregations, `dropDuplicates` / `dropDuplicatesWithinWatermark`, stream-stream joins, `repartition`
and other operations that introduce a shuffle, and stateful operators such as
`flatMapGroupsWithState` and `transformWithState`. Support for stateful operations is planned
starting in Spark 4.3. Asynchronous progress tracking is also not supported; enabling it fails with
`STREAMING_REAL_TIME_MODE.ASYNC_PROGRESS_TRACKING_NOT_SUPPORTED`.
## Fault Tolerance
Real-time Mode provides the same **exactly-once processing** guarantees as the default micro-batch
engine. Two distinct guarantees are worth separating:
- **Exactly-once processing** means every input record's effect on the state the engine manages (for
example, aggregation counts) is applied effectively once, even across failures and restarts.
- **Delivery semantics** describe whether a record may be written to the external system more than
once. This is a property of the **sink**, not the execution mode.
Real-time Mode is exactly-once with respect to processing. End-to-end delivery depends on the sink: a
sink that performs idempotent or transactional writes can deliver **exactly-once**, while other sinks
deliver **at-least-once** (duplicates are possible after a failure). The built-in Kafka sink provides
at-least-once delivery, with or without Real-time Mode. Real-time Mode does not yet ship an
exactly-once sink, though one can be implemented.
Internally, offsets are committed at the *end* of each batch, after the corresponding records have
already been written to the sink. If a query fails partway through a batch, it resumes from the last
committed offsets on restart and may re-write records emitted before the failure. Design sinks to
tolerate duplicates -- for example, with idempotent writes -- where exactly-once output matters.
## Examples
The following examples read from Kafka and assume a running Kafka cluster. Each example shows the
same query in Python, Scala, and Java.
### Stream-static join
Enrich a stream by joining it with a static reference dataset. The static side is wrapped in
`broadcast(...)` so the join is executed as a broadcast (map-side) join, which avoids a shuffle.
<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}
from pyspark.sql.functions import broadcast
# Static reference data, read once as a batch DataFrame.
reference = spark.read.format("parquet").load("/path/to/reference")
spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "input-topic") \
.load() \
.selectExpr("CAST(key AS STRING) AS joinKey", "CAST(value AS STRING) AS value") \
.join(broadcast(reference), "joinKey") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("topic", "output-topic") \
.option("checkpointLocation", "/path/to/checkpoint") \
.outputMode("update") \
.trigger(realTime="5 minutes") \
.start()
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.spark.sql.functions.broadcast
import org.apache.spark.sql.streaming.Trigger
// Static reference data, read once as a batch DataFrame.
val reference = spark.read.format("parquet").load("/path/to/reference")
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "input-topic")
.load()
.selectExpr("CAST(key AS STRING) AS joinKey", "CAST(value AS STRING) AS value")
.join(broadcast(reference), "joinKey")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "output-topic")
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("update")
.trigger(Trigger.RealTime("5 minutes"))
.start()
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
import static org.apache.spark.sql.functions.broadcast;
import org.apache.spark.sql.streaming.Trigger;
// Static reference data, read once as a batch DataFrame.
Dataset<Row> reference = spark.read().format("parquet").load("/path/to/reference");
spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "input-topic")
.load()
.selectExpr("CAST(key AS STRING) AS joinKey", "CAST(value AS STRING) AS value")
.join(broadcast(reference), "joinKey")
.writeStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "output-topic")
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("update")
.trigger(Trigger.RealTime("5 minutes"))
.start();
{% endhighlight %}
</div>
</div>
### Writing to the console for development
The console sink prints output to the driver's standard output and is handy while developing a
query. Note that the console sink buffers each batch's rows and prints them when the batch commits,
so its output appears once per batch -- here, every 30 seconds -- rather than continuously. This
makes it useful for inspecting results, but it does not reflect Real-time Mode's true per-record
latency; to observe that, use a row-by-row sink such as Kafka. A shorter batch duration simply makes
the console refresh more often.
<div class="codetabs">
<div data-lang="python" markdown="1">
{% highlight python %}
spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "input-topic") \
.load() \
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
.where("value IS NOT NULL") \
.writeStream \
.format("console") \
.option("checkpointLocation", "/path/to/checkpoint") \
.outputMode("update") \
.trigger(realTime="30 seconds") \
.start()
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.spark.sql.streaming.Trigger
spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "input-topic")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.where("value IS NOT NULL")
.writeStream
.format("console")
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("update")
.trigger(Trigger.RealTime("30 seconds"))
.start()
{% endhighlight %}
</div>
<div data-lang="java" markdown="1">
{% highlight java %}
import org.apache.spark.sql.streaming.Trigger;
spark
.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "input-topic")
.load()
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.where("value IS NOT NULL")
.writeStream()
.format("console")
.option("checkpointLocation", "/path/to/checkpoint")
.outputMode("update")
.trigger(Trigger.RealTime("30 seconds"))
.start();
{% endhighlight %}
</div>
</div>
## Configuration
| Configuration | Default | Meaning |
|---|---|---|
| `spark.sql.streaming.realTimeMode.minBatchDuration` | `5000` (ms, 5 seconds) | The minimum batch duration, in milliseconds, allowed for a Real-time trigger. See the batch-duration requirement under [Requirements](#requirements). |
| `spark.sql.streaming.realTimeMode.allowlistCheck` | `true` | Whether to verify that all operators and sinks used by a Real-time query are in the supported allowlist. Disabling this check (not recommended) lets unsupported operators and sinks run at your own risk. |
## Best Practices
- Real-time Mode launches long-running tasks -- one per input partition -- that continuously read,
process, and write data. The number of tasks a query needs depends on how many partitions it reads
from its sources in parallel. Before starting a Real-time query, ensure the cluster has enough
cores to run all of these tasks simultaneously and continuously. For example, reading from a Kafka
topic with 10 partitions requires at least 10 cores for the query to make progress. Real-time Mode
uses a fixed 1:1 mapping between Kafka topic partitions and reader tasks; the `minPartitions`
option is not supported in Real-time Mode.
- Run a single Real-time query per cluster. Because Real-time Mode holds its task slots for the
entire batch duration, any other queries sharing the cluster compete for the same slots, which can
starve the Real-time query of resources and increase its latency.
## Caveats
- Real-time Mode provides exactly-once processing semantics, but sinks may receive duplicate records
after a failure. See [Fault Tolerance](#fault-tolerance) for how to design sinks for exactly-once
writes.
- Adaptive Query Execution (AQE) is not supported for Real-time Mode queries.