---
layout: global
displayTitle: Structured Streaming Programming Guide
title: Structured Streaming Programming Guide
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---
## Asynchronous Progress Tracking

Asynchronous progress tracking allows streaming queries to checkpoint progress asynchronously and in parallel to the actual data processing within a micro-batch, reducing the latency associated with maintaining the offset log and commit log.
Structured Streaming relies on persisting and managing offsets as progress indicators for query processing. These offset management operations directly impact processing latency, because no data processing can occur until they are complete. Asynchronous progress tracking enables streaming queries to checkpoint progress without being blocked by these offset management operations.
The code snippet below provides an example of how to use this feature:
{% highlight scala %}
val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "in")
  .load()

val query = stream.writeStream
  .format("kafka")
  .option("topic", "out")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("asyncProgressTrackingEnabled", "true")
  .start()
{% endhighlight %}
The table below describes the configurations for this feature and their default values.
| Option | Value | Default | Description |
|---|---|---|---|
| asyncProgressTrackingEnabled | true/false | false | Enables or disables asynchronous progress tracking |
| asyncProgressTrackingCheckpointIntervalMs | milliseconds | 1000 | The interval at which offsets and completion commits are checkpointed |
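For example, to checkpoint less frequently and further reduce per-batch commit overhead, both options can be set together. A minimal sketch, reusing the `stream` from the snippet above (the 5-second interval is an illustrative value, not a recommendation):

{% highlight scala %}
// Enable async progress tracking and checkpoint offsets/commits
// roughly every 5 seconds instead of the default 1000 ms.
val query = stream.writeStream
  .format("kafka")
  .option("topic", "out")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("asyncProgressTrackingEnabled", "true")
  .option("asyncProgressTrackingCheckpointIntervalMs", "5000")
  .start()
{% endhighlight %}

Note that a longer checkpoint interval means fewer progress checkpoints are written, so more data may be reprocessed after a failure.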
The initial version of the feature has the following limitations:

- Asynchronous progress tracking is only supported in stateless queries that use the Kafka sink.
- Exactly-once end-to-end processing is not supported with asynchronous progress tracking, because the offset ranges for a batch can change in case of failure. (Many sinks, such as the Kafka sink, do not support exactly-once writes anyway.)

Turning asynchronous progress tracking off may cause the following exception to be thrown:
{% highlight text %}
java.lang.IllegalStateException: batch x doesn't exist
{% endhighlight %}

Also, the following error message may be printed in the driver logs:

{% highlight text %}
The offset log for batch x doesn't exist, which is required to restart the query from the latest batch x from the offset log. Please ensure there are two subsequent offset logs available for the latest batch via manually deleting the offset file(s). Please also ensure the latest batch for commit log is equal or one batch earlier than the latest batch for offset log.
{% endhighlight %}
This happens because, when asynchronous progress tracking is enabled, the framework does not checkpoint progress for every batch, as it would if asynchronous progress tracking were not used. To solve this problem, re-enable "asyncProgressTrackingEnabled", set "asyncProgressTrackingCheckpointIntervalMs" to 0, and run the streaming query until at least two micro-batches have been processed. Asynchronous progress tracking can then be safely disabled, and restarting the query should proceed normally.
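A minimal sketch of this recovery procedure, reusing the query from the earlier example (the checkpoint location must be the same one the query was already using):

{% highlight scala %}
// Step 1: restart with async progress tracking re-enabled and a zero
// checkpoint interval, so every micro-batch checkpoints its progress.
val recovery = stream.writeStream
  .format("kafka")
  .option("topic", "out")
  .option("checkpointLocation", "/tmp/checkpoint")
  .option("asyncProgressTrackingEnabled", "true")
  .option("asyncProgressTrackingCheckpointIntervalMs", "0")
  .start()

// Step 2: once at least two micro-batches have completed, stop the query;
// it can now be restarted with asyncProgressTrackingEnabled set to "false".
recovery.stop()
{% endhighlight %}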
## Continuous Processing

Continuous processing is a new, experimental streaming execution mode introduced in Spark 2.3 that enables low (~1 ms) end-to-end latency with at-least-once fault-tolerance guarantees. Compare this with the default micro-batch processing engine, which can achieve exactly-once guarantees but achieves latencies of ~100 ms at best. For some types of queries (discussed below), you can choose which mode to execute them in without modifying the application logic (i.e. without changing the DataFrame/Dataset operations).
To run a supported query in continuous processing mode, all you need to do is specify a continuous trigger with the desired checkpoint interval as a parameter. For example,
<div class="codetabs">

<div data-lang="scala" markdown="1">
{% highlight scala %}
import org.apache.spark.sql.streaming.Trigger

spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start()
{% endhighlight %}
</div>

<div data-lang="java" markdown="1">
{% highlight java %}
import org.apache.spark.sql.streaming.Trigger;

spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start();
{% endhighlight %}
</div>

</div>
A checkpoint interval of 1 second means that the continuous processing engine will record the progress of the query every second. The resulting checkpoints are in a format compatible with the micro-batch engine, hence any query can be restarted with any trigger. For example, a supported query started with the micro-batch mode can be restarted in continuous mode, and vice versa. Note that any time you switch to continuous mode, you will get at-least-once fault-tolerance guarantees.
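For instance, a minimal sketch of such a restart (assuming `df` is the transformed Kafka stream from the example above, and the checkpoint path is illustrative):

{% highlight scala %}
import org.apache.spark.sql.streaming.Trigger

// First run: default micro-batch trigger.
df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .option("checkpointLocation", "/tmp/checkpoint-dir")
  .start()

// Later restart: same sink and checkpoint location, continuous trigger.
// From this point on, the guarantees are at-least-once.
df.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .option("checkpointLocation", "/tmp/checkpoint-dir")
  .trigger(Trigger.Continuous("1 second"))
  .start()
{% endhighlight %}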
As of Spark 2.4, only the following types of queries are supported in the continuous processing mode (a short example follows the list).

- **Operations**: Only map-like Dataset/DataFrame operations are supported in continuous mode, that is, only projections (`select`, `map`, `flatMap`, `mapPartitions`, etc.) and selections (`where`, `filter`, etc.).
  + All SQL functions are supported except aggregation functions (since aggregations are not yet supported), `current_timestamp()` and `current_date()` (deterministic computations using time is challenging).
- **Sources**:
  + Kafka source: All options are supported.
  + Rate source: Good for testing. The only options supported in continuous mode are `numPartitions` and `rowsPerSecond`.
- **Sinks**:
  + Kafka sink: All options are supported.
  + Memory sink: Good for debugging.
  + Console sink: Good for debugging. All options are supported. Note that the console will print every checkpoint interval that you have specified in the continuous trigger.
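Putting these pieces together, here is a minimal sketch of a supported continuous query using the rate source and the console sink (the option values and checkpoint path are illustrative):

{% highlight scala %}
import org.apache.spark.sql.streaming.Trigger

spark.readStream
  .format("rate")
  .option("numPartitions", "2")   // the two rate-source options
  .option("rowsPerSecond", "10")  // supported in continuous mode
  .load()
  .where("value % 2 = 0")            // selection (map-like)
  .selectExpr("timestamp", "value")  // projection (map-like)
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/rate-continuous-checkpoint")
  .trigger(Trigger.Continuous("1 second"))
  .start()
{% endhighlight %}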
See the Input Sources and Output Sinks sections for more details on these sources and sinks. While the console sink is good for testing, end-to-end low-latency processing is best observed with Kafka as the source and sink, as this allows the engine to process the data and make the results available in the output topic within milliseconds of the input data being available in the input topic.