onTimer/setTimer signature updates
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 79bb6ef..4841c6a 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -384,7 +384,12 @@
       checkArgument(namespace instanceof WindowNamespace);
       BoundedWindow window = ((WindowNamespace<?>) namespace).getWindow();
       pushbackDoFnRunner.onTimer(
-          timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
+          timerData.getTimerId(),
+          timerData.getTimerFamilyId(),
+          window,
+          timerData.getTimestamp(),
+          timerData.getOutputTimestamp(),
+          timerData.getDomain());
     }
     pushbackDoFnRunner.finishBundle();
   }
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
index cf28436..e2fc262 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java
@@ -38,7 +38,13 @@
    * Calls a {@link DoFn DoFn's} {@link DoFn.OnTimer @OnTimer} method for the given timer in the
    * given window.
    */
-  void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain);
+  void onTimer(
+      String timerId,
+      String timerFamilyId,
+      BoundedWindow window,
+      Instant timestamp,
+      Instant outputTimestamp,
+      TimeDomain timeDomain);
 
   /**
    * Calls a {@link DoFn DoFn's} {@link DoFn.FinishBundle @FinishBundle} method and performs
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
index 4865e82..8f19b5f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java
@@ -82,8 +82,13 @@
 
   @Override
   public void onTimer(
-      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
-    doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+      String timerId,
+      String timerFamilyId,
+      BoundedWindow window,
+      Instant timestamp,
+      Instant outputTimestamp,
+      TimeDomain timeDomain) {
+    doFnRunner.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
   }
 
   @Override
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
index d65b5f4..c310c49 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java
@@ -83,7 +83,12 @@
 
   @Override
   public void onTimer(
-      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+      String timerId,
+      String timerFamilyId,
+      BoundedWindow window,
+      Instant timestamp,
+      Instant outputTimestamp,
+      TimeDomain timeDomain) {
     throw new UnsupportedOperationException("User timers unsupported in ProcessFn");
   }
 
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index cc2e86a..32a61af 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -43,7 +43,13 @@
   Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem);
 
   /** Calls the underlying {@link DoFn.OnTimer} method. */
-  void onTimer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain);
+  void onTimer(
+      String timerId,
+      String timerFamilyId,
+      BoundedWindow window,
+      Instant timestamp,
+      Instant outputTimestamp,
+      TimeDomain timeDomain);
 
   /** Calls the underlying {@link DoFn.FinishBundle} method. */
   void finishBundle();
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 472a9d2..68f780d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -186,7 +186,12 @@
 
   @Override
   public void onTimer(
-      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+      String timerId,
+      String timerFamilyId,
+      BoundedWindow window,
+      Instant timestamp,
+      Instant outputTimestamp,
+      TimeDomain timeDomain) {
 
     // The effective timestamp is when derived elements will have their timestamp set, if not
     // otherwise specified. If this is an event time timer, then they have the timestamp of the
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
index 36a89fe..b27e046 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunner.java
@@ -108,8 +108,13 @@
 
   @Override
   public void onTimer(
-      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
-    underlying.onTimer(timerId, window, timestamp, timeDomain);
+      String timerId,
+      String timerFamilyId,
+      BoundedWindow window,
+      Instant timestamp,
+      Instant outputTimestamp,
+      TimeDomain timeDomain) {
+    underlying.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
   }
 
   @Override
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index f69c74a..670f3a5 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -117,7 +117,12 @@
 
   @Override
   public void onTimer(
-      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+      String timerId,
+      String timerFamilyId,
+      BoundedWindow window,
+      Instant timestamp,
+      Instant outputTimestamp,
+      TimeDomain timeDomain) {
     if (cleanupTimer.isForWindow(timerId, window, timestamp, timeDomain)) {
       stateCleaner.clearForWindow(window);
       // There should invoke the onWindowExpiration of DoFn
@@ -134,7 +139,7 @@
             window,
             cleanupTimer.currentInputWatermarkTime());
       } else {
-        doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+        doFnRunner.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
       }
     }
   }
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index 10972d6..90bb5aa 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -119,7 +119,12 @@
     thrown.expectCause(is(fn.exceptionToThrow));
 
     runner.onTimer(
-        ThrowingDoFn.TIMER_ID, GlobalWindow.INSTANCE, new Instant(0), TimeDomain.EVENT_TIME);
+        ThrowingDoFn.TIMER_ID,
+        ThrowingDoFn.TIMER_ID,
+        GlobalWindow.INSTANCE,
+        new Instant(0),
+        new Instant(0),
+        TimeDomain.EVENT_TIME);
   }
 
   /**
@@ -239,8 +244,10 @@
     // the method call.
     runner.onTimer(
         DoFnWithTimers.TIMER_ID,
+        DoFnWithTimers.TIMER_ID,
         GlobalWindow.INSTANCE,
         currentTime.plus(offset),
+        currentTime.plus(offset),
         TimeDomain.EVENT_TIME);
 
     assertThat(
@@ -248,8 +255,10 @@
         contains(
             TimerData.of(
                 DoFnWithTimers.TIMER_ID,
+                DoFnWithTimers.TIMER_ID,
                 StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
                 currentTime.plus(offset),
+                currentTime.plus(offset),
                 TimeDomain.EVENT_TIME)));
   }
 
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
index 28b387e..10ac7a8 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
@@ -283,7 +283,13 @@
 
     // Mocking is not easily compatible with annotation analysis, so we manually record
     // the method call.
-    runner.onTimer(timerId, window, new Instant(timestamp), TimeDomain.EVENT_TIME);
+    runner.onTimer(
+        timerId,
+        timerId,
+        window,
+        new Instant(timestamp),
+        new Instant(timestamp),
+        TimeDomain.EVENT_TIME);
 
     assertThat(
         underlying.firedTimers,
@@ -320,12 +326,19 @@
 
     @Override
     public void onTimer(
-        String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+        String timerId,
+        String timerFamilyId,
+        BoundedWindow window,
+        Instant timestamp,
+        Instant outputTimestamp,
+        TimeDomain timeDomain) {
       firedTimers.add(
           TimerData.of(
               timerId,
+              timerFamilyId,
               StateNamespaces.window(IntervalWindow.getCoder(), (IntervalWindow) window),
               timestamp,
+              outputTimestamp,
               timeDomain));
     }
 
@@ -458,7 +471,13 @@
       StateNamespace namespace = timer.getNamespace();
       checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
       BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
-      toTrigger.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+      toTrigger.onTimer(
+          timer.getTimerId(),
+          timer.getTimerFamilyId(),
+          window,
+          timer.getTimestamp(),
+          timer.getOutputTimestamp(),
+          timer.getDomain());
     }
   }
 
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
index 85b3c0b..be4e321 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -220,7 +220,13 @@
       StateNamespace namespace = timer.getNamespace();
       checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
       BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
-      toTrigger.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+      toTrigger.onTimer(
+          timer.getTimerId(),
+          timer.getTimerFamilyId(),
+          window,
+          timer.getTimestamp(),
+          timer.getOutputTimestamp(),
+          timer.getDomain());
     }
   }
 
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
index 31eb80b..5f41175 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java
@@ -222,7 +222,13 @@
 
   public void onTimer(TimerData timer, BoundedWindow window) {
     try {
-      fnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+      fnRunner.onTimer(
+          timer.getTimerId(),
+          timer.getTimerFamilyId(),
+          window,
+          timer.getTimestamp(),
+          timer.getOutputTimestamp(),
+          timer.getDomain());
     } catch (Exception e) {
       throw UserCodeException.wrap(e);
     }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
index 9d853a2..ce54d5b 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java
@@ -68,12 +68,14 @@
   @Override
   public void onTimer(
       final String timerId,
+      final String timerFamilyId,
       final BoundedWindow window,
       final Instant timestamp,
+      final Instant outputTimestamp,
       final TimeDomain timeDomain) {
     try (Closeable ignored =
         MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer(stepName))) {
-      delegate.onTimer(timerId, window, timestamp, timeDomain);
+      delegate.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 6aee09f..08e0a90 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -215,7 +215,13 @@
     StateNamespace namespace = timer.getNamespace();
     checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
     BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
-    doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+    doFnRunner.onTimer(
+        timer.getTimerId(),
+        timer.getTimerFamilyId(),
+        window,
+        timer.getTimestamp(),
+        timer.getOutputTimestamp(),
+        timer.getDomain());
   }
 
   @Override
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 8e63679..5956ef6 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -813,7 +813,12 @@
     BoundedWindow window = ((WindowNamespace) namespace).getWindow();
     timerInternals.cleanupPendingTimer(timer.getNamespace());
     pushbackDoFnRunner.onTimer(
-        timerData.getTimerId(), window, timerData.getTimestamp(), timerData.getDomain());
+        timerData.getTimerId(),
+        timerData.getTimerFamilyId(),
+        window,
+        timerData.getTimestamp(),
+        timerData.getOutputTimestamp(),
+        timerData.getDomain());
   }
 
   private void setCurrentInputWatermark(long currentInputWatermark) {
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
index 891cef8..45fc2a1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/ExecutableStageDoFnOperator.java
@@ -661,7 +661,12 @@
 
     @Override
     public void onTimer(
-        String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+        String timerId,
+        String timerFamilyId,
+        BoundedWindow window,
+        Instant timestamp,
+        Instant outputTimestamp,
+        TimeDomain timeDomain) {
       Object timerKey = keyForTimer.get();
       Preconditions.checkNotNull(timerKey, "Key for timer needs to be set before calling onTimer");
       Preconditions.checkNotNull(remoteBundle, "Call to onTimer outside of a bundle");
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java
index b0f9304..5c5ca6a 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElements.java
@@ -67,20 +67,30 @@
   static final class Timer implements BufferedElement {
 
     private final String timerId;
+    private final String timerFamilyId;
     private final BoundedWindow window;
     private final Instant timestamp;
+    private final Instant outputTimestamp;
     private final TimeDomain timeDomain;
 
-    Timer(String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+    Timer(
+        String timerId,
+        String timerFamilyId,
+        BoundedWindow window,
+        Instant timestamp,
+        Instant outputTimestamp,
+        TimeDomain timeDomain) {
       this.timerId = timerId;
       this.window = window;
       this.timestamp = timestamp;
       this.timeDomain = timeDomain;
+      this.outputTimestamp = outputTimestamp;
+      this.timerFamilyId = timerFamilyId;
     }
 
     @Override
     public void processWith(DoFnRunner doFnRunner) {
-      doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+      doFnRunner.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
     }
 
     @Override
@@ -130,8 +140,10 @@
         outStream.write(TIMER_MAGIC_BYTE);
         Timer timer = (Timer) value;
         STRING_CODER.encode(timer.timerId, outStream);
+        STRING_CODER.encode(timer.timerFamilyId, outStream);
         windowCoder.encode(timer.window, outStream);
         INSTANT_CODER.encode(timer.timestamp, outStream);
+        INSTANT_CODER.encode(timer.outputTimestamp, outStream);
         outStream.write(timer.timeDomain.ordinal());
       } else {
         throw new IllegalStateException("Unexpected element " + value);
@@ -147,8 +159,10 @@
         case TIMER_MAGIC_BYTE:
           return new Timer(
               STRING_CODER.decode(inStream),
+              STRING_CODER.decode(inStream),
               windowCoder.decode(inStream),
               INSTANT_CODER.decode(inStream),
+              INSTANT_CODER.decode(inStream),
               TimeDomain.values()[inStream.read()]);
         default:
           throw new IllegalStateException(
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
index 80aabc2..367ed32 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java
@@ -117,9 +117,15 @@
 
   @Override
   public void onTimer(
-      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+      String timerId,
+      String timerFamilyId,
+      BoundedWindow window,
+      Instant timestamp,
+      Instant outputTimestamp,
+      TimeDomain timeDomain) {
     currentBufferingElementsHandler.buffer(
-        new BufferedElements.Timer(timerId, window, timestamp, timeDomain));
+        new BufferedElements.Timer(
+            timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain));
   }
 
   @Override
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElementsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElementsTest.java
index 9ebdefc..0828a22 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElementsTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferedElementsTest.java
@@ -52,7 +52,12 @@
             WindowedValue.of("test", new Instant(2), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
     BufferedElement timerElement =
         new BufferedElements.Timer(
-            "timerId", GlobalWindow.INSTANCE, new Instant(1), TimeDomain.EVENT_TIME);
+            "timerId",
+            "timerId",
+            GlobalWindow.INSTANCE,
+            new Instant(1),
+            new Instant(1),
+            TimeDomain.EVENT_TIME);
 
     testRoundTrip(ImmutableList.of(element), coder);
     testRoundTrip(ImmutableList.of(timerElement), coder);
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java
index 2903cc0..3cb92c9 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/DataflowProcessFnRunner.java
@@ -109,7 +109,12 @@
 
   @Override
   public void onTimer(
-      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+      String timerId,
+      String timerFamilyId,
+      BoundedWindow window,
+      Instant timestamp,
+      Instant outputTimestamp,
+      TimeDomain timeDomain) {
     throw new UnsupportedOperationException("Unsupported for ProcessFn");
   }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
index e50f1bd..6296461 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/GroupAlsoByWindowFnRunner.java
@@ -82,7 +82,12 @@
 
   @Override
   public void onTimer(
-      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+      String timerId,
+      String timerFamilyId,
+      BoundedWindow window,
+      Instant timestamp,
+      Instant outputTimestamp,
+      TimeDomain timeDomain) {
     throw new UnsupportedOperationException(
         String.format("Timers are not supported by %s", GroupAlsoByWindowFn.class.getSimpleName()));
   }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
index 712a017..4c011ed 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java
@@ -357,7 +357,13 @@
   private void processUserTimer(TimerData timer) throws Exception {
     if (fnSignature.timerDeclarations().containsKey(timer.getTimerId())) {
       BoundedWindow window = ((WindowNamespace) timer.getNamespace()).getWindow();
-      fnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+      fnRunner.onTimer(
+          timer.getTimerId(),
+          timer.getTimerFamilyId(),
+          window,
+          timer.getTimestamp(),
+          timer.getOutputTimestamp(),
+          timer.getDomain());
     }
   }
 
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java
index bf5efee..41da4b3 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingKeyedWorkItemSideInputDoFnRunner.java
@@ -133,7 +133,12 @@
 
   @Override
   public void onTimer(
-      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+      String timerId,
+      String timerFamilyId,
+      BoundedWindow window,
+      Instant timestamp,
+      Instant outputTimestamp,
+      TimeDomain timeDomain) {
     throw new UnsupportedOperationException(
         "Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow.");
   }
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
index 02a1292..f7e86c7 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputDoFnRunner.java
@@ -77,7 +77,12 @@
 
   @Override
   public void onTimer(
-      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+      String timerId,
+      String timerFamilyId,
+      BoundedWindow window,
+      Instant timestamp,
+      Instant outputTimestamp,
+      TimeDomain timeDomain) {
     throw new UnsupportedOperationException(
         "Attempt to deliver a timer to a DoFn, but timers are not supported in Dataflow.");
   }
diff --git a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java
index e291117..76e8375 100644
--- a/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java
+++ b/runners/jet/src/main/java/org/apache/beam/runners/jet/processors/StatefulParDoP.java
@@ -92,7 +92,13 @@
       TimerInternals.TimerData timer, DoFnRunner<KV<?, ?>, ?> doFnRunner) {
     StateNamespace namespace = timer.getNamespace();
     BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
-    doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+    doFnRunner.onTimer(
+        timer.getTimerId(),
+        timer.getTimerFamilyId(),
+        window,
+        timer.getTimestamp(),
+        timer.getOutputTimestamp(),
+        timer.getDomain());
   }
 
   @Override
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java
index 101ee80..aefcf6d 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/metrics/DoFnRunnerWithMetrics.java
@@ -57,8 +57,16 @@
 
   @Override
   public void onTimer(
-      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
-    withMetrics(() -> underlying.onTimer(timerId, window, timestamp, timeDomain));
+      String timerId,
+      String timerFamilyId,
+      BoundedWindow window,
+      Instant timestamp,
+      Instant outputTimestamp,
+      TimeDomain timeDomain) {
+    withMetrics(
+        () ->
+            underlying.onTimer(
+                timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain));
   }
 
   @Override
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
index 795b8c0..cd140c7 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnOp.java
@@ -452,7 +452,13 @@
       // Need to pass in the keyed TimerData here
       ((DoFnRunnerWithKeyedInternals) fnRunner).onTimer(keyedTimerData, window);
     } else {
-      pushbackFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+      pushbackFnRunner.onTimer(
+          timer.getTimerId(),
+          timer.getTimerFamilyId(),
+          window,
+          timer.getTimestamp(),
+          timer.getOutputTimestamp(),
+          timer.getDomain());
     }
   }
 
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java
index 6fb2bd3..3b2d1cb 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/DoFnRunnerWithKeyedInternals.java
@@ -62,7 +62,13 @@
 
     try {
       final TimerInternals.TimerData timer = keyedTimerData.getTimerData();
-      onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+      onTimer(
+          timer.getTimerId(),
+          timer.getTimerFamilyId(),
+          window,
+          timer.getTimestamp(),
+          timer.getOutputTimestamp(),
+          timer.getDomain());
     } finally {
       clearKeyedInternals();
     }
@@ -70,10 +76,15 @@
 
   @Override
   public void onTimer(
-      String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
+      String timerId,
+      String timerFamilyId,
+      BoundedWindow window,
+      Instant timestamp,
+      Instant outputTimestamp,
+      TimeDomain timeDomain) {
     checkState(keyedInternals.getKey() != null, "Key is not set for timer");
 
-    underlying.onTimer(timerId, window, timestamp, timeDomain);
+    underlying.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
   }
 
   @Override
diff --git a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
index 3b1b938..99439a2 100644
--- a/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
+++ b/runners/samza/src/main/java/org/apache/beam/runners/samza/runtime/SamzaDoFnRunners.java
@@ -262,7 +262,12 @@
 
     @Override
     public void onTimer(
-        String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {}
+        String timerId,
+        String timerFamilyId,
+        BoundedWindow window,
+        Instant timestamp,
+        Instant outputTimestamp,
+        TimeDomain timeDomain) {}
 
     @Override
     public void finishBundle() {
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
index 46dc282..55d97ba 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/DoFnRunnerWithMetrics.java
@@ -71,11 +71,13 @@
   @Override
   public void onTimer(
       final String timerId,
+      final String timerFamilyId,
       final BoundedWindow window,
       final Instant timestamp,
+      final Instant outputTimestamp,
       final TimeDomain timeDomain) {
     try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer())) {
-      delegate.onTimer(timerId, window, timestamp, timeDomain);
+      delegate.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
index 845dc63..013f860 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnRunnerWithMetrics.java
@@ -71,11 +71,13 @@
   @Override
   public void onTimer(
       final String timerId,
+      final String timerFamilyId,
       final BoundedWindow window,
       final Instant timestamp,
+      final Instant outputTimestamp,
       final TimeDomain timeDomain) {
     try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(metricsContainer())) {
-      delegate.onTimer(timerId, window, timestamp, timeDomain);
+      delegate.onTimer(timerId, timerFamilyId, window, timestamp, outputTimestamp, timeDomain);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
index e978f46..9cbbeda 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkProcessContext.java
@@ -161,7 +161,13 @@
       StateNamespace namespace = timer.getNamespace();
       checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
       BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
-      doFnRunner.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+      doFnRunner.onTimer(
+          timer.getTimerId(),
+          timer.getTimerFamilyId(),
+          window,
+          timer.getTimestamp(),
+          timer.getOutputTimestamp(),
+          timer.getDomain());
     }
   }
 }