[SPARK-53737][SQL][SS] Add Real-time Mode trigger

### What changes were proposed in this pull request?

Introduce a new trigger type for Real-time Mode (RTM) in Structured Streaming.  This new trigger will be how users enable their Structured Streaming query to run in Real-time Mode.

Please note this PR just adds the trigger.  Users cannot yet run queries in Real-time Mode. Other functionality will come in later PRs.

### Why are the changes needed?

This serves as the first PR to add Real-time mode to Structured Streaming.

### Does this PR introduce _any_ user-facing change?

Yes, it adds a new trigger type to Structured Streaming.  This change does not effect or change any existing behaviors.

### How was this patch tested?

Unit test added.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52473 from jerrypeng/SPARK-53737.

Authored-by: Jerry Peng <jerry.peng@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
diff --git a/sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java b/sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java
index 6e3a93b..8536df1 100644
--- a/sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java
+++ b/sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java
@@ -22,10 +22,12 @@
 import scala.concurrent.duration.Duration;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.annotation.Experimental;
 import org.apache.spark.sql.execution.streaming.AvailableNowTrigger$;
 import org.apache.spark.sql.execution.streaming.ContinuousTrigger;
 import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
 import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
+import org.apache.spark.sql.execution.streaming.RealTimeTrigger;
 
 /**
  * Policy used to indicate how often results should be produced by a [[StreamingQuery]].
@@ -176,4 +178,56 @@
   public static Trigger Continuous(String interval) {
     return ContinuousTrigger.apply(interval);
   }
+
+  /**
+   * A trigger for real time mode, with batch at the specified duration.
+   *
+   */
+  @Experimental
+  public static Trigger RealTime(long batchDurationMs) {
+    return RealTimeTrigger.apply(batchDurationMs);
+  }
+
+  /**
+   * A trigger for real time mode, with batch at the specified duration.
+   *
+   */
+  @Experimental
+  public static Trigger RealTime(long batchDuration, TimeUnit timeUnit) {
+    return RealTimeTrigger.create(batchDuration, timeUnit);
+  }
+
+  /**
+   * A trigger for real time mode, with batch at the specified duration.
+   *
+   * {{{
+   *    import scala.concurrent.duration._
+   *    df.writeStream.trigger(Trigger.RealTime(10.seconds))
+   * }}}
+   */
+  @Experimental
+  public static Trigger RealTime(Duration batchDuration) {
+    return RealTimeTrigger.apply(batchDuration);
+  }
+
+  /**
+   * A trigger for real time mode, with batch at the specified duration.
+   *
+   * {{{
+   *    df.writeStream.trigger(Trigger.RealTime("10 seconds"))
+   * }}}
+   */
+  @Experimental
+  public static Trigger RealTime(String batchDuration) {
+    return RealTimeTrigger.apply(batchDuration);
+  }
+
+  /**
+   * A trigger for real time mode, with batch at the specified duration. The default duration is 5
+   * minutes.
+   */
+  @Experimental
+  public static Trigger RealTime() {
+    return RealTimeTrigger.apply();
+  }
 }
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/api/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
index 7d8b33a..ea7100f 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
@@ -19,9 +19,12 @@
 
 import java.util.concurrent.TimeUnit
 
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration.{Duration, MINUTES}
+
+import org.json4s.DefaultFormats
 
 import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY
 import org.apache.spark.sql.catalyst.util.SparkDateTimeUtils.microsToMillis
 import org.apache.spark.sql.catalyst.util.SparkIntervalUtils
@@ -114,3 +117,40 @@
     ContinuousTrigger(convert(interval, unit))
   }
 }
+
+/**
+ * A [[Trigger]] that runs a query in real time mode.
+ * @param batchDurationMs
+ *   The duration of each batch in milliseconds. This must be strictly positive.
+ */
+@Experimental
+case class RealTimeTrigger(batchDurationMs: Long) extends Trigger {
+  require(batchDurationMs > 0, "the batch duration should not be negative")
+
+  implicit val defaultFormats: DefaultFormats = DefaultFormats
+}
+
+@Experimental
+object RealTimeTrigger {
+  import Triggers._
+
+  def apply(): RealTimeTrigger = {
+    RealTimeTrigger(Duration(5, MINUTES))
+  }
+
+  def apply(batchDuration: String): RealTimeTrigger = {
+    RealTimeTrigger(convert(batchDuration))
+  }
+
+  def apply(batchDuration: Duration): RealTimeTrigger = {
+    RealTimeTrigger(convert(batchDuration))
+  }
+
+  def create(batchDuration: String): RealTimeTrigger = {
+    apply(batchDuration)
+  }
+
+  def create(batchDuration: Long, unit: TimeUnit): RealTimeTrigger = {
+    RealTimeTrigger(convert(batchDuration, unit))
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeSuite.scala
new file mode 100644
index 0000000..e213554
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamRealTimeModeSuite.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration.Duration
+
+import org.apache.spark.sql.execution.streaming.RealTimeTrigger
+
+class StreamRealTimeModeSuite extends StreamTest {
+
+  test("test trigger") {
+    def testTrigger(trigger: Trigger, actual: Long): Unit = {
+      val realTimeTrigger = trigger.asInstanceOf[RealTimeTrigger]
+      assert(
+        realTimeTrigger.batchDurationMs == actual,
+        s"Real time trigger duration should be ${actual} ms" +
+        s" but got ${realTimeTrigger.batchDurationMs} ms"
+      )
+    }
+
+    // test default
+    testTrigger(Trigger.RealTime(), 300000)
+
+    List(
+      ("1 second", 1000),
+      ("1 minute", 60000),
+      ("1 hour", 3600000),
+      ("1 day", 86400000),
+      ("1 week", 604800000)
+    ).foreach {
+      case (str, ms) =>
+        testTrigger(Trigger.RealTime(str), ms)
+        testTrigger(RealTimeTrigger(str), ms)
+        testTrigger(RealTimeTrigger.create(str), ms)
+
+    }
+
+    List(1000, 60000, 3600000, 86400000, 604800000).foreach { ms =>
+      testTrigger(Trigger.RealTime(ms), ms)
+      testTrigger(RealTimeTrigger(ms), ms)
+      testTrigger(new RealTimeTrigger(ms), ms)
+    }
+
+    List(
+      (Duration.apply(1000, "ms"), 1000),
+      (Duration.apply(60, "s"), 60000),
+      (Duration.apply(1, "h"), 3600000),
+      (Duration.apply(1, "d"), 86400000)
+    ).foreach {
+      case (duration, ms) =>
+        testTrigger(Trigger.RealTime(duration), ms)
+        testTrigger(RealTimeTrigger(duration), ms)
+        testTrigger(RealTimeTrigger(duration), ms)
+    }
+
+    List(
+      (1000, TimeUnit.MILLISECONDS, 1000),
+      (60, TimeUnit.SECONDS, 60000),
+      (1, TimeUnit.HOURS, 3600000),
+      (1, TimeUnit.DAYS, 86400000)
+    ).foreach {
+      case (interval, unit, ms) =>
+        testTrigger(Trigger.RealTime(interval, unit), ms)
+        testTrigger(RealTimeTrigger(interval, unit), ms)
+        testTrigger(RealTimeTrigger.create(interval, unit), ms)
+    }
+    // test invalid
+    List("-1", "0").foreach(
+      str =>
+        intercept[IllegalArgumentException] {
+          testTrigger(Trigger.RealTime(str), -1)
+          testTrigger(RealTimeTrigger.create(str), -1)
+        }
+    )
+
+    List(-1, 0).foreach(
+      duration =>
+        intercept[IllegalArgumentException] {
+          testTrigger(Trigger.RealTime(duration), -1)
+          testTrigger(RealTimeTrigger(duration), -1)
+        }
+    )
+  }
+}