Move GroupByKey validation into validate method

----Release Notes----
Re-enabled verification of GroupByKey usage. Specifically, the key
must have a deterministic coder, and using GroupByKey with an unbounded
PCollection requires windowing or triggers.

[]
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=99194010
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index b9d24c2..65065f1 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -73,6 +73,7 @@
 import com.google.cloud.dataflow.sdk.values.PDone;
 import com.google.cloud.dataflow.sdk.values.PInput;
 import com.google.cloud.dataflow.sdk.values.POutput;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
@@ -188,7 +189,7 @@
     return new DataflowPipelineRunner(dataflowOptions);
   }
 
-  private DataflowPipelineRunner(DataflowPipelineOptions options) {
+  @VisibleForTesting protected DataflowPipelineRunner(DataflowPipelineOptions options) {
     this.options = options;
     this.dataflowClient = options.getDataflowClient();
     this.translator = DataflowPipelineTranslator.fromOptions(options);
@@ -224,7 +225,9 @@
       @SuppressWarnings("unchecked")
       OutputT outputT = (OutputT) PCollection.createPrimitiveOutputInternal(
           pc.getPipeline(),
-          pc.getWindowingStrategy(),
+          transform instanceof GroupByKey
+              ? ((GroupByKey<?, ?>) transform).updateWindowingStrategy(pc.getWindowingStrategy())
+              : pc.getWindowingStrategy(),
           pc.isBounded());
       return outputT;
 
@@ -662,7 +665,7 @@
             .setWindowingStrategyInternal(WindowingStrategy.globalDefault())
             .apply(Window.<KV<Void, Iterable<Void>>>into(new GlobalWindows()))
             .apply(ParDo.of(new OutputElements<>(transform.getElements(), coder)))
-            .setCoder(coder);
+            .setCoder(coder).setIsBoundedInternal(IsBounded.BOUNDED);
       } catch (CannotProvideCoderException e) {
         throw new IllegalArgumentException("Unable to infer a coder and no Coder was specified. "
             + "Please set a coder by invoking Create.withCoder() explicitly.", e);
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
index df55c74..b760896 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
@@ -176,18 +176,60 @@
   /////////////////////////////////////////////////////////////////////////////
 
   @Override
-  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-    // This operation groups by the combination of key and window,
-    // merging windows as needed, using the windows assigned to the
-    // key/value input elements and the window merge operation of the
-    // window function associated with the input PCollection.
+  public void validate(PCollection<KV<K, V>> input) {
     WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+    // Verify that the input PCollection is bounded, or that there is windowing/triggering being
+    // used. Without this, the watermark (at end of global window) will never be reached.
+    if (windowingStrategy.getWindowFn() instanceof GlobalWindows
+        && windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger
+        && input.isBounded() != IsBounded.BOUNDED) {
+      throw new IllegalStateException("GroupByKey cannot be applied to non-bounded PCollection in "
+          + "the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform "
+          + "prior to GroupByKey.");
+    }
+
+    // Verify that the input Coder<KV<K, V>> is a KvCoder<K, V>, and that
+    // the key coder is deterministic.
+    Coder<K> keyCoder = getKeyCoder(input.getCoder());
+    try {
+      keyCoder.verifyDeterministic();
+    } catch (NonDeterministicException e) {
+      throw new IllegalStateException(
+          "the keyCoder of a GroupByKey must be deterministic", e);
+    }
+
+    // Validate the window merge function.
     if (windowingStrategy.getWindowFn() instanceof InvalidWindows) {
       String cause = ((InvalidWindows<?>) windowingStrategy.getWindowFn()).getCause();
       throw new IllegalStateException(
           "GroupByKey must have a valid Window merge function.  "
           + "Invalid because: " + cause);
     }
+  }
+
+  public WindowingStrategy<?, ?> updateWindowingStrategy(WindowingStrategy<?, ?> inputStrategy) {
+    WindowFn<?, ?> inputWindowFn = inputStrategy.getWindowFn();
+    if (!inputWindowFn.isNonMerging()) {
+      // Prevent merging windows again, without explicit user
+      // involvement, e.g., by Window.into() or Window.remerge().
+      inputWindowFn = new InvalidWindows<>(
+          "WindowFn has already been consumed by previous GroupByKey", inputWindowFn);
+    }
+
+    // We also switch to the continuation trigger associated with the current trigger.
+    return inputStrategy
+        .withWindowFn(inputWindowFn)
+        .withTrigger(inputStrategy.getTrigger().getSpec().getContinuationTrigger());
+  }
+
+  @Override
+  public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+    // This operation groups by the combination of key and window,
+    // merging windows as needed, using the windows assigned to the
+    // key/value input elements and the window merge operation of the
+    // window function associated with the input PCollection.
+    WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+
     // By default, implement GroupByKey[AndWindow] via a series of lower-level
     // operations.
     return input
@@ -206,7 +248,10 @@
         .apply(new SortValuesByTimestamp<K, V>())
 
         // Group each key's values by window, merging windows as needed.
-        .apply(new GroupAlsoByWindow<K, V>(windowingStrategy));
+        .apply(new GroupAlsoByWindow<K, V>(windowingStrategy))
+
+        // And update the windowing strategy as appropriate.
+        .setWindowingStrategyInternal(updateWindowingStrategy(windowingStrategy));
   }
 
   @Override
@@ -383,46 +428,12 @@
   public static class GroupByKeyOnly<K, V>
       extends PTransform<PCollection<KV<K, V>>,
                          PCollection<KV<K, Iterable<V>>>> {
-    @Override
-    public void validate(PCollection<KV<K, V>> input) {
-      WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
-      if (windowingStrategy.getWindowFn() instanceof GlobalWindows
-          && windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger
-          && input.isBounded() != IsBounded.BOUNDED) {
-        throw new IllegalStateException("Non-bounded PCollection cannot be "
-            + "processed with GlobalWindow and DefaultTrigger for GroupByKey."
-            + "Use Window.into transform prior to GroupByKey.");
-      }
-      // Verify that the input Coder<KV<K, V>> is a KvCoder<K, V>, and that
-      // the key coder is deterministic.
-      Coder<K> keyCoder = getKeyCoder(input.getCoder());
-      try {
-        keyCoder.verifyDeterministic();
-      } catch (NonDeterministicException e) {
-        throw new IllegalStateException(
-            "the keyCoder of a GroupByKey must be deterministic", e);
-      }
-    }
 
     @SuppressWarnings({"rawtypes", "unchecked"})
     @Override
     public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
-      WindowingStrategy<?, ?> oldWindowingStrategy = input.getWindowingStrategy();
-      WindowFn<?, ?> newWindowFn = oldWindowingStrategy.getWindowFn();
-      if (!newWindowFn.isNonMerging()) {
-        // Prevent merging windows again, without explicit user
-        // involvement, e.g., by Window.into() or Window.remerge().
-        newWindowFn = new InvalidWindows(
-            "WindowFn has already been consumed by previous GroupByKey", newWindowFn);
-      }
-
-      // We also switch to the continuation trigger associated with the current trigger.
-      WindowingStrategy<?, ?> newWindowingStrategy = oldWindowingStrategy
-          .withWindowFn(newWindowFn)
-          .withTrigger(oldWindowingStrategy.getTrigger().getSpec().getContinuationTrigger());
-
       return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
-          input.getPipeline(), newWindowingStrategy, input.isBounded());
+          input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
     }
 
     /**
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollection.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollection.java
index 07a60da..3c1c1e4 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollection.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollection.java
@@ -181,7 +181,6 @@
     return isBounded;
   }
 
-
   /////////////////////////////////////////////////////////////////////////////
   // Internal details below here.
 
@@ -223,8 +222,10 @@
 
   /**
    * Sets the {@link PCollection.IsBounded} of this {@code PCollection}.
+   *
+   * <p> For use by internal transformations only.
    */
-  private PCollection<T> setIsBoundedInternal(IsBounded isBounded) {
+  public PCollection<T> setIsBoundedInternal(IsBounded isBounded) {
     this.isBounded = isBounded;
     return this;
   }
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
index 6033033..18c1e42 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
@@ -26,6 +26,11 @@
 import com.google.cloud.dataflow.sdk.coders.KvCoder;
 import com.google.cloud.dataflow.sdk.coders.MapCoder;
 import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.DirectPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
 import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
 import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
 import com.google.cloud.dataflow.sdk.testing.TestPipeline;
@@ -33,7 +38,10 @@
 import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
 import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
 import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
 import com.google.cloud.dataflow.sdk.values.PCollection;
 
 import org.joda.time.Duration;
@@ -57,7 +65,7 @@
 public class GroupByKeyTest {
 
   @Rule
-  public ExpectedException expectedEx = ExpectedException.none();
+  public ExpectedException thrown = ExpectedException.none();
 
   @Test
   @Category(RunnableOnService.class)
@@ -167,8 +175,6 @@
 
   @Test
   public void testGroupByKeyNonDeterministic() throws Exception {
-    expectedEx.expect(IllegalStateException.class);
-    expectedEx.expectMessage("must be deterministic");
 
     List<KV<Map<String, String>, Integer>> ungroupedPairs = Arrays.asList();
 
@@ -180,9 +186,9 @@
                 KvCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()),
                     BigEndianIntegerCoder.of())));
 
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("must be deterministic");
     input.apply(GroupByKey.<Map<String, String>, Integer>create());
-
-    p.run();
   }
 
   @Test
@@ -230,9 +236,30 @@
                     Duration.standardMinutes(1)))));
   }
 
+  /**
+   * Create a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey}
+   * is not expanded. This is used for verifying that even without expansion the proper errors show
+   * up.
+   */
+  private Pipeline createTestServiceRunner() {
+    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+    options.setRunner(DataflowPipelineRunner.class);
+    options.setProject("someproject");
+    options.setStagingLocation("gs://staging");
+    options.setPathValidatorClass(NoopPathValidator.class);
+    options.setDataflowClient(null);
+    return Pipeline.create(options);
+  }
+
+  private Pipeline createTestDirectRunner() {
+    DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class);
+    options.setRunner(DirectPipelineRunner.class);
+    return Pipeline.create(options);
+  }
+
   @Test
-  public void testInvalidWindows() {
-    Pipeline p = TestPipeline.create();
+  public void testInvalidWindowsDirect() {
+    Pipeline p = createTestDirectRunner();
 
     List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
 
@@ -242,15 +269,30 @@
         .apply(Window.<KV<String, Integer>>into(
             Sessions.withGapDuration(Duration.standardMinutes(1))));
 
-    try {
-      input
-          .apply("GroupByKey", GroupByKey.<String, Integer>create())
-          .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
-      Assert.fail("Exception should have been thrown");
-    } catch (IllegalStateException e) {
-      Assert.assertTrue(e.getMessage().startsWith(
-          "GroupByKey must have a valid Window merge function."));
-    }
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("GroupByKey must have a valid Window merge function");
+    input
+        .apply("GroupByKey", GroupByKey.<String, Integer>create())
+        .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
+  }
+
+  @Test
+  public void testInvalidWindowsService() {
+    Pipeline p = createTestServiceRunner();
+
+    List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
+
+    PCollection<KV<String, Integer>> input =
+        p.apply(Create.of(ungroupedPairs)
+            .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
+        .apply(Window.<KV<String, Integer>>into(
+            Sessions.withGapDuration(Duration.standardMinutes(1))));
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage("GroupByKey must have a valid Window merge function");
+    input
+        .apply("GroupByKey", GroupByKey.<String, Integer>create())
+        .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
   }
 
   @Test
@@ -279,6 +321,48 @@
   }
 
   @Test
+  public void testGroupByKeyDirectUnbounded() {
+    Pipeline p = createTestDirectRunner();
+
+    PCollection<KV<String, Integer>> input = p
+        .apply(new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
+          @Override
+          public PCollection<KV<String, Integer>> apply(PBegin input) {
+            return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
+                WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
+          }
+        });
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without "
+        + "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");
+
+    input.apply("GroupByKey", GroupByKey.<String, Integer>create());
+  }
+
+  @Test
+  public void testGroupByKeyServiceUnbounded() {
+    Pipeline p = createTestServiceRunner();
+
+    PCollection<KV<String, Integer>> input = p
+        .apply(new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
+          @Override
+          public PCollection<KV<String, Integer>> apply(PBegin input) {
+            return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
+                WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
+          }
+        });
+
+    thrown.expect(IllegalStateException.class);
+    thrown.expectMessage(
+        "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without "
+        + "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");
+
+    input.apply("GroupByKey", GroupByKey.<String, Integer>create());
+  }
+
+  @Test
   public void testGroupByKeyGetName() {
     Assert.assertEquals("GroupByKey", GroupByKey.<String, Integer>create().getName());
   }