| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.transforms.windowing; |
| |
| import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; |
| import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; |
| import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor; |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.hamcrest.Matchers.instanceOf; |
| import static org.hamcrest.Matchers.is; |
| import static org.hamcrest.Matchers.isOneOf; |
| import static org.hamcrest.Matchers.not; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertThat; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.Mockito.when; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.io.Serializable; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import org.apache.beam.sdk.Pipeline.PipelineVisitor; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.coders.Coder.NonDeterministicException; |
| import org.apache.beam.sdk.coders.CustomCoder; |
| import org.apache.beam.sdk.coders.StringUtf8Coder; |
| import org.apache.beam.sdk.coders.VarIntCoder; |
| import org.apache.beam.sdk.io.GenerateSequence; |
| import org.apache.beam.sdk.runners.TransformHierarchy; |
| import org.apache.beam.sdk.testing.DataflowPortabilityApiUnsupported; |
| import org.apache.beam.sdk.testing.PAssert; |
| import org.apache.beam.sdk.testing.TestPipeline; |
| import org.apache.beam.sdk.testing.UsesCustomWindowMerging; |
| import org.apache.beam.sdk.testing.ValidatesRunner; |
| import org.apache.beam.sdk.transforms.Combine; |
| import org.apache.beam.sdk.transforms.Count; |
| import org.apache.beam.sdk.transforms.Create; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.GroupByKey; |
| import org.apache.beam.sdk.transforms.MapElements; |
| import org.apache.beam.sdk.transforms.ParDo; |
| import org.apache.beam.sdk.transforms.SimpleFunction; |
| import org.apache.beam.sdk.transforms.display.DisplayData; |
| import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.TimestampedValue; |
| import org.apache.beam.sdk.values.WindowingStrategy; |
| import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables; |
| import org.hamcrest.Matchers; |
| import org.joda.time.Duration; |
| import org.joda.time.Instant; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.experimental.categories.Category; |
| import org.junit.rules.ExpectedException; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| import org.mockito.Mockito; |
| |
| /** Tests for {@link Window}. */ |
| @RunWith(JUnit4.class) |
| public class WindowTest implements Serializable { |
| |
/**
 * Pipeline used by every test in this class. Abandoned-node enforcement is switched off when the
 * rule is created; some tests re-enable it before running the pipeline.
 */
@Rule
public final transient TestPipeline pipeline =
    TestPipeline.create().enableAbandonedNodeEnforcement(false);

/** Rule through which tests declare the exception type, cause, and message they expect. */
@Rule public transient ExpectedException thrown = ExpectedException.none();
| |
| @Test |
| public void testWindowIntoSetWindowfn() { |
| WindowingStrategy<?, ?> strategy = |
| pipeline |
| .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) |
| .apply(Window.into(FixedWindows.of(Duration.standardMinutes(10)))) |
| .getWindowingStrategy(); |
| assertTrue(strategy.getWindowFn() instanceof FixedWindows); |
| assertTrue(strategy.getTrigger() instanceof DefaultTrigger); |
| assertEquals(AccumulationMode.DISCARDING_FIRED_PANES, strategy.getMode()); |
| } |
| |
| @Test |
| public void testWindowIntoTriggersAndAccumulating() { |
| FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10)); |
| Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5)); |
| WindowingStrategy<?, ?> strategy = |
| pipeline |
| .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) |
| .apply( |
| Window.<String>into(fixed10) |
| .triggering(trigger) |
| .accumulatingFiredPanes() |
| .withAllowedLateness(Duration.ZERO)) |
| .getWindowingStrategy(); |
| |
| assertEquals(fixed10, strategy.getWindowFn()); |
| assertEquals(trigger, strategy.getTrigger()); |
| assertEquals(AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode()); |
| } |
| |
| @Test |
| public void testWindowIntoAccumulatingLatenessNoTrigger() { |
| FixedWindows fixed = FixedWindows.of(Duration.standardMinutes(10)); |
| WindowingStrategy<?, ?> strategy = |
| pipeline |
| .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) |
| .apply( |
| "Lateness", |
| Window.<String>into(fixed) |
| .withAllowedLateness(Duration.standardDays(1)) |
| .accumulatingFiredPanes()) |
| .getWindowingStrategy(); |
| |
| assertThat(strategy.isTriggerSpecified(), is(false)); |
| assertThat(strategy.isModeSpecified(), is(true)); |
| assertThat(strategy.isAllowedLatenessSpecified(), is(true)); |
| assertThat(strategy.getMode(), equalTo(AccumulationMode.ACCUMULATING_FIRED_PANES)); |
| assertThat(strategy.getAllowedLateness(), equalTo(Duration.standardDays(1))); |
| } |
| |
| @Test |
| public void testWindowPropagatesEachPart() { |
| FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10)); |
| Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5)); |
| WindowingStrategy<?, ?> strategy = |
| pipeline |
| .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) |
| .apply("Mode", Window.<String>configure().accumulatingFiredPanes()) |
| .apply( |
| "Lateness", |
| Window.<String>configure().withAllowedLateness(Duration.standardDays(1))) |
| .apply("Trigger", Window.<String>configure().triggering(trigger)) |
| .apply("Window", Window.into(fixed10)) |
| .getWindowingStrategy(); |
| |
| assertEquals(fixed10, strategy.getWindowFn()); |
| assertEquals(trigger, strategy.getTrigger()); |
| assertEquals(AccumulationMode.ACCUMULATING_FIRED_PANES, strategy.getMode()); |
| assertEquals(Duration.standardDays(1), strategy.getAllowedLateness()); |
| } |
| |
| @Test |
| public void testWindowIntoPropagatesLateness() { |
| |
| FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10)); |
| FixedWindows fixed25 = FixedWindows.of(Duration.standardMinutes(25)); |
| WindowingStrategy<?, ?> strategy = |
| pipeline |
| .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) |
| .apply( |
| "WindowInto10", |
| Window.<String>into(fixed10) |
| .withAllowedLateness(Duration.standardDays(1)) |
| .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(5))) |
| .accumulatingFiredPanes()) |
| .apply("WindowInto25", Window.into(fixed25)) |
| .getWindowingStrategy(); |
| |
| assertEquals(Duration.standardDays(1), strategy.getAllowedLateness()); |
| assertEquals(fixed25, strategy.getWindowFn()); |
| } |
| |
| @Test |
| public void testWindowIntoAssignesLongerAllowedLateness() { |
| |
| FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10)); |
| FixedWindows fixed25 = FixedWindows.of(Duration.standardMinutes(25)); |
| |
| PCollection<String> notChanged = |
| pipeline |
| .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) |
| .apply( |
| "WindowInto25", |
| Window.<String>into(fixed25) |
| .withAllowedLateness(Duration.standardDays(1)) |
| .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(5))) |
| .accumulatingFiredPanes()) |
| .apply( |
| "WindowInto10", |
| Window.<String>into(fixed10).withAllowedLateness(Duration.standardDays(2))); |
| |
| assertEquals(Duration.standardDays(2), notChanged.getWindowingStrategy().getAllowedLateness()); |
| |
| PCollection<String> data = |
| pipeline.apply( |
| "createChanged", Create.of("hello", "world").withCoder(StringUtf8Coder.of())); |
| |
| PCollection<String> longWindow = |
| data.apply( |
| "WindowInto25c", |
| Window.<String>into(fixed25) |
| .withAllowedLateness(Duration.standardDays(1)) |
| .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(5))) |
| .accumulatingFiredPanes()); |
| |
| assertEquals(Duration.standardDays(1), longWindow.getWindowingStrategy().getAllowedLateness()); |
| |
| PCollection<String> autoCorrectedWindow = |
| longWindow.apply( |
| "WindowInto10c", |
| Window.<String>into(fixed10).withAllowedLateness(Duration.standardHours(1))); |
| |
| assertEquals( |
| Duration.standardDays(1), autoCorrectedWindow.getWindowingStrategy().getAllowedLateness()); |
| } |
| |
| /** |
| * With {@link #testWindowIntoNullWindowFnNoAssign()}, demonstrates that the expansions of the |
| * {@link Window} transform depends on if it actually assigns elements to windows. |
| */ |
| @Test |
| public void testWindowIntoWindowFnAssign() { |
| pipeline |
| .apply(Create.of(1, 2, 3)) |
| .apply( |
| Window.into(FixedWindows.of(Duration.standardMinutes(11L).plus(Duration.millis(1L))))); |
| |
| final AtomicBoolean foundAssign = new AtomicBoolean(false); |
| pipeline.traverseTopologically( |
| new PipelineVisitor.Defaults() { |
| @Override |
| public void visitPrimitiveTransform(TransformHierarchy.Node node) { |
| if (node.getTransform() instanceof Window.Assign) { |
| foundAssign.set(true); |
| } |
| } |
| }); |
| assertThat(foundAssign.get(), is(true)); |
| } |
| |
| /** |
| * With {@link #testWindowIntoWindowFnAssign()}, demonstrates that the expansions of the {@link |
| * Window} transform depends on if it actually assigns elements to windows. |
| */ |
| @Test |
| public void testWindowIntoNullWindowFnNoAssign() { |
| pipeline |
| .apply(Create.of(1, 2, 3)) |
| .apply( |
| Window.<Integer>configure() |
| .triggering(AfterWatermark.pastEndOfWindow()) |
| .withAllowedLateness(Duration.ZERO) |
| .accumulatingFiredPanes()); |
| |
| pipeline.traverseTopologically( |
| new PipelineVisitor.Defaults() { |
| @Override |
| public void visitPrimitiveTransform(TransformHierarchy.Node node) { |
| assertThat(node.getTransform(), not(instanceOf(Window.Assign.class))); |
| } |
| }); |
| } |
| |
| @Test |
| public void testWindowGetName() { |
| assertEquals( |
| "Window.Into()", |
| Window.<String>into(FixedWindows.of(Duration.standardMinutes(10))).getName()); |
| } |
| |
| @Test |
| public void testNonDeterministicWindowCoder() throws NonDeterministicException { |
| FixedWindows mockWindowFn = Mockito.mock(FixedWindows.class); |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| Class<Coder<IntervalWindow>> coderClazz = (Class) Coder.class; |
| Coder<IntervalWindow> mockCoder = Mockito.mock(coderClazz); |
| when(mockWindowFn.windowCoder()).thenReturn(mockCoder); |
| NonDeterministicException toBeThrown = |
| new NonDeterministicException(mockCoder, "Its just not deterministic."); |
| Mockito.doThrow(toBeThrown).when(mockCoder).verifyDeterministic(); |
| |
| thrown.expect(IllegalArgumentException.class); |
| thrown.expectCause(Matchers.sameInstance(toBeThrown)); |
| thrown.expectMessage("Window coders must be deterministic"); |
| Window.into(mockWindowFn); |
| } |
| |
| @Test |
| public void testMissingMode() { |
| FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10)); |
| Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5)); |
| |
| PCollection<String> input = |
| pipeline |
| .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) |
| .apply("Window", Window.into(fixed10)); |
| thrown.expect(IllegalArgumentException.class); |
| thrown.expectMessage("requires that the accumulation mode"); |
| input.apply( |
| "Triggering", |
| Window.<String>configure() |
| .withAllowedLateness(Duration.standardDays(1)) |
| .triggering(trigger)); |
| } |
| |
| @Test |
| public void testMissingModeViaLateness() { |
| FixedWindows fixed = FixedWindows.of(Duration.standardMinutes(10)); |
| PCollection<String> input = |
| pipeline |
| .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) |
| .apply("Window", Window.into(fixed)); |
| thrown.expect(IllegalArgumentException.class); |
| thrown.expectMessage("allowed lateness"); |
| thrown.expectMessage("accumulation mode be specified"); |
| input.apply( |
| "Lateness", Window.<String>configure().withAllowedLateness(Duration.standardDays(1))); |
| } |
| |
| @Test |
| public void testMissingLateness() { |
| FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10)); |
| Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5)); |
| |
| thrown.expect(IllegalArgumentException.class); |
| thrown.expectMessage("requires that the allowed lateness"); |
| pipeline |
| .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) |
| .apply("Mode", Window.<String>configure().accumulatingFiredPanes()) |
| .apply("Window", Window.into(fixed10)) |
| .apply("Trigger", Window.<String>configure().triggering(trigger)); |
| } |
| |
| private static class WindowOddEvenBuckets extends NonMergingWindowFn<Long, IntervalWindow> { |
| private static final IntervalWindow EVEN_WINDOW = |
| new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE.maxTimestamp()); |
| private static final IntervalWindow ODD_WINDOW = |
| new IntervalWindow( |
| BoundedWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE.maxTimestamp().minus(1)); |
| |
| @Override |
| public Collection<IntervalWindow> assignWindows(AssignContext c) throws Exception { |
| if (c.element() % 2 == 0) { |
| return Collections.singleton(EVEN_WINDOW); |
| } |
| return Collections.singleton(ODD_WINDOW); |
| } |
| |
| @Override |
| public boolean isCompatible(WindowFn<?, ?> other) { |
| return other instanceof WindowOddEvenBuckets; |
| } |
| |
| @Override |
| public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException { |
| if (!this.isCompatible(other)) { |
| throw new IncompatibleWindowException( |
| other, "WindowOddEvenBuckets is only compatible with WindowOddEvenBuckets."); |
| } |
| } |
| |
| @Override |
| public Coder<IntervalWindow> windowCoder() { |
| return new IntervalWindow.IntervalWindowCoder(); |
| } |
| |
| @Override |
| public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() { |
| throw new UnsupportedOperationException( |
| String.format("Can't use %s for side inputs", getClass().getSimpleName())); |
| } |
| } |
| |
| @Test |
| @Category({ValidatesRunner.class, DataflowPortabilityApiUnsupported.class}) |
| public void testNoWindowFnDoesNotReassignWindows() { |
| pipeline.enableAbandonedNodeEnforcement(true); |
| |
| final PCollection<Long> initialWindows = |
| pipeline |
| .apply(GenerateSequence.from(0).to(10)) |
| .apply("AssignWindows", Window.into(new WindowOddEvenBuckets())); |
| |
| // Sanity check the window assignment to demonstrate the baseline |
| PAssert.that(initialWindows) |
| .inWindow(WindowOddEvenBuckets.EVEN_WINDOW) |
| .containsInAnyOrder(0L, 2L, 4L, 6L, 8L); |
| PAssert.that(initialWindows) |
| .inWindow(WindowOddEvenBuckets.ODD_WINDOW) |
| .containsInAnyOrder(1L, 3L, 5L, 7L, 9L); |
| |
| PCollection<Boolean> upOne = |
| initialWindows.apply( |
| "ModifyTypes", |
| MapElements.via( |
| new SimpleFunction<Long, Boolean>() { |
| @Override |
| public Boolean apply(Long input) { |
| return input % 2 == 0; |
| } |
| })); |
| PAssert.that(upOne) |
| .inWindow(WindowOddEvenBuckets.EVEN_WINDOW) |
| .containsInAnyOrder(true, true, true, true, true); |
| PAssert.that(upOne) |
| .inWindow(WindowOddEvenBuckets.ODD_WINDOW) |
| .containsInAnyOrder(false, false, false, false, false); |
| |
| // The elements should be in the same windows, even though they would not be assigned to the |
| // same windows with the updated timestamps. If we try to apply the original WindowFn, the type |
| // will not be appropriate and the runner should crash, as a Boolean cannot be converted into |
| // a long. |
| upOne.apply( |
| "UpdateWindowingStrategy", |
| Window.<Boolean>configure() |
| .triggering(Never.ever()) |
| .withAllowedLateness(Duration.ZERO) |
| .accumulatingFiredPanes()); |
| pipeline.run(); |
| } |
| |
| /** |
| * Tests that when two elements are combined via a GroupByKey their output timestamp agrees with |
| * the windowing function default, the end of the window. |
| */ |
| @Test |
| @Category(ValidatesRunner.class) |
| public void testTimestampCombinerDefault() { |
| pipeline.enableAbandonedNodeEnforcement(true); |
| |
| pipeline |
| .apply( |
| Create.timestamped( |
| TimestampedValue.of(KV.of(0, "hello"), new Instant(0)), |
| TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10)))) |
| .apply(Window.into(FixedWindows.of(Duration.standardMinutes(10)))) |
| .apply(GroupByKey.create()) |
| .apply( |
| ParDo.of( |
| new DoFn<KV<Integer, Iterable<String>>, Void>() { |
| @ProcessElement |
| public void processElement(ProcessContext c) throws Exception { |
| assertThat( |
| c.timestamp(), |
| equalTo( |
| new IntervalWindow( |
| new Instant(0), |
| new Instant(0).plus(Duration.standardMinutes(10))) |
| .maxTimestamp())); |
| } |
| })); |
| |
| pipeline.run(); |
| } |
| |
| /** |
| * Tests that when two elements are combined via a GroupByKey their output timestamp agrees with |
| * the windowing function customized to use the end of the window. |
| */ |
| @Test |
| @Category(ValidatesRunner.class) |
| public void testTimestampCombinerEndOfWindow() { |
| pipeline.enableAbandonedNodeEnforcement(true); |
| |
| pipeline |
| .apply( |
| Create.timestamped( |
| TimestampedValue.of(KV.of(0, "hello"), new Instant(0)), |
| TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10)))) |
| .apply( |
| Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10))) |
| .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW)) |
| .apply(GroupByKey.create()) |
| .apply( |
| ParDo.of( |
| new DoFn<KV<Integer, Iterable<String>>, Void>() { |
| @ProcessElement |
| public void processElement(ProcessContext c) throws Exception { |
| assertThat(c.timestamp(), equalTo(new Instant(10 * 60 * 1000 - 1))); |
| } |
| })); |
| |
| pipeline.run(); |
| } |
| |
| @Test |
| public void testDisplayData() { |
| FixedWindows windowFn = FixedWindows.of(Duration.standardHours(5)); |
| AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow(); |
| Duration allowedLateness = Duration.standardMinutes(10); |
| Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY; |
| TimestampCombiner timestampCombiner = TimestampCombiner.END_OF_WINDOW; |
| |
| Window<?> window = |
| Window.into(windowFn) |
| .triggering(triggerBuilder) |
| .accumulatingFiredPanes() |
| .withAllowedLateness(allowedLateness, closingBehavior) |
| .withTimestampCombiner(timestampCombiner); |
| |
| DisplayData displayData = DisplayData.from(window); |
| |
| assertThat(displayData, hasDisplayItem("windowFn", windowFn.getClass())); |
| assertThat(displayData, includesDisplayDataFor("windowFn", windowFn)); |
| |
| assertThat(displayData, hasDisplayItem("trigger", triggerBuilder.toString())); |
| assertThat( |
| displayData, |
| hasDisplayItem("accumulationMode", AccumulationMode.ACCUMULATING_FIRED_PANES.toString())); |
| assertThat(displayData, hasDisplayItem("allowedLateness", allowedLateness)); |
| assertThat(displayData, hasDisplayItem("closingBehavior", closingBehavior.toString())); |
| assertThat(displayData, hasDisplayItem("timestampCombiner", timestampCombiner.toString())); |
| } |
| |
| @Test |
| @Category(ValidatesRunner.class) |
| public void testPrimitiveDisplayData() { |
| FixedWindows windowFn = FixedWindows.of(Duration.standardHours(5)); |
| AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow(); |
| Duration allowedLateness = Duration.standardMinutes(10); |
| Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY; |
| TimestampCombiner timestampCombiner = TimestampCombiner.END_OF_WINDOW; |
| |
| Window<?> window = |
| Window.into(windowFn) |
| .triggering(triggerBuilder) |
| .accumulatingFiredPanes() |
| .withAllowedLateness(allowedLateness, closingBehavior) |
| .withTimestampCombiner(timestampCombiner); |
| |
| DisplayData primitiveDisplayData = |
| Iterables.getOnlyElement( |
| DisplayDataEvaluator.create().displayDataForPrimitiveTransforms(window)); |
| |
| assertThat(primitiveDisplayData, hasDisplayItem("windowFn", windowFn.getClass())); |
| assertThat(primitiveDisplayData, includesDisplayDataFor("windowFn", windowFn)); |
| |
| assertThat(primitiveDisplayData, hasDisplayItem("trigger", triggerBuilder.toString())); |
| assertThat( |
| primitiveDisplayData, |
| hasDisplayItem("accumulationMode", AccumulationMode.ACCUMULATING_FIRED_PANES.toString())); |
| assertThat(primitiveDisplayData, hasDisplayItem("allowedLateness", allowedLateness)); |
| assertThat(primitiveDisplayData, hasDisplayItem("closingBehavior", closingBehavior.toString())); |
| assertThat( |
| primitiveDisplayData, hasDisplayItem("timestampCombiner", timestampCombiner.toString())); |
| } |
| |
| @Test |
| public void testAssignDisplayDataUnchanged() { |
| FixedWindows windowFn = FixedWindows.of(Duration.standardHours(5)); |
| |
| Window<Object> original = Window.into(windowFn); |
| WindowingStrategy<?, ?> updated = WindowingStrategy.globalDefault().withWindowFn(windowFn); |
| |
| DisplayData displayData = DisplayData.from(new Window.Assign<>(original, updated)); |
| |
| assertThat(displayData, hasDisplayItem("windowFn", windowFn.getClass())); |
| assertThat(displayData, includesDisplayDataFor("windowFn", windowFn)); |
| |
| assertThat(displayData, not(hasDisplayItem("trigger"))); |
| assertThat(displayData, not(hasDisplayItem("accumulationMode"))); |
| assertThat(displayData, not(hasDisplayItem("allowedLateness"))); |
| assertThat(displayData, not(hasDisplayItem("closingBehavior"))); |
| assertThat(displayData, not(hasDisplayItem("timestampCombiner"))); |
| } |
| |
| @Test |
| public void testDisplayDataExcludesUnspecifiedProperties() { |
| Window<?> onlyHasAccumulationMode = Window.configure().discardingFiredPanes(); |
| assertThat( |
| DisplayData.from(onlyHasAccumulationMode), |
| not( |
| hasDisplayItem( |
| hasKey( |
| isOneOf( |
| "windowFn", |
| "trigger", |
| "timestampCombiner", |
| "allowedLateness", |
| "closingBehavior"))))); |
| |
| Window<?> noAccumulationMode = Window.into(new GlobalWindows()); |
| assertThat( |
| DisplayData.from(noAccumulationMode), not(hasDisplayItem(hasKey("accumulationMode")))); |
| } |
| |
| @Test |
| public void testDisplayDataExcludesDefaults() { |
| Window<?> window = |
| Window.into(new GlobalWindows()) |
| .triggering(DefaultTrigger.of()) |
| .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); |
| |
| DisplayData data = DisplayData.from(window); |
| assertThat(data, not(hasDisplayItem("trigger"))); |
| assertThat(data, not(hasDisplayItem("allowedLateness"))); |
| } |
| |
| @Test |
| @Category({ValidatesRunner.class, UsesCustomWindowMerging.class}) |
| public void testMergingCustomWindows() { |
| Instant startInstant = new Instant(0L); |
| PCollection<String> inputCollection = |
| pipeline.apply( |
| Create.timestamped( |
| TimestampedValue.of("big", startInstant.plus(Duration.standardSeconds(10))), |
| TimestampedValue.of("small1", startInstant.plus(Duration.standardSeconds(20))), |
| // This one will be outside of bigWindow thus not merged |
| TimestampedValue.of("small2", startInstant.plus(Duration.standardSeconds(39))))); |
| PCollection<String> windowedCollection = |
| inputCollection.apply(Window.into(new CustomWindowFn<>())); |
| PCollection<Long> count = |
| windowedCollection.apply(Combine.globally(Count.<String>combineFn()).withoutDefaults()); |
| // "small1" and "big" elements merged into bigWindow "small2" not merged |
| // because timestamp is not in bigWindow |
| PAssert.that("Wrong number of elements in output collection", count).containsInAnyOrder(2L, 1L); |
| pipeline.run(); |
| } |
| |
| // This test is usefull because some runners have a special merge implementation |
| // for keyed collections |
| @Test |
| @Category({ValidatesRunner.class, UsesCustomWindowMerging.class}) |
| public void testMergingCustomWindowsKeyedCollection() { |
| Instant startInstant = new Instant(0L); |
| PCollection<KV<Integer, String>> inputCollection = |
| pipeline.apply( |
| Create.timestamped( |
| TimestampedValue.of( |
| KV.of(0, "big"), startInstant.plus(Duration.standardSeconds(10))), |
| TimestampedValue.of( |
| KV.of(1, "small1"), startInstant.plus(Duration.standardSeconds(20))), |
| // This element is not contained within the bigWindow and not merged |
| TimestampedValue.of( |
| KV.of(2, "small2"), startInstant.plus(Duration.standardSeconds(39))))); |
| PCollection<KV<Integer, String>> windowedCollection = |
| inputCollection.apply(Window.into(new CustomWindowFn<>())); |
| PCollection<Long> count = |
| windowedCollection.apply( |
| Combine.globally(Count.<KV<Integer, String>>combineFn()).withoutDefaults()); |
| // "small1" and "big" elements merged into bigWindow "small2" not merged |
| // because it is not contained in bigWindow |
| PAssert.that("Wrong number of elements in output collection", count).containsInAnyOrder(2L, 1L); |
| pipeline.run(); |
| } |
| |
| private static class CustomWindow extends IntervalWindow { |
| private boolean isBig; |
| |
| CustomWindow(Instant start, Instant end, boolean isBig) { |
| super(start, end); |
| this.isBig = isBig; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (o == null || getClass() != o.getClass()) { |
| return false; |
| } |
| CustomWindow that = (CustomWindow) o; |
| return super.equals(o) && this.isBig == that.isBig; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(super.hashCode(), isBig); |
| } |
| } |
| |
| private static class CustomWindowCoder extends CustomCoder<CustomWindow> { |
| |
| private static final CustomWindowCoder INSTANCE = new CustomWindowCoder(); |
| private static final Coder<IntervalWindow> INTERVAL_WINDOW_CODER = IntervalWindow.getCoder(); |
| private static final VarIntCoder VAR_INT_CODER = VarIntCoder.of(); |
| |
| public static CustomWindowCoder of() { |
| return INSTANCE; |
| } |
| |
| @Override |
| public void encode(CustomWindow window, OutputStream outStream) throws IOException { |
| INTERVAL_WINDOW_CODER.encode(window, outStream); |
| VAR_INT_CODER.encode(window.isBig ? 1 : 0, outStream); |
| } |
| |
| @Override |
| public CustomWindow decode(InputStream inStream) throws IOException { |
| IntervalWindow superWindow = INTERVAL_WINDOW_CODER.decode(inStream); |
| boolean isBig = VAR_INT_CODER.decode(inStream) != 0; |
| return new CustomWindow(superWindow.start(), superWindow.end(), isBig); |
| } |
| |
| @Override |
| public void verifyDeterministic() throws NonDeterministicException { |
| INTERVAL_WINDOW_CODER.verifyDeterministic(); |
| VAR_INT_CODER.verifyDeterministic(); |
| } |
| } |
| |
| private static class CustomWindowFn<T> extends WindowFn<T, CustomWindow> { |
| @Override |
| public Collection<CustomWindow> assignWindows(AssignContext c) throws Exception { |
| String element; |
| // It loses genericity of type T but this is not a big deal for a test. |
| // And it allows to avoid duplicating CustomWindowFn to support PCollection<KV> |
| if (c.element() instanceof KV) { |
| element = ((KV<Integer, String>) c.element()).getValue(); |
| } else { |
| element = (String) c.element(); |
| } |
| // put big elements in windows of 30s and small ones in windows of 5s |
| if ("big".equals(element)) { |
| return Collections.singletonList( |
| new CustomWindow( |
| c.timestamp(), c.timestamp().plus(Duration.standardSeconds(30)), true)); |
| } else { |
| return Collections.singletonList( |
| new CustomWindow( |
| c.timestamp(), c.timestamp().plus(Duration.standardSeconds(5)), false)); |
| } |
| } |
| |
| @Override |
| public void mergeWindows(MergeContext c) throws Exception { |
| Map<CustomWindow, Set<CustomWindow>> windowsToMerge = new HashMap<>(); |
| for (CustomWindow window : c.windows()) { |
| if (window.isBig) { |
| HashSet<CustomWindow> windows = new HashSet<>(); |
| windows.add(window); |
| windowsToMerge.put(window, windows); |
| } |
| } |
| for (CustomWindow window : c.windows()) { |
| for (Map.Entry<CustomWindow, Set<CustomWindow>> bigWindow : windowsToMerge.entrySet()) { |
| if (bigWindow.getKey().contains(window)) { |
| bigWindow.getValue().add(window); |
| } |
| } |
| } |
| for (Map.Entry<CustomWindow, Set<CustomWindow>> mergeEntry : windowsToMerge.entrySet()) { |
| c.merge(mergeEntry.getValue(), mergeEntry.getKey()); |
| } |
| } |
| |
| @Override |
| public boolean isCompatible(WindowFn<?, ?> other) { |
| return other instanceof CustomWindowFn; |
| } |
| |
| @Override |
| public Coder<CustomWindow> windowCoder() { |
| return CustomWindowCoder.of(); |
| } |
| |
| @Override |
| public WindowMappingFn<CustomWindow> getDefaultWindowMappingFn() { |
| throw new UnsupportedOperationException("side inputs not supported"); |
| } |
| } |
| } |