[BEAM-12343] GroupByKeyTest for changing WindowFn from GlobalWindow after GBK
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
index 88d40ac..fd849a5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/GroupByKeyTest.java
@@ -70,8 +70,10 @@
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
+import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Streams;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.hamcrest.Matcher;
import org.joda.time.Duration;
@@ -585,6 +587,44 @@
}
@Test
+ @Category(ValidatesRunner.class)
+ public void testRewindowWithTimestampCombiner() {
+ PCollection<KV<String, Integer>> input =
+ p.apply(
+ Create.timestamped(
+ TimestampedValue.of(KV.of("foo", 1), new Instant(1)),
+ TimestampedValue.of(KV.of("foo", 4), new Instant(4)),
+ TimestampedValue.of(KV.of("bar", 3), new Instant(3)),
+ TimestampedValue.of(KV.of("foo", 9), new Instant(9))))
+ .apply(
+ "GlobalWindows",
+ Window.<KV<String, Integer>>configure()
+ .withTimestampCombiner(TimestampCombiner.LATEST));
+
+ PCollection<KV<String, Integer>> result =
+ input
+ .apply(GroupByKey.create())
+ .apply(
+ MapElements.into(
+ TypeDescriptors.kvs(
+ TypeDescriptors.strings(), TypeDescriptors.integers()))
+ .via(kv -> KV.of(kv.getKey(), sum(kv.getValue()))))
+ .apply("FixedWindows", Window.into(FixedWindows.of(Duration.millis(1))));
+
+ PAssert.that(result)
+ .inWindow(new IntervalWindow(new Instant(9), new Instant(10)))
+ .containsInAnyOrder(KV.of("foo", 14))
+ .inWindow(new IntervalWindow(new Instant(3), new Instant(4)))
+ .containsInAnyOrder(KV.of("bar", 3));
+
+ p.run();
+ }
+
+ private static int sum(Iterable<Integer> parts) {
+ return Streams.stream(parts).mapToInt(e -> e).sum();
+ }
+
+ @Test
@Category(NeedsRunner.class)
public void testIdentityWindowFnPropagation() {