Merge pull request #21944 from [cherry-pick][release-2.40.0][21941] Fix no output timestamp case

[cherry-pick][release-2.40.0][21941] Fix no output timestamp case
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index c1f89c6..b5dbcb4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -53,6 +53,11 @@
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
 class WindmillTimerInternals implements TimerInternals {
+  private static final Instant OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE =
+      GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1));
+
+  private static final Instant OUTPUT_TIMESTAMP_MAX_VALUE =
+      BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1));
 
   private static final String TIMER_HOLD_PREFIX = "/h";
   // Map from timer id to its TimerData. If it is to be deleted, we still need
@@ -286,8 +291,14 @@
     builder.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getTimestamp()));
 
     // Store the output timestamp in the metadata timestamp.
-    builder.setMetadataTimestamp(
-        WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getOutputTimestamp()));
+    Instant outputTimestamp = timerData.getOutputTimestamp();
+    if (outputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+      // We can't encode any value larger than BoundedWindow.TIMESTAMP_MAX_VALUE, so substitute
+      // OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE (one millisecond past the global window's maximum
+      // timestamp) here instead.
+      outputTimestamp = OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE;
+    }
+    builder.setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp));
     return builder;
   }
 
@@ -375,7 +386,13 @@
         throw new RuntimeException(e);
       }
     } else if (timer.hasMetadataTimestamp()) {
+      // We use BoundedWindow.TIMESTAMP_MAX_VALUE+1 to indicate "no output timestamp", so map the
+      // Windmill sentinel (OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE) back to that upper bound after
+      // decoding the metadata timestamp.
       outputTimestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getMetadataTimestamp());
+      if (outputTimestamp.equals(OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE)) {
+        outputTimestamp = OUTPUT_TIMESTAMP_MAX_VALUE;
+      }
     }
 
     StateNamespace namespace = StateNamespaces.fromString(namespaceString, windowCoder);