Move GroupByKey validation into validate method
----Release Notes----
Re-enabled verification of GroupByKey usage. Specifically, the key
must have a deterministic coder, and using GroupByKey with an unbounded
PCollection requires windowing or triggers.
[]
-------------
Created by MOE: http://code.google.com/p/moe-java
MOE_MIGRATED_REVID=99194010
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index b9d24c2..65065f1 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -73,6 +73,7 @@
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.dataflow.sdk.values.POutput;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
@@ -188,7 +189,7 @@
return new DataflowPipelineRunner(dataflowOptions);
}
- private DataflowPipelineRunner(DataflowPipelineOptions options) {
+ @VisibleForTesting protected DataflowPipelineRunner(DataflowPipelineOptions options) {
this.options = options;
this.dataflowClient = options.getDataflowClient();
this.translator = DataflowPipelineTranslator.fromOptions(options);
@@ -224,7 +225,9 @@
@SuppressWarnings("unchecked")
OutputT outputT = (OutputT) PCollection.createPrimitiveOutputInternal(
pc.getPipeline(),
- pc.getWindowingStrategy(),
+ transform instanceof GroupByKey
+ ? ((GroupByKey<?, ?>) transform).updateWindowingStrategy(pc.getWindowingStrategy())
+ : pc.getWindowingStrategy(),
pc.isBounded());
return outputT;
@@ -662,7 +665,7 @@
.setWindowingStrategyInternal(WindowingStrategy.globalDefault())
.apply(Window.<KV<Void, Iterable<Void>>>into(new GlobalWindows()))
.apply(ParDo.of(new OutputElements<>(transform.getElements(), coder)))
- .setCoder(coder);
+ .setCoder(coder).setIsBoundedInternal(IsBounded.BOUNDED);
} catch (CannotProvideCoderException e) {
throw new IllegalArgumentException("Unable to infer a coder and no Coder was specified. "
+ "Please set a coder by invoking Create.withCoder() explicitly.", e);
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
index df55c74..b760896 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/GroupByKey.java
@@ -176,18 +176,60 @@
/////////////////////////////////////////////////////////////////////////////
@Override
- public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
- // This operation groups by the combination of key and window,
- // merging windows as needed, using the windows assigned to the
- // key/value input elements and the window merge operation of the
- // window function associated with the input PCollection.
+ public void validate(PCollection<KV<K, V>> input) {
WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+ // Verify that the input PCollection is bounded, or that there is windowing/triggering being
+ // used. Without this, the watermark (at end of global window) will never be reached.
+ if (windowingStrategy.getWindowFn() instanceof GlobalWindows
+ && windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger
+ && input.isBounded() != IsBounded.BOUNDED) {
+ throw new IllegalStateException("GroupByKey cannot be applied to non-bounded PCollection in "
+ + "the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform "
+ + "prior to GroupByKey.");
+ }
+
+ // Verify that the input Coder<KV<K, V>> is a KvCoder<K, V>, and that
+ // the key coder is deterministic.
+ Coder<K> keyCoder = getKeyCoder(input.getCoder());
+ try {
+ keyCoder.verifyDeterministic();
+ } catch (NonDeterministicException e) {
+ throw new IllegalStateException(
+ "the keyCoder of a GroupByKey must be deterministic", e);
+ }
+
+ // Validate the window merge function.
if (windowingStrategy.getWindowFn() instanceof InvalidWindows) {
String cause = ((InvalidWindows<?>) windowingStrategy.getWindowFn()).getCause();
throw new IllegalStateException(
"GroupByKey must have a valid Window merge function. "
+ "Invalid because: " + cause);
}
+ }
+
+ public WindowingStrategy<?, ?> updateWindowingStrategy(WindowingStrategy<?, ?> inputStrategy) {
+ WindowFn<?, ?> inputWindowFn = inputStrategy.getWindowFn();
+ if (!inputWindowFn.isNonMerging()) {
+ // Prevent merging windows again, without explicit user
+ // involvement, e.g., by Window.into() or Window.remerge().
+ inputWindowFn = new InvalidWindows<>(
+ "WindowFn has already been consumed by previous GroupByKey", inputWindowFn);
+ }
+
+ // We also switch to the continuation trigger associated with the current trigger.
+ return inputStrategy
+ .withWindowFn(inputWindowFn)
+ .withTrigger(inputStrategy.getTrigger().getSpec().getContinuationTrigger());
+ }
+
+ @Override
+ public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+ // This operation groups by the combination of key and window,
+ // merging windows as needed, using the windows assigned to the
+ // key/value input elements and the window merge operation of the
+ // window function associated with the input PCollection.
+ WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
+
// By default, implement GroupByKey[AndWindow] via a series of lower-level
// operations.
return input
@@ -206,7 +248,10 @@
.apply(new SortValuesByTimestamp<K, V>())
// Group each key's values by window, merging windows as needed.
- .apply(new GroupAlsoByWindow<K, V>(windowingStrategy));
+ .apply(new GroupAlsoByWindow<K, V>(windowingStrategy))
+
+ // And update the windowing strategy as appropriate.
+ .setWindowingStrategyInternal(updateWindowingStrategy(windowingStrategy));
}
@Override
@@ -383,46 +428,12 @@
public static class GroupByKeyOnly<K, V>
extends PTransform<PCollection<KV<K, V>>,
PCollection<KV<K, Iterable<V>>>> {
- @Override
- public void validate(PCollection<KV<K, V>> input) {
- WindowingStrategy<?, ?> windowingStrategy = input.getWindowingStrategy();
- if (windowingStrategy.getWindowFn() instanceof GlobalWindows
- && windowingStrategy.getTrigger().getSpec() instanceof DefaultTrigger
- && input.isBounded() != IsBounded.BOUNDED) {
- throw new IllegalStateException("Non-bounded PCollection cannot be "
- + "processed with GlobalWindow and DefaultTrigger for GroupByKey."
- + "Use Window.into transform prior to GroupByKey.");
- }
- // Verify that the input Coder<KV<K, V>> is a KvCoder<K, V>, and that
- // the key coder is deterministic.
- Coder<K> keyCoder = getKeyCoder(input.getCoder());
- try {
- keyCoder.verifyDeterministic();
- } catch (NonDeterministicException e) {
- throw new IllegalStateException(
- "the keyCoder of a GroupByKey must be deterministic", e);
- }
- }
@SuppressWarnings({"rawtypes", "unchecked"})
@Override
public PCollection<KV<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
- WindowingStrategy<?, ?> oldWindowingStrategy = input.getWindowingStrategy();
- WindowFn<?, ?> newWindowFn = oldWindowingStrategy.getWindowFn();
- if (!newWindowFn.isNonMerging()) {
- // Prevent merging windows again, without explicit user
- // involvement, e.g., by Window.into() or Window.remerge().
- newWindowFn = new InvalidWindows(
- "WindowFn has already been consumed by previous GroupByKey", newWindowFn);
- }
-
- // We also switch to the continuation trigger associated with the current trigger.
- WindowingStrategy<?, ?> newWindowingStrategy = oldWindowingStrategy
- .withWindowFn(newWindowFn)
- .withTrigger(oldWindowingStrategy.getTrigger().getSpec().getContinuationTrigger());
-
return PCollection.<KV<K, Iterable<V>>>createPrimitiveOutputInternal(
- input.getPipeline(), newWindowingStrategy, input.isBounded());
+ input.getPipeline(), input.getWindowingStrategy(), input.isBounded());
}
/**
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollection.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollection.java
index 07a60da..3c1c1e4 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollection.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/values/PCollection.java
@@ -181,7 +181,6 @@
return isBounded;
}
-
/////////////////////////////////////////////////////////////////////////////
// Internal details below here.
@@ -223,8 +222,10 @@
/**
* Sets the {@link PCollection.IsBounded} of this {@code PCollection}.
+ *
+ * <p> For use by internal transformations only.
*/
- private PCollection<T> setIsBoundedInternal(IsBounded isBounded) {
+ public PCollection<T> setIsBoundedInternal(IsBounded isBounded) {
this.isBounded = isBounded;
return this;
}
diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
index 6033033..18c1e42 100644
--- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
+++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/transforms/GroupByKeyTest.java
@@ -26,6 +26,11 @@
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.coders.MapCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
+import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.DirectPipelineOptions;
+import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
+import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
+import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.testing.DataflowAssert;
import com.google.cloud.dataflow.sdk.testing.RunnableOnService;
import com.google.cloud.dataflow.sdk.testing.TestPipeline;
@@ -33,7 +38,10 @@
import com.google.cloud.dataflow.sdk.transforms.windowing.InvalidWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Sessions;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.util.NoopPathValidator;
+import com.google.cloud.dataflow.sdk.util.WindowingStrategy;
import com.google.cloud.dataflow.sdk.values.KV;
+import com.google.cloud.dataflow.sdk.values.PBegin;
import com.google.cloud.dataflow.sdk.values.PCollection;
import org.joda.time.Duration;
@@ -57,7 +65,7 @@
public class GroupByKeyTest {
@Rule
- public ExpectedException expectedEx = ExpectedException.none();
+ public ExpectedException thrown = ExpectedException.none();
@Test
@Category(RunnableOnService.class)
@@ -167,8 +175,6 @@
@Test
public void testGroupByKeyNonDeterministic() throws Exception {
- expectedEx.expect(IllegalStateException.class);
- expectedEx.expectMessage("must be deterministic");
List<KV<Map<String, String>, Integer>> ungroupedPairs = Arrays.asList();
@@ -180,9 +186,9 @@
KvCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()),
BigEndianIntegerCoder.of())));
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("must be deterministic");
input.apply(GroupByKey.<Map<String, String>, Integer>create());
-
- p.run();
}
@Test
@@ -230,9 +236,30 @@
Duration.standardMinutes(1)))));
}
+ /**
+ * Create a test pipeline that uses the {@link DataflowPipelineRunner} so that {@link GroupByKey}
+ * is not expanded. This is used for verifying that even without expansion the proper errors show
+ * up.
+ */
+ private Pipeline createTestServiceRunner() {
+ DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
+ options.setRunner(DataflowPipelineRunner.class);
+ options.setProject("someproject");
+ options.setStagingLocation("gs://staging");
+ options.setPathValidatorClass(NoopPathValidator.class);
+ options.setDataflowClient(null);
+ return Pipeline.create(options);
+ }
+
+ private Pipeline createTestDirectRunner() {
+ DirectPipelineOptions options = PipelineOptionsFactory.as(DirectPipelineOptions.class);
+ options.setRunner(DirectPipelineRunner.class);
+ return Pipeline.create(options);
+ }
+
@Test
- public void testInvalidWindows() {
- Pipeline p = TestPipeline.create();
+ public void testInvalidWindowsDirect() {
+ Pipeline p = createTestDirectRunner();
List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
@@ -242,15 +269,30 @@
.apply(Window.<KV<String, Integer>>into(
Sessions.withGapDuration(Duration.standardMinutes(1))));
- try {
- input
- .apply("GroupByKey", GroupByKey.<String, Integer>create())
- .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
- Assert.fail("Exception should have been thrown");
- } catch (IllegalStateException e) {
- Assert.assertTrue(e.getMessage().startsWith(
- "GroupByKey must have a valid Window merge function."));
- }
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("GroupByKey must have a valid Window merge function");
+ input
+ .apply("GroupByKey", GroupByKey.<String, Integer>create())
+ .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
+ }
+
+ @Test
+ public void testInvalidWindowsService() {
+ Pipeline p = createTestServiceRunner();
+
+ List<KV<String, Integer>> ungroupedPairs = Arrays.asList();
+
+ PCollection<KV<String, Integer>> input =
+ p.apply(Create.of(ungroupedPairs)
+ .withCoder(KvCoder.of(StringUtf8Coder.of(), BigEndianIntegerCoder.of())))
+ .apply(Window.<KV<String, Integer>>into(
+ Sessions.withGapDuration(Duration.standardMinutes(1))));
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage("GroupByKey must have a valid Window merge function");
+ input
+ .apply("GroupByKey", GroupByKey.<String, Integer>create())
+ .apply("GroupByKeyAgain", GroupByKey.<String, Iterable<Integer>>create());
}
@Test
@@ -279,6 +321,48 @@
}
@Test
+ public void testGroupByKeyDirectUnbounded() {
+ Pipeline p = createTestDirectRunner();
+
+ PCollection<KV<String, Integer>> input = p
+ .apply(new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
+ @Override
+ public PCollection<KV<String, Integer>> apply(PBegin input) {
+ return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
+ WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
+ }
+ });
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage(
+ "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without "
+ + "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");
+
+ input.apply("GroupByKey", GroupByKey.<String, Integer>create());
+ }
+
+ @Test
+ public void testGroupByKeyServiceUnbounded() {
+ Pipeline p = createTestServiceRunner();
+
+ PCollection<KV<String, Integer>> input = p
+ .apply(new PTransform<PBegin, PCollection<KV<String, Integer>>>() {
+ @Override
+ public PCollection<KV<String, Integer>> apply(PBegin input) {
+ return PCollection.createPrimitiveOutputInternal(input.getPipeline(),
+ WindowingStrategy.globalDefault(), PCollection.IsBounded.UNBOUNDED);
+ }
+ });
+
+ thrown.expect(IllegalStateException.class);
+ thrown.expectMessage(
+ "GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without "
+ + "a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.");
+
+ input.apply("GroupByKey", GroupByKey.<String, Integer>create());
+ }
+
+ @Test
public void testGroupByKeyGetName() {
Assert.assertEquals("GroupByKey", GroupByKey.<String, Integer>create().getName());
}