| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.spark.sql.streaming |
| |
| import java.sql.Timestamp |
| import java.time.{Instant, LocalDateTime, ZoneId} |
| |
| import org.apache.spark.{SparkRuntimeException, SparkThrowable} |
| import org.apache.spark.sql.AnalysisException |
| import org.apache.spark.sql.catalyst.ExtendedAnalysisException |
| import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamExecution} |
| import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider |
| import org.apache.spark.sql.functions.window |
| import org.apache.spark.sql.internal.SQLConf |
| |
| case class InputEventRow( |
| key: String, |
| eventTime: Timestamp, |
| event: String) |
| |
| case class OutputRow( |
| key: String, |
| outputEventTime: Timestamp, |
| count: Int) |
| |
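| /** |
| * Stateful processor that, for each key and micro-batch, emits a single OutputRow |
| * containing the minimum event time and the count of the input rows seen in that batch. |
| */ |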
| class TestStatefulProcessor |
| extends StatefulProcessor[String, InputEventRow, OutputRow] { |
| override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {} |
| |
| override def handleInputRows( |
| key: String, |
| inputRows: Iterator[InputEventRow], |
| timerValues: TimerValues, |
| expiredTimerInfo: ExpiredTimerInfo): Iterator[OutputRow] = { |
| if (inputRows.isEmpty) { |
| Iterator.empty |
| } else { |
| var minEventTime = inputRows.next().eventTime |
| var count = 1 |
| inputRows.foreach { row => |
| if (row.eventTime.before(minEventTime)) { |
| minEventTime = row.eventTime |
| } |
| count += 1 |
| } |
| Iterator.single(OutputRow(key, minEventTime, count)) |
| } |
| } |
| } |
| |
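| /** |
| * Stateful processor that emits the number of input rows received per key in each micro-batch. |
| */ |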
| class InputCountStatefulProcessor[T] |
| extends StatefulProcessor[String, T, Int] { |
| override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {} |
| |
| override def handleInputRows( |
| key: String, |
| inputRows: Iterator[T], |
| timerValues: TimerValues, |
| expiredTimerInfo: ExpiredTimerInfo): Iterator[Int] = { |
| Iterator.single(inputRows.size) |
| } |
| } |
| |
| /** |
| * Emits an output row with a timestamp older than the current watermark for batchId > 0. |
| */ |
| class StatefulProcessorEmittingRowsOlderThanWatermark |
| extends StatefulProcessor[String, InputEventRow, OutputRow] { |
| override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {} |
| |
| override def handleInputRows( |
| key: String, |
| inputRows: Iterator[InputEventRow], |
| timerValues: TimerValues, |
| expiredTimerInfo: ExpiredTimerInfo): Iterator[OutputRow] = { |
| Iterator.single( |
| OutputRow( |
| key, |
| // always emit a value with eventTime 1 ms past the epoch; this fails after the |
| // first batch because the watermark moves past 0L |
| Timestamp.from(Instant.ofEpochMilli(1)), |
| inputRows.size)) |
| } |
| } |
| |
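| /** |
| * Case classes mirroring the schema of the windowed aggregation output (a window struct |
| * plus a count), used to read the aggregate results back as a typed Dataset. |
| */ |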
| case class Window( |
| start: Timestamp, |
| end: Timestamp) |
| |
| case class AggEventRow( |
| window: Window, |
| count: Long) |
| |
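| /** |
| * Verifies that transformWithState propagates the event-time watermark correctly when chained |
| * with downstream stateful operators (window aggregations, dropDuplicatesWithinWatermark, or |
| * another transformWithState), and that misconfigured eventTime columns fail the query. |
| */ |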
| class TransformWithStateChainingSuite extends StreamTest { |
| import testImplicits._ |
| |
| test("watermark is propagated correctly for next stateful operator" + |
| " after transformWithState") { |
| withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> |
| classOf[RocksDBStateStoreProvider].getName) { |
| val inputData = MemoryStream[InputEventRow] |
| |
| val result = inputData.toDS() |
| .withWatermark("eventTime", "1 minute") |
| .groupByKey(x => x.key) |
| .transformWithState[OutputRow]( |
| new TestStatefulProcessor(), |
| "outputEventTime", |
| OutputMode.Append()) |
| .groupBy(window($"outputEventTime", "1 minute")) |
| .count() |
| .as[AggEventRow] |
| |
| testStream(result, OutputMode.Append())( |
| AddData(inputData, InputEventRow("k1", timestamp("2024-01-01 00:00:00"), "e1")), |
| // watermark should advance to 1 minute behind `2024-01-01 00:00:00`; nothing is |
| // emitted yet because no window has closed under the previous watermark (epoch) |
| CheckNewAnswer(), |
| Execute("assertWatermarkEquals") { q => |
| assertWatermarkEquals(q, timestamp("2023-12-31 23:59:00")) |
| }, |
| AddData(inputData, InputEventRow("k1", timestamp("2024-02-01 00:00:00"), "e1")), |
| // global watermark should now be 1 minute behind `2024-02-01 00:00:00`. |
| CheckNewAnswer(AggEventRow( |
| Window(timestamp("2024-01-01 00:00:00"), timestamp("2024-01-01 00:01:00")), 1) |
| ), |
| Execute("assertWatermarkEquals") { q => |
| assertWatermarkEquals(q, timestamp("2024-01-31 23:59:00")) |
| }, |
| AddData(inputData, InputEventRow("k1", timestamp("2024-02-02 00:00:00"), "e1")), |
| CheckNewAnswer(AggEventRow( |
| Window(timestamp("2024-02-01 00:00:00"), timestamp("2024-02-01 00:01:00")), 1) |
| ) |
| ) |
| } |
| } |
| |
| test("passing eventTime column to transformWithState fails if" + |
| " no watermark is defined") { |
| withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> |
| classOf[RocksDBStateStoreProvider].getName) { |
| val inputData = MemoryStream[InputEventRow] |
| |
| val ex = intercept[AnalysisException] { |
| inputData.toDS() |
| .groupByKey(x => x.key) |
| .transformWithState[OutputRow]( |
| new TestStatefulProcessor(), |
| "outputEventTime", |
| OutputMode.Append()) |
| } |
| |
| checkError(ex, "CANNOT_ASSIGN_EVENT_TIME_COLUMN_WITHOUT_WATERMARK") |
| } |
| } |
| |
| test("missing eventTime column to transformWithState fails the query if" + |
| " another stateful operator is added") { |
| withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> |
| classOf[RocksDBStateStoreProvider].getName) { |
| val inputData = MemoryStream[InputEventRow] |
| |
| val result = inputData.toDS() |
| .withWatermark("eventTime", "1 minute") |
| .groupByKey(x => x.key) |
| .transformWithState[OutputRow]( |
| new TestStatefulProcessor(), |
| TimeMode.None(), |
| OutputMode.Append()) |
| .groupBy(window($"outputEventTime", "1 minute")) |
| .count() |
| |
| val ex = intercept[ExtendedAnalysisException] { |
| testStream(result, OutputMode.Append())( |
| StartStream() |
| ) |
| } |
| assert(ex.getMessage.contains("there are streaming aggregations on" + |
| " streaming DataFrames/DataSets without watermark")) |
| } |
| } |
| |
| test("chaining multiple transformWithState operators") { |
| withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> |
| classOf[RocksDBStateStoreProvider].getName) { |
| val inputData = MemoryStream[InputEventRow] |
| |
| val result = inputData.toDS() |
| .withWatermark("eventTime", "1 minute") |
| .groupByKey(x => x.key) |
| .transformWithState[OutputRow]( |
| new TestStatefulProcessor(), |
| "outputEventTime", |
| OutputMode.Append()) |
| .groupByKey(x => x.key) |
| .transformWithState( |
| new InputCountStatefulProcessor[OutputRow](), |
| TimeMode.None(), |
| OutputMode.Append() |
| ) |
| |
| testStream(result, OutputMode.Append())( |
| AddData(inputData, InputEventRow("k1", timestamp("2024-01-01 00:00:00"), "e1")), |
| CheckNewAnswer(1), |
| Execute("assertWatermarkEquals") { q => |
| assertWatermarkEquals(q, timestamp("2023-12-31 23:59:00")) |
| }, |
| AddData(inputData, InputEventRow("k1", timestamp("2024-02-01 00:00:00"), "e1")), |
| CheckNewAnswer(1), |
| Execute("assertWatermarkEquals") { q => |
| assertWatermarkEquals(q, timestamp("2024-01-31 23:59:00")) |
| }, |
| AddData(inputData, InputEventRow("k1", timestamp("2024-02-02 00:00:00"), "e1")), |
| CheckNewAnswer(1) |
| ) |
| } |
| } |
| |
| test("dropDuplicateWithWatermark after transformWithState operator" + |
| " fails if watermark column is not provided") { |
| withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> |
| classOf[RocksDBStateStoreProvider].getName) { |
| val inputData = MemoryStream[InputEventRow] |
| val result = inputData.toDS() |
| .withWatermark("eventTime", "1 minute") |
| .groupByKey(x => x.key) |
| .transformWithState[OutputRow]( |
| new TestStatefulProcessor(), |
| TimeMode.None(), |
| OutputMode.Append()) |
| .dropDuplicatesWithinWatermark() |
| |
| val ex = intercept[ExtendedAnalysisException] { |
| testStream(result, OutputMode.Append())( |
| StartStream() |
| ) |
| } |
| assert(ex.getMessage.contains("dropDuplicatesWithinWatermark is not supported on" + |
| " streaming DataFrames/DataSets without watermark")) |
| } |
| } |
| |
| test("dropDuplicateWithWatermark after transformWithState operator") { |
| withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> |
| classOf[RocksDBStateStoreProvider].getName) { |
| val inputData = MemoryStream[InputEventRow] |
| val result = inputData.toDS() |
| .withWatermark("eventTime", "1 minute") |
| .groupByKey(x => x.key) |
| .transformWithState[OutputRow]( |
| new TestStatefulProcessor(), |
| "outputEventTime", |
| OutputMode.Append()) |
| .dropDuplicatesWithinWatermark() |
| |
| testStream(result, OutputMode.Append())( |
| AddData(inputData, InputEventRow("k1", timestamp("2024-02-01 00:00:00"), "e1"), |
| InputEventRow("k1", timestamp("2024-02-01 00:00:00"), "e1")), |
| CheckNewAnswer(OutputRow("k1", timestamp("2024-02-01 00:00:00"), 2)), |
| Execute("assertWatermarkEquals") { q => |
| assertWatermarkEquals(q, timestamp("2024-01-31 23:59:00")) |
| } |
| ) |
| } |
| } |
| |
| test("query fails if the output dataset does not contain specified eventTimeColumn") { |
| withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> |
| classOf[RocksDBStateStoreProvider].getName) { |
| val inputData = MemoryStream[InputEventRow] |
| |
| val ex = intercept[ExtendedAnalysisException] { |
| val result = inputData.toDS() |
| .withWatermark("eventTime", "1 minute") |
| .groupByKey(x => x.key) |
| .transformWithState[OutputRow]( |
| new TestStatefulProcessor(), |
| "missingEventTimeColumn", |
| OutputMode.Append()) |
| |
| testStream(result, OutputMode.Append())( |
| StartStream() |
| ) |
| } |
| |
| checkError(ex, "UNRESOLVED_COLUMN.WITH_SUGGESTION", |
| parameters = Map( |
| "objectName" -> "`missingEventTimeColumn`", |
| "proposal" -> "`outputEventTime`, `count`, `key`")) |
| } |
| } |
| |
| test("query fails if the output dataset contains rows older than current watermark") { |
| withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> |
| classOf[RocksDBStateStoreProvider].getName) { |
| val inputData = MemoryStream[InputEventRow] |
| val result = inputData.toDS() |
| .withWatermark("eventTime", "1 minute") |
| .groupByKey(x => x.key) |
| .transformWithState[OutputRow]( |
| new StatefulProcessorEmittingRowsOlderThanWatermark(), |
| "outputEventTime", |
| OutputMode.Append()) |
| |
| testStream(result, OutputMode.Append())( |
| AddData(inputData, InputEventRow("k1", timestamp("2024-02-01 00:00:00"), "e1")), |
| // after the first batch, rows are emitted with a timestamp of 1 ms after the epoch |
| CheckNewAnswer(OutputRow("k1", Timestamp.from(Instant.ofEpochMilli(1)), 1)), |
| // this batch fails because the watermark has now moved past 1 ms after the epoch |
| AddData(inputData, InputEventRow("k1", timestamp("2024-02-02 00:00:00"), "e1")), |
| ExpectFailure[SparkRuntimeException] { ex => |
| checkError(ex.asInstanceOf[SparkThrowable], |
| "EMITTING_ROWS_OLDER_THAN_WATERMARK_NOT_ALLOWED", |
| parameters = Map("currentWatermark" -> "1706774340000", |
| "emittedRowEventTime" -> "1000")) |
| } |
| ) |
| } |
| } |
| |
| test("ensure that watermark delay is resolved from a view") { |
| withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> |
| classOf[RocksDBStateStoreProvider].getName) { |
| val inputData = MemoryStream[InputEventRow] |
| inputData.toDS() |
| .withWatermark("eventTime", "1 minute") |
| .createTempView("tempViewWithWatermark") |
| |
| val result = spark.readStream.table("tempViewWithWatermark") |
| .as[InputEventRow] |
| .groupByKey(x => x.key) |
| .transformWithState[OutputRow]( |
| new TestStatefulProcessor(), |
| "outputEventTime", |
| OutputMode.Append()) |
| |
| testStream(result, OutputMode.Append())( |
| AddData(inputData, InputEventRow("k1", timestamp("2024-02-01 00:00:00"), "e1")), |
| CheckNewAnswer(OutputRow("k1", timestamp("2024-02-01 00:00:00"), 1)), |
| Execute("assertWatermarkEquals") { q => |
| assertWatermarkEquals(q, timestamp("2024-01-31 23:59:00")) |
| } |
| ) |
| } |
| } |
| |
| test("ensure that query fails if there is no watermark when reading from a view") { |
| withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key -> |
| classOf[RocksDBStateStoreProvider].getName) { |
| val inputData = MemoryStream[InputEventRow] |
| inputData.toDS() |
| .createTempView("tempViewWithoutWatermark") |
| |
| val ex = intercept[AnalysisException] { |
| val result = spark.readStream.table("tempViewWithoutWatermark") |
| .as[InputEventRow] |
| .groupByKey(x => x.key) |
| .transformWithState[OutputRow]( |
| new TestStatefulProcessor(), |
| "outputEventTime", |
| OutputMode.Append()) |
| |
| testStream(result, OutputMode.Append())( |
| AddData(inputData, InputEventRow("k1", timestamp("2024-02-01 00:00:00"), "e1")), |
| ExpectFailure[SparkRuntimeException] { ex => |
| checkError(ex.asInstanceOf[AnalysisException], |
| "CANNOT_ASSIGN_EVENT_TIME_COLUMN_WITHOUT_WATERMARK") |
| } |
| ) |
| } |
| |
| checkError(ex, "CANNOT_ASSIGN_EVENT_TIME_COLUMN_WITHOUT_WATERMARK") |
| } |
| } |
| |
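| /** Parses a `yyyy-MM-dd HH:mm:ss` string into a java.sql.Timestamp in the JVM's default time zone. */ |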
| private def timestamp(str: String): Timestamp = { |
| Timestamp.valueOf(str) |
| } |
| |
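| /** Asserts that the query's event-time watermark from the last progress equals `watermark`. */ |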
| private def assertWatermarkEquals(q: StreamExecution, watermark: Timestamp): Unit = { |
| val queryWatermark = getQueryWatermark(q) |
| assert(queryWatermark.isDefined) |
| assert(queryWatermark.get === watermark) |
| } |
| |
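| /** |
| * Reads the "watermark" entry from the last query progress, if present, and converts the |
| * reported ISO-8601 instant into a Timestamp in the JVM's default time zone so it can be |
| * compared against values built with `timestamp()`. |
| */ |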
| private def getQueryWatermark(q: StreamExecution): Option[Timestamp] = { |
| import scala.jdk.CollectionConverters._ |
| val eventTimeMap = q.lastProgress.eventTime.asScala |
| val queryWatermark = eventTimeMap.get("watermark") |
| queryWatermark.map { v => |
| val instant = Instant.parse(v) |
| val local = LocalDateTime.ofInstant(instant, ZoneId.systemDefault()) |
| Timestamp.valueOf(local) |
| } |
| } |
| } |