[FLINK-37310][doc] SQL connector for keyed savepoint data documentation
diff --git a/docs/content/docs/libs/state_processor_api.md b/docs/content/docs/libs/state_processor_api.md
index 8ccf2f2..2ec321a 100644
--- a/docs/content/docs/libs/state_processor_api.md
+++ b/docs/content/docs/libs/state_processor_api.md
@@ -26,7 +26,7 @@
# State Processor API
-Apache Flink's State Processor API provides powerful functionality to reading, writing, and modifying savepoints and checkpoints using Flink’s DataStream API under `BATCH` execution.
+Apache Flink's State Processor API provides powerful functionality for reading, writing, and modifying savepoints and checkpoints using Flink’s [DataStream API]({{< ref "#datastream-api" >}}) and [Table API]({{< ref "#table-api" >}}) under `BATCH` execution.
Due to the [interoperability of DataStream and Table API]({{< ref "docs/dev/table/data_stream_api" >}}), you can even use relational Table API or SQL queries to analyze and process state data.
For example, you can take a savepoint of a running stream processing application and analyze it with a DataStream batch program to verify that the application behaves correctly.
@@ -76,7 +76,8 @@
The State Processor API allows you to identify operators using [UIDs]({{< ref "docs/concepts/glossary.md" >}}#UID) or [UID hashes]({{< ref "docs/concepts/glossary" >}}#UID-hashes) via `OperatorIdentifier#forUid/forUidHash`.
Hashes should only be used when the use of `UIDs` is not possible, for example when the application that created the [savepoint]({{< ref "docs/ops/state/savepoints" >}}) did not specify them or when the `UID` is unknown.
-## Reading State
+## DataStream API
+### Reading State
Reading state begins by specifying the path to a valid savepoint or checkpoint along with the `StateBackend` that should be used to restore the data.
The compatibility guarantees for restoring state are identical to those when restoring a `DataStream` application.
@@ -86,14 +87,13 @@
SavepointReader savepoint = SavepointReader.read(env, "hdfs://path/", new HashMapStateBackend());
```
-
-### Operator State
+#### Operator State
[Operator state]({{< ref "docs/dev/datastream/fault-tolerance/state" >}}#operator-state) is any non-keyed state in Flink.
This includes, but is not limited to, any use of `CheckpointedFunction` or `BroadcastState` within an application.
When reading operator state, users specify the operator uid, the state name, and the type information.
-#### Operator List State
+##### Operator List State
Operator state stored in a `CheckpointedFunction` using `getListState` can be read using `SavepointReader#readListState`.
The state name and type information should match those used to define the `ListStateDescriptor` that declared this state in the DataStream application.
@@ -105,7 +105,7 @@
Types.INT);
```
-#### Operator Union List State
+##### Operator Union List State
Operator state stored in a `CheckpointedFunction` using `getUnionListState` can be read using `SavepointReader#readUnionState`.
The state name and type information should match those used to define the `ListStateDescriptor` that declared this state in the DataStream application.
@@ -118,7 +118,7 @@
Types.INT);
```
-#### Broadcast State
+##### Broadcast State
[BroadcastState]({{< ref "docs/dev/datastream/fault-tolerance/broadcast_state" >}}) can be read using `SavepointReader#readBroadcastState`.
The state name and type information should match those used to define the `MapStateDescriptor` that declared this state in the DataStream application.
@@ -132,7 +132,7 @@
Types.INT);
```
-#### Using Custom Serializers
+##### Using Custom Serializers
Each of the operator state readers support using custom `TypeSerializers` if one was used to define the `StateDescriptor` that wrote out the state.
@@ -144,9 +144,9 @@
new MyCustomIntSerializer());
```
-### Keyed State
+#### Keyed State
-[Keyed state]({{< ref "docs/dev/datastream/fault-tolerance/state" >}}#keyed-state), or partitioned state, is any state that is partitioned relative to a key.
+[Keyed state]({{< ref "docs/dev/datastream/fault-tolerance/state" >}}#keyed-state), also known as partitioned state, is any state that is partitioned relative to a key.
When reading a keyed state, users specify the operator id and a `KeyedStateReaderFunction<KeyType, OutputType>`.
The `KeyedStateReaderFunction` allows users to read arbitrary columns and complex state types such as ListState, MapState, and AggregatingState.
@@ -226,7 +226,7 @@
**Note:** When using a `KeyedStateReaderFunction`, all state descriptors must be registered eagerly inside of open. Any attempt to call a `RuntimeContext#get*State` will result in a `RuntimeException`.
-### Window State
+#### Window State
The state processor api supports reading state from a [window operator]({{< ref "docs/dev/datastream/operators/windows" >}}).
When reading a window state, users specify the operator id, window assigner, and aggregation type.
@@ -274,7 +274,6 @@
.aggregate(new ClickCounter())
.uid("click-window")
.addSink(new Sink());
-
```
This state can be read using the code below.
@@ -317,13 +316,12 @@
.window(TumblingEventTimeWindows.of(Duration.ofMinutes(1)))
.aggregate("click-window", new ClickCounter(), new ClickReader(), Types.String, Types.INT, Types.INT)
.print();
-
```
Additionally, trigger state - from `CountTrigger`s or custom triggers - can be read using the method
`Context#triggerState` inside the `WindowReaderFunction`.
-## Writing New Savepoints
+### Writing New Savepoints
`Savepoint`'s may also be written, which allows such use cases as bootstrapping state based on historical data.
Each savepoint is made up of one or more `StateBootstrapTransformation`'s (explained below), each of which defines the state for an individual operator.
@@ -350,7 +348,7 @@
The [UIDs]({{< ref "docs/ops/state/savepoints" >}}#assigning-operator-ids) associated with each operator must match one to one with the UIDs assigned to the operators in your `DataStream` application; these are how Flink knows what state maps to which operator.
-### Operator State
+#### Operator State
Simple operator state, using `CheckpointedFunction`, can be created using the `StateBootstrapFunction`.
@@ -382,7 +380,7 @@
.transform(new SimpleBootstrapFunction());
```
-### Broadcast State
+#### Broadcast State
[BroadcastState]({{< ref "docs/dev/datastream/fault-tolerance/broadcast_state" >}}) can be written using a `BroadcastStateBootstrapFunction`. Similar to broadcast state in the `DataStream` API, the full state must fit in memory.
@@ -412,7 +410,7 @@
.transform(new CurrencyBootstrapFunction());
```
-### Keyed State
+#### Keyed State
Keyed state for `ProcessFunction`'s and other `RichFunction` types can be written using a `KeyedStateBootstrapFunction`.
@@ -456,7 +454,7 @@
<span class="label label-danger">Attention</span> If your bootstrap function creates timers, the state can only be restored using one of the [process]({{< ref "docs/dev/datastream/operators/process_function" >}}) type functions.
-### Window State
+#### Window State
The state processor api supports writing state for the [window operator]({{< ref "docs/dev/datastream/operators/windows" >}}).
When writing window state, users specify the operator id, window assigner, evictor, optional trigger, and aggregation type.
@@ -470,7 +468,7 @@
public long timestamp;
}
-
+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Account> accountDataSet = env.fromCollection(accounts);
@@ -482,7 +480,7 @@
.reduce((left, right) -> left + right);
```
-## Modifying Savepoints
+### Modifying Savepoints
Besides creating a savepoint from scratch, you can base one off an existing savepoint such as when bootstrapping a single new operator for an existing job.
@@ -493,7 +491,7 @@
.write(newPath);
```
-### Changing UID (hashes)
+#### Changing UID (hashes)
`SavepointWriter#changeOperatorIdenfifier` can be used to modify the [UIDs]({{< ref "docs/concepts/glossary" >}}#uid) or [UID hash]({{< ref "docs/concepts/glossary" >}}#uid-hash) of an operator.
@@ -514,3 +512,119 @@
OperatorIdentifier.forUid("new-uid"))
...
```
+
+## Table API
+
+### Getting started
+
+Before querying state with the Table API, make sure to review our [Flink SQL]({{< ref "docs/dev/table/sql/overview" >}}) guidelines.
+
+**Note:** The state Table API only supports keyed state.
+
+### Keyed State
+
+[Keyed state]({{< ref "docs/dev/datastream/fault-tolerance/state" >}}#keyed-state), also known as partitioned state, is any state that is partitioned relative to a key.
+
+The SQL connector allows users to read arbitrary columns as `ValueState` as well as complex state types such as `ListState` and `MapState`.
+This means that if an operator contains a stateful process function such as the following:
+```java
+eventStream
+ .keyBy(e -> (Integer)e.key)
+ .process(new StatefulFunction())
+ .uid("my-uid");
+
+...
+
+public class Account {
+ private Integer id;
+ public Double amount;
+
+    public Integer getId() {
+ return id;
+ }
+
+ public void setId(Integer id) {
+ this.id = id;
+ }
+}
+
+public class StatefulFunction extends KeyedProcessFunction<Integer, Integer, Void> {
+ private ValueState<Integer> myValueState;
+ private ValueState<Account> myAccountValueState;
+ private ListState<Integer> myListState;
+ private MapState<Integer, Integer> myMapState;
+
+ @Override
+ public void open(OpenContext openContext) {
+ myValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("MyValueState", Integer.class));
+ myAccountValueState = getRuntimeContext().getState(new ValueStateDescriptor<>("MyAccountValueState", Account.class));
+        myListState = getRuntimeContext().getListState(new ListStateDescriptor<>("MyListState", Integer.class));
+ myMapState = getRuntimeContext().getMapState(new MapStateDescriptor<>("MyMapState", Integer.class, Integer.class));
+ }
+ ...
+}
+```
+
+Then its state can be read by querying a table created with the following SQL statement:
+```SQL
+CREATE TABLE state_table (
+ k INTEGER,
+ MyValueState INTEGER,
+ MyAccountValueState ROW<id INTEGER, amount DOUBLE>,
+ MyListState ARRAY<INTEGER>,
+ MyMapState MAP<INTEGER, INTEGER>,
+ PRIMARY KEY (k) NOT ENFORCED
+) WITH (
+ 'connector' = 'savepoint',
+ 'state.backend.type' = 'rocksdb',
+ 'state.path' = '/root/dir/of/checkpoint-data/chk-1',
+ 'operator.uid' = 'my-uid'
+);
+```
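+
+Once such a table is defined, the state can be queried like any other batch table. A minimal example against the table sketched above (column names come from the example, `CARDINALITY` is only used to illustrate further SQL processing):
+```SQL
+SELECT k, MyValueState, CARDINALITY(MyListState) AS list_size
+FROM state_table;
+```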
+
+### Connector options
+
+#### General options
+| Option | Required | Default | Type | Description |
+|--------------------|----------|---------|----------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| connector          | required | (none)  | String                                 | Specify what connector to use; here it must be 'savepoint'.                                                                                                               |
+| state.backend.type | required | (none)  | Enum Possible values: hashmap, rocksdb | Defines the state backend which must be used for state reading. This must match the value which was defined in the Flink job that created the savepoint or checkpoint.   |
+| state.path         | required | (none)  | String                                 | Defines the state path which must be used for state reading. All file systems that are supported by Flink can be used here.                                              |
+| operator.uid | optional | (none) | String | Defines the operator UID which must be used for state reading (can't be used together with `operator.uid.hash`). Either `operator.uid` or `operator.uid.hash` must be specified. |
+| operator.uid.hash | optional | (none) | String | Defines the operator UID hash which must be used for state reading (can't be used together with `operator.uid`). Either `operator.uid` or `operator.uid.hash` must be specified. |
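+
+When the original job did not assign a `uid` to the operator, `operator.uid.hash` can be supplied instead of `operator.uid`. A minimal sketch, where the hash value is only a placeholder:
+```SQL
+CREATE TABLE state_table_by_hash (
+  k INTEGER,
+  MyValueState INTEGER,
+  PRIMARY KEY (k) NOT ENFORCED
+) WITH (
+  'connector' = 'savepoint',
+  'state.backend.type' = 'hashmap',
+  'state.path' = '/root/dir/of/checkpoint-data/chk-1',
+  'operator.uid.hash' = '<operator-uid-hash>'
+);
+```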
+
+#### Connector options for column '#'
+
+The `#` placeholder stands for the name of the column the option applies to.
+
+| Option | Required | Default | Type | Description |
+|-------------------------|----------|---------|--------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| fields.#.state-name | optional | (none) | String | Overrides the state name which must be used for state reading. This can be useful when the state name contains characters which are not compliant with SQL column names. |
+| fields.#.state-type     | optional | (none)  | Enum Possible values: list, map, value | Defines the state type which must be used for state reading: value, list or map. When it's not provided, the connector tries to infer it from the SQL type (ARRAY=list, MAP=map, all others=value). |
+| fields.#.map-key-format | optional | (none)  | String | Defines the Java class used for decoding map key data (for example java.lang.Long). When it's not provided, the connector tries to infer it from the SQL type (only primitive types are supported).   |
+| fields.#.value-format   | optional | (none)  | String | Defines the Java class used for decoding value data (for example java.lang.Long). When it's not provided, the connector tries to infer it from the SQL type (only primitive types are supported).     |
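+
+As an illustrative sketch of the per-column options (the state name `my-value-state` below is a made-up example), a state whose name is not a valid SQL column name can be exposed under a different column name and with an explicit state type:
+```SQL
+CREATE TABLE renamed_state_table (
+  k INTEGER,
+  my_value INTEGER,
+  PRIMARY KEY (k) NOT ENFORCED
+) WITH (
+  'connector' = 'savepoint',
+  'state.backend.type' = 'rocksdb',
+  'state.path' = '/root/dir/of/checkpoint-data/chk-1',
+  'operator.uid' = 'my-uid',
+  'fields.my_value.state-name' = 'my-value-state',
+  'fields.my_value.state-type' = 'value'
+);
+```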
+
+### Default Data Type Mapping
+
+The state SQL connector infers the data type for primitive types when `fields.#.value-format` and `fields.#.map-key-format`
+are not defined. The following table shows the default `Flink SQL type` -> `Java type` mapping. If the inferred mapping is not
+appropriate, it can be overridden with the two mentioned config parameters on a per-column basis.
+
+| Flink SQL type | Java type |
+|-------------------------|-------------------------------------------------------------------------|
+| CHAR / VARCHAR / STRING | java.lang.String |
+| BOOLEAN | boolean |
+| BINARY / VARBINARY | byte[] |
+| DECIMAL | org.apache.flink.table.data.DecimalData |
+| TINYINT | byte |
+| SMALLINT | short |
+| INTEGER | int |
+| BIGINT | long |
+| FLOAT | float |
+| DOUBLE | double |
+| DATE | int |
+| INTERVAL_YEAR_MONTH | long |
+| INTERVAL_DAY_TIME | long |
+| ARRAY | java.util.List |
+| MAP | java.util.Map |
+| ROW | java.util.List\<org.apache.flink.table.types.logical.RowType.RowField\> |
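+
+If the inferred Java type does not match how the state was actually serialized, the format classes can be pinned per column. A sketch only, assuming the state behind the `MyMapState` column was written with `java.lang.Long` keys and values:
+```SQL
+CREATE TABLE typed_state_table (
+  k INTEGER,
+  MyMapState MAP<BIGINT, BIGINT>,
+  PRIMARY KEY (k) NOT ENFORCED
+) WITH (
+  'connector' = 'savepoint',
+  'state.backend.type' = 'rocksdb',
+  'state.path' = '/root/dir/of/checkpoint-data/chk-1',
+  'operator.uid' = 'my-uid',
+  'fields.MyMapState.map-key-format' = 'java.lang.Long',
+  'fields.MyMapState.value-format' = 'java.lang.Long'
+);
+```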
+
+**Shortcut:** When a complex Java class is mapped to a column with the STRING SQL type, the result of the class instance's
+`toString` method becomes the column value. This can be useful when a quick exploratory query is required.
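+
+For example, assuming `Account` overrides `toString`, the `MyAccountValueState` state from the Java example above could be dumped as text by declaring its column with the STRING type:
+```SQL
+CREATE TABLE state_as_text (
+  k INTEGER,
+  MyAccountValueState STRING,
+  PRIMARY KEY (k) NOT ENFORCED
+) WITH (
+  'connector' = 'savepoint',
+  'state.backend.type' = 'rocksdb',
+  'state.path' = '/root/dir/of/checkpoint-data/chk-1',
+  'operator.uid' = 'my-uid'
+);
+```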