Fix Output Windows in OnTimerContext

When a user timer is delivered, output elements produced by that timer
firing should be placed in the same window as the timer.

Deliver timers in the window of their namespace in the DirectRunner.

Test that timer deliveries maintain the window the timer was emitted in.
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 2b93ca0..f5a559c 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -23,6 +23,7 @@
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Sets;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -766,22 +767,26 @@
 
     @Override
     public void output(OutputT output) {
-      context.outputWithTimestamp(output, timestamp);
+      context.outputWindowedValue(
+          output, timestamp(), Collections.singleton(window()), PaneInfo.NO_FIRING);
     }
 
     @Override
     public void outputWithTimestamp(OutputT output, Instant timestamp) {
-      context.outputWithTimestamp(output, timestamp);
+      context.outputWindowedValue(
+          output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
     }
 
     @Override
     public <T> void sideOutput(TupleTag<T> tag, T output) {
-      context.sideOutputWithTimestamp(tag, output, timestamp);
+      context.sideOutputWindowedValue(
+          tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
     }
 
     @Override
     public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
-      context.sideOutputWithTimestamp(tag, output, timestamp);
+      context.sideOutputWindowedValue(
+          tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
     }
 
     @Override
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
index 0ad40ac..77bebb2 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java
@@ -17,11 +17,12 @@
  */
 package org.apache.beam.runners.direct;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.auto.value.AutoValue;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import java.util.Collections;
 import java.util.HashMap;
@@ -30,6 +31,7 @@
 import org.apache.beam.runners.core.KeyedWorkItems;
 import org.apache.beam.runners.core.StateNamespace;
 import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateNamespaces.WindowNamespace;
 import org.apache.beam.runners.core.StateTag;
 import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
@@ -228,15 +230,20 @@
     @Override
     public void processElement(WindowedValue<KeyedWorkItem<K, KV<K, InputT>>> gbkResult)
         throws Exception {
-
-      BoundedWindow window = Iterables.getOnlyElement(gbkResult.getWindows());
-
       for (WindowedValue<KV<K, InputT>> windowedValue : gbkResult.getValue().elementsIterable()) {
         delegateEvaluator.processElement(windowedValue);
       }
 
       for (TimerData timer : gbkResult.getValue().timersIterable()) {
-        delegateEvaluator.onTimer(timer, window);
+        checkState(
+            timer.getNamespace() instanceof WindowNamespace,
+            "Expected Timer %s to be in a %s, but got %s",
+            timer,
+            WindowNamespace.class.getSimpleName(),
+            timer.getNamespace().getClass().getName());
+        WindowNamespace<?> windowNamespace = (WindowNamespace<?>) timer.getNamespace();
+        BoundedWindow timerWindow = windowNamespace.getWindow();
+        delegateEvaluator.onTimer(timer, timerWindow);
       }
     }
 
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index e58f78e..d5786f1 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -101,6 +101,7 @@
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -1952,6 +1953,50 @@
     pipeline.run();
   }
 
+  @Test
+  @Category({RunnableOnService.class, UsesTimersInParDo.class})
+  public void testTimerReceivedInOriginalWindow() throws Exception {
+    final String timerId = "foo";
+
+    DoFn<KV<String, Integer>, BoundedWindow> fn =
+        new DoFn<KV<String, Integer>, BoundedWindow>() {
+
+          @TimerId(timerId)
+          private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+            timer.setForNowPlus(Duration.standardSeconds(1));
+          }
+
+          @OnTimer(timerId)
+          public void onTimer(OnTimerContext context, BoundedWindow window) {
+            context.output(context.window());
+          }
+
+          public TypeDescriptor<BoundedWindow> getOutputTypeDescriptor() {
+            return (TypeDescriptor) TypeDescriptor.of(IntervalWindow.class);
+          }
+        };
+
+    SlidingWindows windowing =
+        SlidingWindows.of(Duration.standardMinutes(3)).every(Duration.standardMinutes(1));
+    PCollection<BoundedWindow> output =
+        pipeline
+            .apply(Create.timestamped(TimestampedValue.of(KV.of("hello", 24), new Instant(0L))))
+            .apply(Window.<KV<String, Integer>>into(windowing))
+            .apply(ParDo.of(fn));
+
+    PAssert.that(output)
+        .containsInAnyOrder(
+            new IntervalWindow(new Instant(0), Duration.standardMinutes(3)),
+            new IntervalWindow(
+                new Instant(0).minus(Duration.standardMinutes(1)), Duration.standardMinutes(3)),
+            new IntervalWindow(
+                new Instant(0).minus(Duration.standardMinutes(2)), Duration.standardMinutes(3)));
+    pipeline.run();
+  }
+
   /**
    * Tests that an event time timer set absolutely for the last possible moment fires and results in
    * supplementary output. The test is otherwise identical to {@link #testEventTimeTimerBounded()}.