---
title: "Structured Streaming"
weight: 11
type: docs
aliases:
---
Paimon supports streaming data processing with Spark Structured Streaming, enabling both streaming write and streaming query.
{{< hint info >}}
Paimon Structured Streaming only supports the `append` and `complete` output modes.
{{< /hint >}}
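For instance, `complete` mode requires a streaming aggregation. The sketch below is illustrative only: the `word_count` table, paths, and the use of Spark's built-in `rate` source are placeholders, not part of the Paimon documentation.

```scala
// A hypothetical word_count table keyed on 'word' (illustrative only).
spark.sql(
  s"""
     |CREATE TABLE word_count (word STRING, cnt BIGINT)
     |TBLPROPERTIES ('primary-key'='word', 'bucket'='3')
     |""".stripMargin)

// Complete mode re-emits the full aggregation result on every trigger;
// the primary key lets Paimon upsert the latest counts.
val counts = spark.readStream
  .format("rate")
  .load()
  .selectExpr("CAST(value % 10 AS STRING) AS word")
  .groupBy("word")
  .count()
  .withColumnRenamed("count", "cnt")

val query = counts.writeStream
  .outputMode("complete")
  .option("checkpointLocation", "/path/to/complete/checkpoint")
  .format("paimon")
  .start("/path/to/paimon/word_count")
```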
```scala
// Create a paimon table if not exists.
spark.sql(s"""
           |CREATE TABLE T (k INT, v STRING)
           |TBLPROPERTIES ('primary-key'='k', 'bucket'='3')
           |""".stripMargin)

// Here we use MemoryStream to fake a streaming source.
val inputData = MemoryStream[(Int, String)]
val df = inputData.toDS().toDF("k", "v")

// Streaming Write to paimon table.
val stream = df
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoint")
  .format("paimon")
  .start("/path/to/paimon/sink/table")
```
Streaming write also supports [Write merge schema]({{< ref "spark/sql-write#write-merge-schema" >}}).
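As a hedged sketch, assuming the `write.merge-schema` (and optionally `write.merge-schema.explicit-cast`) options described on that page can also be passed to the stream writer:

```scala
// write.merge-schema lets the sink add columns that appear in the incoming
// DataFrame but are missing from the table schema; explicit-cast additionally
// allows type changes that require an explicit cast. (Assumed to apply to
// streaming writes the same way as to batch writes.)
val stream = df
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoint")
  .option("write.merge-schema", "true")
  .option("write.merge-schema.explicit-cast", "true")
  .format("paimon")
  .start("/path/to/paimon/sink/table")
```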
{{< hint info >}}
Paimon currently supports Spark 3.3+ for streaming read.
{{< /hint >}}
Paimon supports a rich set of scan modes for streaming read:
A simple example with default scan mode:
```scala
// If no scan-related configs are provided, the default latest-full scan mode is used.
val query = spark.readStream
  .format("paimon")
  // by table name
  .table("table_name")
  // or by location
  // .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()
```
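To pick a non-default scan mode, the corresponding scan options can be set on the reader. A hedged sketch, assuming Paimon's `scan.mode` and `scan.snapshot-id` options apply to streaming read as well:

```scala
// Start the stream from a specific snapshot instead of the default
// latest-full mode. scan.mode / scan.snapshot-id and the snapshot id 5
// are assumptions here, taken from Paimon's core scan options.
val query = spark.readStream
  .format("paimon")
  .option("scan.mode", "from-snapshot")
  .option("scan.snapshot-id", "5")
  .table("table_name")
  .writeStream
  .format("console")
  .start()
```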
Paimon Structured Streaming also supports a variety of streaming read modes; it works with multiple triggers and read limits.
These read limits are supported:
Example: One
Use `org.apache.spark.sql.streaming.Trigger.AvailableNow()` and the `maxBytesPerTrigger` read limit defined by Paimon.
```scala
// Trigger.AvailableNow() processes all available data at the start
// of the query in one or multiple batches, then terminates the query.
// Setting read.stream.maxBytesPerTrigger to 128M means that each
// batch processes a maximum of 128 MB of data.
val query = spark.readStream
  .format("paimon")
  .option("read.stream.maxBytesPerTrigger", "134217728")
  .table("table_name")
  .writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()
```
Example: Two
Use `org.apache.spark.sql.connector.read.streaming.ReadMinRows`.
```scala
// It will not trigger a batch until there are more than 5,000 rows of data,
// unless the interval between two batches is more than 300 seconds.
val query = spark.readStream
  .format("paimon")
  .option("read.stream.minRowsPerTrigger", "5000")
  .option("read.stream.maxTriggerDelayMs", "300000")
  .table("table_name")
  .writeStream
  .format("console")
  .start()
```
Paimon Structured Streaming supports reading rows in changelog form (a rowkind column is added to each row to represent its change type) in two ways:
- Use the `table_name$audit_log` system table for streaming read.
- Set `read.changelog` to `true` (default is `false`), then streaming read with the table location.

Example:
```scala
// Option 1
val query1 = spark.readStream
  .format("paimon")
  .table("`table_name$audit_log`")
  .writeStream
  .format("console")
  .start()

// Option 2
val query2 = spark.readStream
  .format("paimon")
  .option("read.changelog", "true")
  .table("table_name")
  .writeStream
  .format("console")
  .start()

/*
+I   1  Hi
+I   2  Hello
*/
```