[SPARK-53737][SQL][SS] Add Real-time Mode trigger
### What changes were proposed in this pull request?
Introduce a new trigger type for Real-time Mode (RTM) in Structured Streaming. This new trigger will be how users enable their Structured Streaming query to run in Real-time Mode.
Please note this PR just adds the trigger. Users cannot yet run queries in Real-time Mode. Other functionality will come in later PRs.
### Why are the changes needed?
This serves as the first PR to add Real-time mode to Structured Streaming.
### Does this PR introduce _any_ user-facing change?
Yes, it adds a new trigger type to Structured Streaming. This change does not affect or change any existing behaviors.
### How was this patch tested?
Unit test added.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #52473 from jerrypeng/SPARK-53737.
Authored-by: Jerry Peng <jerry.peng@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
diff --git a/sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java b/sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java
index 6e3a93b..8536df1 100644
--- a/sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java
+++ b/sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java
@@ -22,10 +22,12 @@
import scala.concurrent.duration.Duration;
import org.apache.spark.annotation.Evolving;
+import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.execution.streaming.AvailableNowTrigger$;
import org.apache.spark.sql.execution.streaming.ContinuousTrigger;
import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
+import org.apache.spark.sql.execution.streaming.RealTimeTrigger;
/**
* Policy used to indicate how often results should be produced by a [[StreamingQuery]].
@@ -176,4 +178,56 @@
public static Trigger Continuous(String interval) {
return ContinuousTrigger.apply(interval);
}
+
+ /**
+ * A trigger for real time mode whose batches have the specified duration.
+ * @param batchDurationMs the duration of each batch in milliseconds; must be strictly positive
+ */
+ @Experimental
+ public static Trigger RealTime(long batchDurationMs) {
+ return RealTimeTrigger.apply(batchDurationMs);
+ }
+
+ /**
+ * A trigger for real time mode, with batch at the specified duration.
+ * The duration is {@code batchDuration} expressed in units of {@code timeUnit}.
+ */
+ @Experimental
+ public static Trigger RealTime(long batchDuration, TimeUnit timeUnit) {
+ return RealTimeTrigger.create(batchDuration, timeUnit);
+ }
+
+ /**
+ * A trigger for real time mode whose batch duration is given as a Scala `Duration`.
+ *
+ * {{{
+ * import scala.concurrent.duration._
+ * df.writeStream.trigger(Trigger.RealTime(10.seconds))
+ * }}}
+ */
+ @Experimental
+ public static Trigger RealTime(Duration batchDuration) {
+ return RealTimeTrigger.apply(batchDuration);
+ }
+
+ /**
+ * A trigger for real time mode whose batch duration is given as an interval string.
+ *
+ * {{{
+ * df.writeStream.trigger(Trigger.RealTime("10 seconds"))
+ * }}}
+ */
+ @Experimental
+ public static Trigger RealTime(String batchDuration) {
+ return RealTimeTrigger.apply(batchDuration);
+ }
+
+ /**
+ * A trigger for real time mode that uses the default batch duration, which is 5
+ * minutes.
+ */
+ @Experimental
+ public static Trigger RealTime() {
+ return RealTimeTrigger.apply();
+ }
}
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/api/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
index 7d8b33a..ea7100f 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
@@ -19,9 +19,12 @@
import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{Duration, MINUTES}
+
+import org.json4s.DefaultFormats
import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY
import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils.microsToMillis
import org.apache.spark.sql.catalyst.util.SparkIntervalUtils
@@ -114,3 +117,40 @@
ContinuousTrigger(convert(interval, unit))
}
}
+
+/**
+ * A [[Trigger]] that runs a query in real time mode.
+ * @param batchDurationMs
+ *   Duration of each batch in milliseconds; zero or negative throws `IllegalArgumentException`.
+ */
+@Experimental
+case class RealTimeTrigger(batchDurationMs: Long) extends Trigger {
+  require(batchDurationMs > 0, "the batch duration should be positive")
+  // NOTE(review): not referenced by the code visible in this change; confirm it is needed.
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+}
+
+/**
+ * Factory methods for [[RealTimeTrigger]]. Each overload hands the value produced by
+ * `Triggers.convert` to the case class as the batch duration in milliseconds.
+ */
+@Experimental
+object RealTimeTrigger {
+  import Triggers._
+
+  /** Uses the default batch duration of 5 minutes. */
+  def apply(): RealTimeTrigger = apply(Duration(5, MINUTES))
+
+  /** Builds the trigger from an interval string such as "10 seconds". */
+  def apply(batchDuration: String): RealTimeTrigger = new RealTimeTrigger(convert(batchDuration))
+
+  /** Builds the trigger from a Scala `Duration`. */
+  def apply(batchDuration: Duration): RealTimeTrigger = new RealTimeTrigger(convert(batchDuration))
+
+  /** Named alias of `apply(String)`. */
+  def create(batchDuration: String): RealTimeTrigger = apply(batchDuration)
+
+  /** Builds the trigger from a length and the `TimeUnit` it is expressed in. */
+  def create(batchDuration: Long, unit: TimeUnit): RealTimeTrigger =
+    new RealTimeTrigger(convert(batchDuration, unit))
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeSuite.scala
new file mode 100644
index 0000000..e213554
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeSuite.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.sql.execution.streaming.RealTimeTrigger
+
+class StreamRealTimeModeSuite extends StreamTest {
+
+  test("test trigger") {
+    // Checks that `trigger` is a RealTimeTrigger whose batch duration is `expected` ms.
+    def testTrigger(trigger: Trigger, expected: Long): Unit = {
+      val realTimeTrigger = trigger.asInstanceOf[RealTimeTrigger]
+      assert(
+        realTimeTrigger.batchDurationMs == expected,
+        s"Real time trigger duration should be ${expected} ms" +
+        s" but got ${realTimeTrigger.batchDurationMs} ms"
+      )
+    }
+
+    // test default
+    testTrigger(Trigger.RealTime(), 300000)
+
+    // test interval strings
+    List(
+      ("1 second", 1000),
+      ("1 minute", 60000),
+      ("1 hour", 3600000),
+      ("1 day", 86400000),
+      ("1 week", 604800000)
+    ).foreach {
+      case (str, ms) =>
+        testTrigger(Trigger.RealTime(str), ms)
+        testTrigger(RealTimeTrigger(str), ms)
+        testTrigger(RealTimeTrigger.create(str), ms)
+    }
+
+    // test plain millisecond values
+    List(1000, 60000, 3600000, 86400000, 604800000).foreach { ms =>
+      testTrigger(Trigger.RealTime(ms), ms)
+      testTrigger(RealTimeTrigger(ms), ms)
+      testTrigger(new RealTimeTrigger(ms), ms)
+    }
+
+    // test Duration values
+    List(
+      (Duration.apply(1000, "ms"), 1000),
+      (Duration.apply(60, "s"), 60000),
+      (Duration.apply(1, "h"), 3600000),
+      (Duration.apply(1, "d"), 86400000)
+    ).foreach {
+      case (duration, ms) =>
+        testTrigger(Trigger.RealTime(duration), ms)
+        testTrigger(RealTimeTrigger(duration), ms)
+    }
+
+    // test a length combined with a TimeUnit
+    List(
+      (1000, TimeUnit.MILLISECONDS, 1000),
+      (60, TimeUnit.SECONDS, 60000),
+      (1, TimeUnit.HOURS, 3600000),
+      (1, TimeUnit.DAYS, 86400000)
+    ).foreach {
+      case (interval, unit, ms) =>
+        testTrigger(Trigger.RealTime(interval, unit), ms)
+        testTrigger(RealTimeTrigger(interval, unit), ms)
+        testTrigger(RealTimeTrigger.create(interval, unit), ms)
+    }
+
+    // test invalid: one intercept per call, so an early throw cannot hide the calls after it
+    List("-1", "0").foreach { str =>
+      intercept[IllegalArgumentException] { Trigger.RealTime(str) }
+      intercept[IllegalArgumentException] { RealTimeTrigger(str) }
+      intercept[IllegalArgumentException] { RealTimeTrigger.create(str) }
+    }
+
+    List(-1, 0).foreach { duration =>
+      intercept[IllegalArgumentException] { Trigger.RealTime(duration) }
+      intercept[IllegalArgumentException] { RealTimeTrigger(duration) }
+      intercept[IllegalArgumentException] { new RealTimeTrigger(duration) }
+    }
+  }
+}