---
title: "Structured Streaming"
weight: 11
type: docs
aliases:
- /spark/structured-streaming.html
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Structured Streaming
Paimon supports streaming data processing with [Spark Structured Streaming](https://spark.apache.org/docs/latest/streaming/index.html), enabling both streaming write and streaming query.
## Streaming Write
{{< hint info >}}
Paimon Structured Streaming only supports the `append` and `complete` output modes.
{{< /hint >}}
```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._

// MemoryStream needs an implicit SQLContext.
implicit val sqlContext: SQLContext = spark.sqlContext

// Create a Paimon table if it does not exist.
spark.sql(s"""
  |CREATE TABLE IF NOT EXISTS T (k INT, v STRING)
  |TBLPROPERTIES ('primary-key'='k', 'bucket'='3')
  |""".stripMargin)

// Here we use MemoryStream to fake a streaming source.
val inputData = MemoryStream[(Int, String)]
val df = inputData.toDS().toDF("k", "v")

// Streaming write to the Paimon table.
val stream = df
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoint")
  .format("paimon")
  .start("/path/to/paimon/sink/table")
```
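For the `complete` output mode, the query must produce a full result table on each batch, which typically means a streaming aggregation. A minimal sketch, reusing the `MemoryStream` source above; the count-per-key aggregation and the sink path are illustrative assumptions only:
```scala
// Complete mode rewrites the full aggregation result on every batch.
// The aggregation and the sink path below are illustrative assumptions.
val aggregated = inputData.toDS()
  .toDF("k", "v")
  .groupBy("k")
  .count()

val completeStream = aggregated
  .writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/path/to/complete/checkpoint")
  .format("paimon")
  .start("/path/to/paimon/agg/table")
```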
Streaming write also supports [Write merge schema]({{< ref "spark/sql-write#write-merge-schema" >}}).
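A minimal sketch of a streaming write with schema merging enabled; it assumes the `write.merge-schema` write option described on the linked page:
```scala
// Assumes the "write.merge-schema" option from the linked page: new columns
// appearing in the source DataFrame are merged into the table schema
// instead of failing the write.
val mergeSchemaStream = df
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/path/to/merge/checkpoint")
  .option("write.merge-schema", "true")
  .format("paimon")
  .start("/path/to/paimon/sink/table")
```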
## Streaming Query
{{< hint info >}}
Paimon currently supports Spark 3.3+ for streaming read.
{{< /hint >}}
Paimon supports a rich set of scan modes for streaming read, listed below:
<table class="configuration table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Scan Mode</th>
<th class="text-left" style="width: 60%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>latest</h5></td>
<td>For streaming sources, continuously reads latest changes without producing a snapshot at the beginning. </td>
</tr>
<tr>
<td><h5>latest-full</h5></td>
<td>For streaming sources, produces the latest snapshot on the table upon first startup, and continues to read the latest changes.</td>
</tr>
<tr>
<td><h5>from-timestamp</h5></td>
<td>For streaming sources, continuously reads changes starting from timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning. </td>
</tr>
<tr>
<td><h5>from-snapshot</h5></td>
<td>For streaming sources, continuously reads changes starting from snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning. </td>
</tr>
<tr>
<td><h5>from-snapshot-full</h5></td>
<td>For streaming sources, produces the snapshot specified by "scan.snapshot-id" on the table upon first startup, and continuously reads changes.</td>
</tr>
<tr>
<td><h5>default</h5></td>
<td>It is equivalent to from-snapshot if "scan.snapshot-id" is specified. It is equivalent to from-timestamp if "scan.timestamp-millis" is specified. Otherwise, it is equivalent to latest-full.</td>
</tr>
</tbody>
</table>
A simple example with default scan mode:
```scala
// If no scan-related configs are provided, the default latest-full scan mode is used.
val query = spark.readStream
  .format("paimon")
  // by table name
  .table("table_name")
  // or by location
  // .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()
```
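Setting `scan.snapshot-id` without an explicit scan mode is, per the table above, equivalent to from-snapshot. A minimal sketch (the snapshot id 5 is illustrative):
```scala
// With "scan.snapshot-id" set, the default scan mode behaves like
// from-snapshot: changes are read starting from snapshot 5 without
// producing an initial full snapshot.
val fromSnapshotQuery = spark.readStream
  .format("paimon")
  .option("scan.snapshot-id", "5")
  .table("table_name")
  .writeStream
  .format("console")
  .start()
```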
Paimon Structured Streaming also supports a variety of streaming read behaviors: it works with the standard Spark triggers and offers several read limits.
The following read limits are supported:
<table class="configuration table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>read.stream.maxFilesPerTrigger</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The maximum number of files returned in a single batch.</td>
</tr>
<tr>
<td><h5>read.stream.maxBytesPerTrigger</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>The maximum number of bytes returned in a single batch.</td>
</tr>
<tr>
<td><h5>read.stream.maxRowsPerTrigger</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>The maximum number of rows returned in a single batch.</td>
</tr>
<tr>
<td><h5>read.stream.minRowsPerTrigger</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>The minimum number of rows returned in a single batch, used together with read.stream.maxTriggerDelayMs to create a MinRowsReadLimit.</td>
</tr>
<tr>
<td><h5>read.stream.maxTriggerDelayMs</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>The maximum delay between two adjacent batches, used together with read.stream.minRowsPerTrigger to create a MinRowsReadLimit.</td>
</tr>
</tbody>
</table>
**Example 1**
Use `org.apache.spark.sql.streaming.Trigger.AvailableNow()` together with the `read.stream.maxBytesPerTrigger` limit defined by Paimon.
```scala
import org.apache.spark.sql.streaming.Trigger

// Trigger.AvailableNow() processes all data available at the start
// of the query in one or multiple batches, then terminates the query.
// Setting read.stream.maxBytesPerTrigger to 134217728 (128 MB) means
// each batch processes at most 128 MB of data.
val query = spark.readStream
  .format("paimon")
  .option("read.stream.maxBytesPerTrigger", "134217728")
  .table("table_name")
  .writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()
```
**Example 2**
Use `org.apache.spark.sql.connector.read.streaming.ReadMinRows` by setting `read.stream.minRowsPerTrigger` and `read.stream.maxTriggerDelayMs`.
```scala
// No batch is triggered until more than 5,000 rows have accumulated,
// unless the delay since the last batch exceeds 300 seconds.
val query = spark.readStream
  .format("paimon")
  .option("read.stream.minRowsPerTrigger", "5000")
  .option("read.stream.maxTriggerDelayMs", "300000")
  .table("table_name")
  .writeStream
  .format("console")
  .start()
```
Paimon Structured Streaming supports reading rows as a changelog (an extra rowkind column in each row indicates its change type) in two ways:
- Stream directly from the system audit_log table
- Set `read.changelog` to true (default is false), then stream the table directly by name or location
**Example:**
```scala
// Option 1: stream the system audit_log table.
val query1 = spark.readStream
  .format("paimon")
  .table("`table_name$audit_log`")
  .writeStream
  .format("console")
  .start()

// Option 2: enable read.changelog and stream the table directly.
val query2 = spark.readStream
  .format("paimon")
  .option("read.changelog", "true")
  .table("table_name")
  .writeStream
  .format("console")
  .start()

/*
+I 1 Hi
+I 2 Hello
*/
```
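The rowkind column can be used like any ordinary column, for example to keep only a specific change type. A minimal sketch, assuming the column is named `rowkind` and carries values such as `+I` as shown above:
```scala
import org.apache.spark.sql.functions.col

// Keep only insert rows (+I) from the changelog stream.
val insertsOnly = spark.readStream
  .format("paimon")
  .option("read.changelog", "true")
  .table("table_name")
  .filter(col("rowkind") === "+I")

val query3 = insertsOnly.writeStream
  .format("console")
  .start()
```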