onTimer/setTimer signature updates
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 79bb6ef..4841c6a 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -384,7 +384,12 @@
checkArgument(namespace instanceof WindowNamespace);
BoundedWindow window = ((WindowNamespace<?>) namespace).getWindow();
pushbackDoFnRunner.onTimer(
- timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
+ timerData.getTimerId(),
+ timerData.getTimerFamilyId(),
+ window,
+ timerData.getTimestamp(),
+ timerData.getOutputTimestamp(),
+ timerData.getDomain());
}
pushbackDoFnRunner.finishBundle();
}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index cf28436..e2fc262 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -38,7 +38,13 @@
* Calls a {@link DoFn DoFn's} {@link DoFn.OnTimer @OnTimer} method for the given timer in the
* given window.
*/
- void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain);
+ void onTimer(
+ String timerId,
+ String timerFamilyId,
+ BoundedWindow window,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain);
/**
* Calls a {@link DoFn DoFn's} {@link DoFn.FinishBundle @FinishBundle} method and performs
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 4865e82..8f19b5f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -82,8 +82,13 @@
@Override
public void onTimer(
- String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
- doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+ String timerId,
+ String timerFamilyId,
+ BoundedWindow window,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
+ doFnRunner.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
}
@Override
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
index d65b5f4..c310c49 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
@@ -83,7 +83,12 @@
@Override
public void onTimer(
- String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+ String timerId,
+ String timerFamilyId,
+ BoundedWindow window,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
throw new UnsupportedOperationException("User timers unsupported in ProcessFn");
}
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index cc2e86a..32a61af 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -43,7 +43,13 @@
Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem);
/** Calls the underlying {@link DoFn.OnTimer} method. */
- void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain);
+ void onTimer(
+ String timerId,
+ String timerFamilyId,
+ BoundedWindow window,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain);
/** Calls the underlying {@link DoFn.FinishBundle} method. */
void finishBundle();
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 472a9d2..68f780d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -186,7 +186,12 @@
@Override
public void onTimer(
- String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+ String timerId,
+ String timerFamilyId,
+ BoundedWindow window,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
// The effective timestamp is when derived elements will have their timestamp set, if not
// otherwise specified. If this is an event time timer, then they have the timestamp of the
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
index 36a89fe..b27e046 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
@@ -108,8 +108,13 @@
@Override
public void onTimer(
- String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
- underlying.onTimer(timerId, window, timestamp, timeDomain);
+ String timerId,
+ String timerFamilyId,
+ BoundedWindow window,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
+ underlying.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
}
@Override
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index f69c74a..670f3a5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -117,7 +117,12 @@
@Override
public void onTimer(
- String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+ String timerId,
+ String timerFamilyId,
+ BoundedWindow window,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
if (cleanupTimer.isForWindow(timerId, window, timestamp, timeDomain)) {
stateCleaner.clearForWindow(window);
// There should invoke the onWindowExpiration of DoFn
@@ -134,7 +139,7 @@
window,
cleanupTimer.currentInputWatermarkTime());
} else {
- doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+ doFnRunner.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
}
}
}
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index 10972d6..90bb5aa 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -119,7 +119,12 @@
thrown.expectCause(is(fn.exceptionToThrow));
runner.onTimer(
- ThrowingDoFn.TIMER_ID, GlobalWindow.INSTANCE, new Instant(0), TimeDomain.EVENT_TIME);
+ ThrowingDoFn.TIMER_ID,
+ ThrowingDoFn.TIMER_ID,
+ GlobalWindow.INSTANCE,
+ new Instant(0),
+ new Instant(0),
+ TimeDomain.EVENT_TIME);
}
/**
@@ -239,8 +244,10 @@
// the method call.
runner.onTimer(
DoFnWithTimers.TIMER_ID,
+ DoFnWithTimers.TIMER_ID,
GlobalWindow.INSTANCE,
currentTime.plus(offset),
+ currentTime.plus(offset),
TimeDomain.EVENT_TIME);
assertThat(
@@ -248,8 +255,10 @@
contains(
TimerData.of(
DoFnWithTimers.TIMER_ID,
+ DoFnWithTimers.TIMER_ID,
StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
currentTime.plus(offset),
+ currentTime.plus(offset),
TimeDomain.EVENT_TIME)));
}
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
index 28b387e..10ac7a8 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
@@ -283,7 +283,13 @@
// Mocking is not easily compatible with annotation analysis, so we manually record
// the method call.
- runner.onTimer(timerId, window, new Instant(timestamp), TimeDomain.EVENT_TIME);
+ runner.onTimer(
+ timerId,
+ timerId,
+ window,
+ new Instant(timestamp),
+ new Instant(timestamp),
+ TimeDomain.EVENT_TIME);
assertThat(
underlying.firedTimers,
@@ -320,12 +326,19 @@
@Override
public void onTimer(
- String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+ String timerId,
+ String timerFamilyId,
+ BoundedWindow window,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
firedTimers.add(
TimerData.of(
timerId,
+ timerFamilyId,
StateNamespaces.window(IntervalWindow.getCoder(), (IntervalWindow) window),
timestamp,
+ outputTimestamp,
timeDomain));
}
@@ -458,7 +471,13 @@
StateNamespace namespace = timer.getNamespace();
checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
- toTrigger.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+ toTrigger.onTimer(
+ timer.getTimerId(),
+ timer.getTimerFamilyId(),
+ window,
+ timer.getTimestamp(),
+ timer.getOutputTimestamp(),
+ timer.getDomain());
}
}
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index 85b3c0b..be4e321 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -220,7 +220,13 @@
StateNamespace namespace = timer.getNamespace();
checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
- toTrigger.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+ toTrigger.onTimer(
+ timer.getTimerId(),
+ timer.getTimerFamilyId(),
+ window,
+ timer.getTimestamp(),
+ timer.getOutputTimestamp(),
+ timer.getDomain());
}
}
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 31eb80b..5f41175 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -222,7 +222,13 @@
public void onTimer(TimerData timer, BoundedWindow window) {
try {
- fnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+ fnRunner.onTimer(
+ timer.getTimerId(),
+ timer.getTimerFamilyId(),
+ window,
+ timer.getTimestamp(),
+ timer.getOutputTimestamp(),
+ timer.getDomain());
} catch (Exception e) {
throw UserCodeException.wrap(e);
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
index 9d853a2..ce54d5b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
@@ -68,12 +68,14 @@
@Override
public void onTimer(
final String timerId,
+ final String timerFamilyId,
final BoundedWindow window,
final Instant timestamp,
+ final Instant outputTimestamp,
final TimeDomain timeDomain) {
try (Closeable ignored =
MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) {
- delegate.onTimer(timerId, window, timestamp, timeDomain);
+ delegate.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 6aee09f..08e0a90 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -215,7 +215,13 @@
StateNamespace namespace = timer.getNamespace();
checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
- doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+ doFnRunner.onTimer(
+ timer.getTimerId(),
+ timer.getTimerFamilyId(),
+ window,
+ timer.getTimestamp(),
+ timer.getOutputTimestamp(),
+ timer.getDomain());
}
@Override
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 8e63679..5956ef6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -813,7 +813,12 @@
BoundedWindow window = ((WindowNamespace) namespace).getWindow();
timerInternals.cleanupPendingTimer(timer.getNamespace());
pushbackDoFnRunner.onTimer(
- timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
+ timerData.getTimerId(),
+ timerData.getTimerFamilyId(),
+ window,
+ timerData.getTimestamp(),
+ timerData.getOutputTimestamp(),
+ timerData.getDomain());
}
private void setCurrentInputWatermark(long currentInputWatermark) {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 891cef8..45fc2a1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -661,7 +661,12 @@
@Override
public void onTimer(
- String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+ String timerId,
+ String timerFamilyId,
+ BoundedWindow window,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
Object timerKey = keyForTimer.get();
Preconditions.checkNotNull(timerKey, "Key for timer needs to be set before calling onTimer");
Preconditions.checkNotNull(remoteBundle, "Call to onTimer outside of a bundle");
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java
index b0f9304..5c5ca6a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java
@@ -67,20 +67,30 @@
static final class Timer implements BufferedElement {
private final String timerId;
+ private final String timerFamilyId;
private final BoundedWindow window;
private final Instant timestamp;
+ private final Instant outputTimestamp;
private final TimeDomain timeDomain;
- Timer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+ Timer(
+ String timerId,
+ String timerFamilyId,
+ BoundedWindow window,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
this.timerId = timerId;
this.window = window;
this.timestamp = timestamp;
this.timeDomain = timeDomain;
+ this.outputTimestamp = outputTimestamp;
+ this.timerFamilyId = timerFamilyId;
}
@Override
public void processWith(DoFnRunner doFnRunner) {
- doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+ doFnRunner.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
}
@Override
@@ -130,8 +140,10 @@
outStream.write(TIMER_MAGIC_BYTE);
Timer timer = (Timer) value;
STRING_CODER.encode(timer.timerId, outStream);
+ STRING_CODER.encode(timer.timerFamilyId, outStream);
windowCoder.encode(timer.window, outStream);
INSTANT_CODER.encode(timer.timestamp, outStream);
+ INSTANT_CODER.encode(timer.outputTimestamp, outStream);
outStream.write(timer.timeDomain.ordinal());
} else {
throw new IllegalStateException("Unexpected element " + value);
@@ -147,8 +159,10 @@
case TIMER_MAGIC_BYTE:
return new Timer(
STRING_CODER.decode(inStream),
+ STRING_CODER.decode(inStream),
windowCoder.decode(inStream),
INSTANT_CODER.decode(inStream),
+ INSTANT_CODER.decode(inStream),
TimeDomain.values()[inStream.read()]);
default:
throw new IllegalStateException(
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
index 80aabc2..367ed32 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
@@ -117,9 +117,15 @@
@Override
public void onTimer(
- String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+ String timerId,
+ String timerFamilyId,
+ BoundedWindow window,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
currentBufferingElementsHandler.buffer(
- new BufferedElements.Timer(timerId, window, timestamp, timeDomain));
+ new BufferedElements.Timer(
+ timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain));
}
@Override
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElementsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElementsTest.java
index 9ebdefc..0828a22 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElementsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElementsTest.java
@@ -52,7 +52,12 @@
WindowedValue.of("test", new Instant(2), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
BufferedElement timerElement =
new BufferedElements.Timer(
- "timerId", GlobalWindow.INSTANCE, new Instant(1), TimeDomain.EVENT_TIME);
+ "timerId",
+ "timerId",
+ GlobalWindow.INSTANCE,
+ new Instant(1),
+ new Instant(1),
+ TimeDomain.EVENT_TIME);
testRoundTrip(ImmutableList.of(element), coder);
testRoundTrip(ImmutableList.of(timerElement), coder);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java
index 2903cc0..3cb92c9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java
@@ -109,7 +109,12 @@
@Override
public void onTimer(
- String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+ String timerId,
+ String timerFamilyId,
+ BoundedWindow window,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
throw new UnsupportedOperationException("Unsupported for ProcessFn");
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
index e50f1bd..6296461 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
@@ -82,7 +82,12 @@
@Override
public void onTimer(
- String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+ String timerId,
+ String timerFamilyId,
+ BoundedWindow window,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
throw new UnsupportedOperationException(
String.format("Timers are not supported by %s", GroupAlsoByWindowFn.class.getSimpleName()));
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
index 712a017..4c011ed 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
@@ -357,7 +357,13 @@
private void processUserTimer(TimerData timer) throws Exception {
if (fnSignature.timerDeclarations().containsKey(timer.getTimerId())) {
BoundedWindow window = ((WindowNamespace) timer.getNamespace()).getWindow();
- fnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+ fnRunner.onTimer(
+ timer.getTimerId(),
+ timer.getTimerFamilyId(),
+ window,
+ timer.getTimestamp(),
+ timer.getOutputTimestamp(),
+ timer.getDomain());
}
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java
index bf5efee..41da4b3 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java
@@ -133,7 +133,12 @@
@Override
public void onTimer(
- String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+ String timerId,
+ String timerFamilyId,
+ BoundedWindow window,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
throw new UnsupportedOperationException(
"Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow.");
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
index 02a1292..f7e86c7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
@@ -77,7 +77,12 @@
@Override
public void onTimer(
- String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+ String timerId,
+ String timerFamilyId,
+ BoundedWindow window,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
throw new UnsupportedOperationException(
"Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow.");
}
diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java
index e291117..76e8375 100644
--- a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java
+++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java
@@ -92,7 +92,13 @@
TimerInternals.TimerData timer, DoFnRunner<KV<?, ?>, ?> doFnRunner) {
StateNamespace namespace = timer.getNamespace();
BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
- doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+ doFnRunner.onTimer(
+ timer.getTimerId(),
+ timer.getTimerFamilyId(),
+ window,
+ timer.getTimestamp(),
+ timer.getOutputTimestamp(),
+ timer.getDomain());
}
@Override
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java
index 101ee80..aefcf6d 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java
@@ -57,8 +57,16 @@
@Override
public void onTimer(
- String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
- withMetrics(() -> underlying.onTimer(timerId, window, timestamp, timeDomain));
+ String timerId,
+ String timerFamilyId,
+ BoundedWindow window,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
+ withMetrics(
+ () ->
+ underlying.onTimer(
+ timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain));
}
@Override
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
index 795b8c0..cd140c7 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
@@ -452,7 +452,13 @@
// Need to pass in the keyed TimerData here
((DoFnRunnerWithKeyedInternals) fnRunner).onTimer(keyedTimerData, window);
} else {
- pushbackFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+ pushbackFnRunner.onTimer(
+ timer.getTimerId(),
+ timer.getTimerFamilyId(),
+ window,
+ timer.getTimestamp(),
+ timer.getOutputTimestamp(),
+ timer.getDomain());
}
}
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java
index 6fb2bd3..3b2d1cb 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java
@@ -62,7 +62,13 @@
try {
final TimerInternals.TimerData timer = keyedTimerData.getTimerData();
- onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+ onTimer(
+ timer.getTimerId(),
+ timer.getTimerFamilyId(),
+ window,
+ timer.getTimestamp(),
+ timer.getOutputTimestamp(),
+ timer.getDomain());
} finally {
clearKeyedInternals();
}
@@ -70,10 +76,15 @@
@Override
public void onTimer(
- String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+ String timerId,
+ String timerFamilyId,
+ BoundedWindow window,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {
checkState(keyedInternals.getKey() != null, "Key is not set for timer");
- underlying.onTimer(timerId, window, timestamp, timeDomain);
+ underlying.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
}
@Override
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
index 3b1b938..99439a2 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
@@ -262,7 +262,12 @@
@Override
public void onTimer(
- String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {}
+ String timerId,
+ String timerFamilyId,
+ BoundedWindow window,
+ Instant timestamp,
+ Instant outputTimestamp,
+ TimeDomain timeDomain) {}
@Override
public void finishBundle() {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
index 46dc282..55d97ba 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
@@ -71,11 +71,13 @@
@Override
public void onTimer(
final String timerId,
+ final String timerFamilyId,
final BoundedWindow window,
final Instant timestamp,
+ final Instant outputTimestamp,
final TimeDomain timeDomain) {
try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer())) {
- delegate.onTimer(timerId, window, timestamp, timeDomain);
+ delegate.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
index 845dc63..013f860 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
@@ -71,11 +71,13 @@
@Override
public void onTimer(
final String timerId,
+ final String timerFamilyId,
final BoundedWindow window,
final Instant timestamp,
+ final Instant outputTimestamp,
final TimeDomain timeDomain) {
try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer())) {
- delegate.onTimer(timerId, window, timestamp, timeDomain);
+ delegate.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index e978f46..9cbbeda 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -161,7 +161,13 @@
StateNamespace namespace = timer.getNamespace();
checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
- doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+ doFnRunner.onTimer(
+ timer.getTimerId(),
+ timer.getTimerFamilyId(),
+ window,
+ timer.getTimestamp(),
+ timer.getOutputTimestamp(),
+ timer.getDomain());
}
}
}