---
title: "Structured Streaming"
weight: 11
type: docs
aliases:
- /spark/structured-streaming.html
---

# Structured Streaming

Paimon supports streaming data processing with Spark Structured Streaming, enabling both streaming write and streaming query.

## Streaming Write

{{< hint info >}}

Paimon Structured Streaming only supports the `append` and `complete` output modes.

{{< /hint >}}

// Create a paimon table if not exists.
spark.sql(s"""
           |CREATE TABLE T (k INT, v STRING)
           |TBLPROPERTIES ('primary-key'='k', 'bucket'='3')
           |""".stripMargin)

// Here we use MemoryStream to fake a streaming source.
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._

implicit val sqlContext: SQLContext = spark.sqlContext
val inputData = MemoryStream[(Int, String)]
val df = inputData.toDS().toDF("k", "v")

// Streaming Write to paimon table.
val stream = df
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoint")
  .format("paimon")
  .start("/path/to/paimon/sink/table")

Streaming write also supports [Write merge schema]({{< ref "spark/sql-write#write-merge-schema" >}}).
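
A minimal sketch of enabling it on a streaming write; it assumes the `write.merge-schema` option described on that page:

// Allow the incoming schema to be merged into the table schema during the
// streaming write (assumes the write.merge-schema option from "Write merge schema").
val mergeSchemaStream = df
  .writeStream
  .outputMode("append")
  .option("checkpointLocation", "/path/to/checkpoint")
  .option("write.merge-schema", "true")
  .format("paimon")
  .start("/path/to/paimon/sink/table")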

## Streaming Query

{{< hint info >}}

Paimon currently supports Spark 3.3+ for streaming read.

{{< /hint >}}

Paimon supports a rich set of scan modes for streaming read, configured via the `scan.mode` table option; a sketch with an explicit scan mode follows the default example below.

A simple example with default scan mode:

// If no scan-related configurations are provided, the latest-full scan mode is used.
val query = spark.readStream
  .format("paimon")
  // by table name
  .table("table_name") 
  // or by location
  // .load("/path/to/paimon/source/table")
  .writeStream
  .format("console")
  .start()
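
To start reading from a specific point instead of the default `latest-full` behavior, the scan mode can be set explicitly. A hedged sketch, assuming the `scan.mode` and `scan.snapshot-id` table options (the snapshot id is illustrative):

// Continuously read changes starting from snapshot 5, without producing
// a full snapshot of the table at the beginning.
val fromSnapshotQuery = spark.readStream
  .format("paimon")
  .option("scan.mode", "from-snapshot")
  .option("scan.snapshot-id", "5")
  .table("table_name")
  .writeStream
  .format("console")
  .start()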

Paimon Structured Streaming also supports a variety of streaming read behaviors: it works with the standard Spark triggers and with several Paimon-defined read limits.

The read limits used in the examples below are:

  • `read.stream.maxBytesPerTrigger`: the maximum number of bytes returned in a single batch
  • `read.stream.minRowsPerTrigger`: the minimum number of rows required before a batch is triggered, used together with `read.stream.maxTriggerDelayMs`
  • `read.stream.maxTriggerDelayMs`: the maximum delay between two adjacent batches, used together with `read.stream.minRowsPerTrigger`

Example: One

Use `org.apache.spark.sql.streaming.Trigger.AvailableNow()` together with the `read.stream.maxBytesPerTrigger` read limit defined by Paimon.

// Trigger.AvailableNow() processes all data available at the start
// of the query in one or multiple batches, then terminates the query.
// Setting read.stream.maxBytesPerTrigger to 134217728 (128 MB) means
// that each batch processes at most 128 MB of data.
import org.apache.spark.sql.streaming.Trigger

val query = spark.readStream
  .format("paimon")
  .option("read.stream.maxBytesPerTrigger", "134217728")
  .table("table_name")
  .writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()

Example: Two

Use `org.apache.spark.sql.connector.read.streaming.ReadMinRows`, configured through the Paimon options `read.stream.minRowsPerTrigger` and `read.stream.maxTriggerDelayMs`.

// A batch is not triggered until more than 5,000 rows are available,
// unless the delay since the last batch exceeds 300 seconds (300,000 ms).
val query = spark.readStream
  .format("paimon")
  .option("read.stream.minRowsPerTrigger", "5000")
  .option("read.stream.maxTriggerDelayMs", "300000")
  .table("table_name")
  .writeStream
  .format("console")
  .start()

Paimon Structured Streaming supports reading rows as a changelog (a `rowkind` column is added to each row to indicate its change type) in two ways:

  • Stream read directly from the system `audit_log` table
  • Set `read.changelog` to `true` (default is `false`), then stream read the table by name or location

Example:

// Option 1
val query1 = spark.readStream
  .format("paimon")
  .table("`table_name$audit_log`")
  .writeStream
  .format("console")
  .start()

// Option 2
val query2 = spark.readStream
  .format("paimon")
  .option("read.changelog", "true")
  .table("table_name")
  .writeStream
  .format("console")
  .start()

/*
Example console output (rowkind, k, v):
+I   1  Hi
+I   2  Hello
*/
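
The `rowkind` column can then be used like any other column, for example to keep only deletions. A hedged sketch, assuming the standard changelog kinds `+I`, `-U`, `+U` and `-D`:

import org.apache.spark.sql.functions.col

// Keep only delete rows (-D) from the changelog stream.
val deletes = spark.readStream
  .format("paimon")
  .option("read.changelog", "true")
  .table("table_name")
  .filter(col("rowkind") === "-D")
  .writeStream
  .format("console")
  .start()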