Merge pull request #21944 from [cherry-pick][release-2.40.0][21941] Fix no output timestamp case
[cherry-pick][release-2.40.0][21941] Fix no output timestamp case
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index c1f89c6..b5dbcb4 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -53,6 +53,11 @@
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
class WindmillTimerInternals implements TimerInternals {
+ private static final Instant OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE =
+ GlobalWindow.INSTANCE.maxTimestamp().plus(Duration.millis(1));
+
+ private static final Instant OUTPUT_TIMESTAMP_MAX_VALUE =
+ BoundedWindow.TIMESTAMP_MAX_VALUE.plus(Duration.millis(1));
private static final String TIMER_HOLD_PREFIX = "/h";
// Map from timer id to its TimerData. If it is to be deleted, we still need
@@ -286,8 +291,14 @@
builder.setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getTimestamp()));
// Store the output timestamp in the metadata timestamp.
- builder.setMetadataTimestamp(
- WindmillTimeUtils.harnessToWindmillTimestamp(timerData.getOutputTimestamp()));
+ Instant outputTimestamp = timerData.getOutputTimestamp();
+ if (outputTimestamp.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
+ // We can't encode any value larger than BoundedWindow.TIMESTAMP_MAX_VALUE, so use the end of
+ // the global window
+ // here instead.
+ outputTimestamp = OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE;
+ }
+ builder.setMetadataTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(outputTimestamp));
return builder;
}
@@ -375,7 +386,13 @@
throw new RuntimeException(e);
}
} else if (timer.hasMetadataTimestamp()) {
+ // We use BoundedWindow.TIMESTAMP_MAX_VALUE+1 to indicate "no output timestamp" so make sure
+ // to change the upper
+ // bound.
outputTimestamp = WindmillTimeUtils.windmillToHarnessTimestamp(timer.getMetadataTimestamp());
+ if (outputTimestamp.equals(OUTPUT_TIMESTAMP_MAX_WINDMILL_VALUE)) {
+ outputTimestamp = OUTPUT_TIMESTAMP_MAX_VALUE;
+ }
}
StateNamespace namespace = StateNamespaces.fromString(namespaceString, windowCoder);