---
layout: global
displayTitle: State Data Source Integration Guide
title: State Data Source Integration Guide
license: |
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
---
State Data Source Guide in Structured Streaming (Experimental)

## Overview
State data source provides functionality to manipulate the state from the checkpoint.
As of Spark 4.0, the state data source provides read functionality with a batch query. Additional functionality, including write support, is on the future roadmap.
NOTE: this data source is currently marked as experimental - source options and the behavior (output) might be subject to change.
## Reading key-value pairs from the state store

State data source enables reading key-value pairs from the state store in the checkpoint, via running a separate batch query. Users can leverage the functionality to cover the two major use cases described below:

* Construct a test checking both output and the state. It is non-trivial to deal with the state store manually, and the data source helps to construct such a test.
* Investigate an incident against a stateful streaming query. If users observe incorrect output and want to track how it came about, reading the state is quite helpful.
Users can read an instance of state store, which is matched to a single stateful operator in most cases. This means users can expect to read the entire set of key-value pairs in the state for a single stateful operator.
Note that there could be an exception, e.g. stream-stream join, which leverages multiple state store instances internally. The data source abstracts the internal representation away from users and provides a user-friendly approach to reading the state. See the section on stream-stream join for more details.
{% highlight python %}
df = spark \
.read \
.format("statestore") \
.load("<checkpointLocation>")
{% endhighlight %}
{% highlight scala %}
val df = spark
.read
.format("statestore")
.load("<checkpointLocation>")
{% endhighlight %}
{% highlight java %}
Dataset<Row> df = spark
.read()
.format("statestore")
.load("<checkpointLocation>");
{% endhighlight %}
Each row in the source has the following schema:

| Column | Type | Note |
| ------ | ---- | ---- |
| key | struct (depends on the type for state key) | |
| value | struct (depends on the type for state value) | |
| partition_id | int | |
The nested columns for key and value heavily depend on the input schema of the stateful operator as well as the type of operator. Users are encouraged to query the schema via df.schema() / df.printSchema() first to understand the type of output.
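For instance, a minimal sketch of inspecting the schema before reading the actual data (the checkpoint path here is a hypothetical placeholder):

{% highlight python %}
# Load the state store for the default operator and inspect its
# schema before querying the actual key-value pairs.
df = spark \
.read \
.format("statestore") \
.load("/tmp/checkpoint")  # hypothetical checkpoint location

df.printSchema()  # shows the nested key/value struct fields
{% endhighlight %}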
The following options must be set for the source.

| Option | Value | Meaning |
| ------ | ----- | ------- |
| path | string | Specify the root directory of the checkpoint location. You can either specify the path via option("path", `path`) or load(`path`). |
The following configurations are optional:

| Option | Value | Default | Meaning |
| ------ | ----- | ------- | ------- |
| batchId | numeric value | latest committed batch | Represents the target batch to read from. This option is used when users want to perform time-travel. The batch should be committed but not yet cleaned up. |
| operatorId | numeric value | 0 | Represents the target operator to read from. This option is used when the query is using multiple stateful operators. |
| storeName | string | DEFAULT | Represents the target state store name to read from. This option is used when the stateful operator uses multiple state store instances. It is not required except for stream-stream join. |
| joinSide | string ("left" or "right") | (none) | Represents the target side to read from. This option is used when users want to read the state from a stream-stream join. |
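As a sketch of combining these options, time-traveling to a specific batch for the second stateful operator in a query (the batch and operator IDs here are illustrative, not prescriptive):

{% highlight python %}
# Read the state as of batch 42 for the stateful operator with ID 1.
# Both values are illustrative; look them up via the state metadata
# source described later in this guide.
df = spark \
.read \
.format("statestore") \
.option("batchId", 42) \
.option("operatorId", 1) \
.load("/tmp/checkpoint")  # hypothetical checkpoint location
{% endhighlight %}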
## Reading state for stream-stream join

Structured Streaming implements the stream-stream join feature via leveraging multiple instances of state store internally. These instances logically compose buffers to store the input rows for the left and right sides of the join.

Since it is more intuitive for users to reason about the join sides, the data source provides the option `joinSide` to read the buffered input for a specific side of the join. To enable reading an internal state store instance directly, we also allow specifying the option `storeName`, with the restriction that `storeName` and `joinSide` cannot be specified together. An example of reading one side of the join follows.
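A minimal sketch of reading the buffered rows on the left side of a stream-stream join (the checkpoint path is a hypothetical placeholder):

{% highlight python %}
# Read all buffered input rows for the left side of the join.
# The data source merges the internal state store instances and
# presents them as a single user-facing view.
left_df = spark \
.read \
.format("statestore") \
.option("joinSide", "left") \
.load("/tmp/checkpoint")  # hypothetical checkpoint location
{% endhighlight %}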
## Reading state for transformWithState

TransformWithState is a stateful operator that allows users to maintain arbitrary state across batches. In order to read this state, the user needs to provide some additional options in the state data source reader query. This operator allows multiple state variables to be used within the same query. However, because they could be of different composite types and encoding formats, they need to be read within a batch query one variable at a time. To allow this, the user needs to specify the `stateVarName` for the state variable they are interested in reading, as shown in the example below.
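A sketch of reading a single state variable; the variable name `countState` is a hypothetical one, standing in for whatever the stateful processor declared:

{% highlight python %}
# Read the state variable named "countState" (hypothetical name,
# matching the variable defined in the StatefulProcessor) from
# the transformWithState operator.
df = spark \
.read \
.format("statestore") \
.option("stateVarName", "countState") \
.load("/tmp/checkpoint")  # hypothetical checkpoint location
{% endhighlight %}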
Timers can be read by setting the option `readRegisteredTimers` to true. This will return all the registered timers across grouping keys, as the example below shows.
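A corresponding sketch for timers (again with a hypothetical checkpoint path):

{% highlight python %}
# Read all timers registered by the transformWithState operator,
# across all grouping keys.
timers_df = spark \
.read \
.format("statestore") \
.option("readRegisteredTimers", True) \
.load("/tmp/checkpoint")  # hypothetical checkpoint location
{% endhighlight %}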
We also allow for composite type variables to be read in 2 formats:

- Flattened: the default format, where each entry of a collection type (e.g. list or map state) is returned as its own row.
- Non-flattened: the entire collection is returned as a single Spark SQL Array or Map value in one row.

Depending on your memory requirements, you can choose the format that best suits your use case, as sketched below.
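The format is governed by the `flattenCollectionTypes` option. A sketch of reading a collection-typed variable in non-flattened form, where `mapState` is a hypothetical map state variable name:

{% highlight python %}
# Return each map state as a single row containing a Spark SQL Map
# value, instead of one row per map entry.
df = spark \
.read \
.format("statestore") \
.option("stateVarName", "mapState") \
.option("flattenCollectionTypes", False) \
.load("/tmp/checkpoint")  # hypothetical checkpoint location
{% endhighlight %}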
## Reading state changes over microbatches

If we want to understand the changes to a state store over microbatches, instead of the whole state store at a particular microbatch, `readChangeFeed` is the option to use. For example, this is the code to read the changes to the state from batch 2 up to the latest committed batch.
{% highlight python %}
df = spark \
.read \
.format("statestore") \
.option("readChangeFeed", True) \
.option("changeStartBatchId", 2) \
.load("<checkpointLocation>")
{% endhighlight %}
{% highlight scala %}
val df = spark
.read
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>")
{% endhighlight %}
{% highlight java %}
Dataset<Row> df = spark
.read()
.format("statestore")
.option("readChangeFeed", true)
.option("changeStartBatchId", 2)
.load("<checkpointLocation>");
{% endhighlight %}
The output schema will also be different from the normal output:

| Column | Type | Note |
| ------ | ---- | ---- |
| batch_id | long | |
| change_type | string | Indicates how the key-value pair changed, e.g. update or delete. |
| key | struct (depends on the type for state key) | |
| value | struct (depends on the type for state value) | |
| partition_id | int | |
## State metadata source

Before querying the state from an existing checkpoint via the state data source, users may want to understand the information for the checkpoint, especially about the stateful operators: which operators and state store instances are available in the checkpoint, the available range of batch IDs, etc.

Structured Streaming provides a data source named "State metadata source" to provide the state-related metadata information from the checkpoint.

Note: The metadata is constructed when the streaming query runs with Spark 4.0+. An existing checkpoint which has been produced by a lower Spark version does not have the metadata and cannot be queried with this metadata source. It is required to run the streaming query pointing to the existing checkpoint in Spark 4.0+ to construct the metadata before querying it. Users can optionally provide the batchId to get the operator metadata at a point in time.
{% highlight python %}
df = spark \
.read \
.format("state-metadata") \
.load("<checkpointLocation>")
{% endhighlight %}
{% highlight scala %}
val df = spark
.read
.format("state-metadata")
.load("<checkpointLocation>")
{% endhighlight %}
{% highlight java %}
Dataset<Row> df = spark
.read()
.format("state-metadata")
.load("<checkpointLocation>");
{% endhighlight %}
The following options must be set for the source:

| Option | Value | Meaning |
| ------ | ----- | ------- |
| path | string | Specify the root directory of the checkpoint location. You can either specify the path via option("path", `path`) or load(`path`). |
The following configurations are optional:

| Option | Value | Default | Meaning |
| ------ | ----- | ------- | ------- |
| batchId | numeric value | last committed batch | Represents the target batch at which to retrieve the operator metadata. |
Each row in the source has the following schema:

| Column | Type | Note |
| ------ | ---- | ---- |
| operatorId | int | |
| operatorName | string | |
| stateStoreName | string | |
| numPartitions | int | |
| minBatchId | long | The earliest batch which is available for querying state. |
| maxBatchId | long | The latest batch which is available for querying state. |
One of the major use cases of this data source is to identify the operatorId to query when the query has multiple stateful operators, e.g. stream-stream join followed by deduplication. The column `operatorName` helps users to identify the operatorId for a given operator.

Additionally, if users want to query an internal state store instance for a stateful operator (e.g. stream-stream join), the column `stateStoreName` is useful to determine the target, as the example below demonstrates.
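Putting the two sources together, a sketch of looking up an operator in the metadata and then reading its state (the checkpoint path and the operator ID are hypothetical):

{% highlight python %}
# Step 1: list the stateful operators recorded in the checkpoint.
meta_df = spark \
.read \
.format("state-metadata") \
.load("/tmp/checkpoint")  # hypothetical checkpoint location

meta_df.select("operatorId", "operatorName", "stateStoreName").show()

# Step 2: pick the operatorId found above (here assumed to be 1 for
# a deduplication operator following a stream-stream join) and read
# its state with the state data source.
state_df = spark \
.read \
.format("statestore") \
.option("operatorId", 1) \
.load("/tmp/checkpoint")
{% endhighlight %}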