| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.core; |
| |
| import static org.apache.beam.runners.core.WindowMatchers.isSingleWindowedValue; |
| import static org.apache.beam.runners.core.WindowMatchers.isWindowedValue; |
| import static org.hamcrest.Matchers.anyOf; |
| import static org.hamcrest.Matchers.contains; |
| import static org.hamcrest.Matchers.containsInAnyOrder; |
| import static org.hamcrest.Matchers.emptyIterable; |
| import static org.hamcrest.Matchers.equalTo; |
| import static org.hamcrest.Matchers.greaterThanOrEqualTo; |
| import static org.hamcrest.Matchers.lessThan; |
| import static org.hamcrest.Matchers.nullValue; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertThat; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.Matchers.any; |
| import static org.mockito.Mockito.doAnswer; |
| import static org.mockito.Mockito.doNothing; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.when; |
| import static org.mockito.Mockito.withSettings; |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Random; |
| import java.util.concurrent.ThreadLocalRandom; |
| import org.apache.beam.runners.core.metrics.MetricsContainerImpl; |
| import org.apache.beam.runners.core.triggers.DefaultTriggerStateMachine; |
| import org.apache.beam.runners.core.triggers.TriggerStateMachine; |
| import org.apache.beam.sdk.coders.VarIntCoder; |
| import org.apache.beam.sdk.metrics.MetricName; |
| import org.apache.beam.sdk.metrics.MetricsEnvironment; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; |
| import org.apache.beam.sdk.state.TimeDomain; |
| import org.apache.beam.sdk.transforms.CombineWithContext.CombineFnWithContext; |
| import org.apache.beam.sdk.transforms.CombineWithContext.Context; |
| import org.apache.beam.sdk.transforms.Sum; |
| import org.apache.beam.sdk.transforms.windowing.AfterEach; |
| import org.apache.beam.sdk.transforms.windowing.AfterFirst; |
| import org.apache.beam.sdk.transforms.windowing.AfterPane; |
| import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; |
| import org.apache.beam.sdk.transforms.windowing.AfterWatermark; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; |
| import org.apache.beam.sdk.transforms.windowing.FixedWindows; |
| import org.apache.beam.sdk.transforms.windowing.GlobalWindow; |
| import org.apache.beam.sdk.transforms.windowing.GlobalWindows; |
| import org.apache.beam.sdk.transforms.windowing.IntervalWindow; |
| import org.apache.beam.sdk.transforms.windowing.Never; |
| import org.apache.beam.sdk.transforms.windowing.PaneInfo; |
| import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; |
| import org.apache.beam.sdk.transforms.windowing.Repeatedly; |
| import org.apache.beam.sdk.transforms.windowing.Sessions; |
| import org.apache.beam.sdk.transforms.windowing.SlidingWindows; |
| import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; |
| import org.apache.beam.sdk.transforms.windowing.Trigger; |
| import org.apache.beam.sdk.transforms.windowing.Window; |
| import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; |
| import org.apache.beam.sdk.transforms.windowing.WindowFn; |
| import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; |
| import org.apache.beam.sdk.util.WindowedValue; |
| import org.apache.beam.sdk.values.PCollectionView; |
| import org.apache.beam.sdk.values.TimestampedValue; |
| import org.apache.beam.sdk.values.WindowingStrategy; |
| import org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables; |
| import org.joda.time.Duration; |
| import org.joda.time.Instant; |
| import org.junit.Before; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| import org.mockito.Mock; |
| import org.mockito.Mockito; |
| import org.mockito.MockitoAnnotations; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Tests for {@link ReduceFnRunner}. These tests instantiate a full "stack" of {@link |
| * ReduceFnRunner} with enclosed {@link ReduceFn}, down to the installed {@link Trigger} (sometimes |
| * mocked). They proceed by injecting elements and advancing watermark and processing time, then |
| * verifying produced panes and counters. |
| */ |
| @RunWith(JUnit4.class) |
| public class ReduceFnRunnerTest { |
| // Logger for this test class. |
| private static final Logger LOG = LoggerFactory.getLogger(ReduceFnRunnerTest.class); |
| |
| // Annotated mock; MockitoAnnotations.initMocks(this) is called in setUp(). |
| @Mock private SideInputReader mockSideInputReader; |
| // Serializable mock trigger, assigned in setUp(). |
| private TriggerStateMachine mockTriggerStateMachine; |
| // Serializable mock side input view, assigned in setUp(). |
| private PCollectionView<Integer> mockView; |
| |
| // Interval window [0, 10), assigned in setUp(). |
| private IntervalWindow firstWindow; |
| |
| /** Returns a Mockito "any" argument matcher typed as a trigger context. */ |
| private static TriggerStateMachine.TriggerContext anyTriggerContext() { |
| return Mockito.<TriggerStateMachine.TriggerContext>any(); |
| } |
| |
| /** Returns a Mockito "any" argument matcher typed as an on-element context. */ |
| private static TriggerStateMachine.OnElementContext anyElementContext() { |
| return Mockito.<TriggerStateMachine.OnElementContext>any(); |
| } |
| |
| /** Initializes the annotated mocks, then builds the hand-made mocks and the shared window. */ |
| @Before |
| public void setUp() { |
| MockitoAnnotations.initMocks(this); |
| |
| // Window [0, 10) referenced by the finished-set assertions. |
| firstWindow = new IntervalWindow(new Instant(0), new Instant(10)); |
| |
| mockTriggerStateMachine = mock(TriggerStateMachine.class, withSettings().serializable()); |
| |
| // mock() is given the raw PCollectionView class, hence the unchecked assignment. |
| @SuppressWarnings("unchecked") |
| PCollectionView<Integer> rawViewMock = |
| mock(PCollectionView.class, withSettings().serializable()); |
| mockView = rawViewMock; |
| } |
| |
| /** |
| * Injects a single element into {@code tester}, timestamped at {@code new Instant(element)}, |
| * after stubbing the mock trigger's {@code onElement} to do nothing. |
| */ |
| private void injectElement(ReduceFnTester<Integer, ?, IntervalWindow> tester, int element) |
| throws Exception { |
| doNothing().when(mockTriggerStateMachine).onElement(anyElementContext()); |
| Instant timestamp = new Instant(element); |
| TimestampedValue<Integer> timestamped = TimestampedValue.of(element, timestamp); |
| tester.injectElements(timestamped); |
| } |
| |
| /** |
| * Injects all {@code values} into {@code tester} in one call, each timestamped at {@code new |
| * Instant(value)}, after stubbing the mock trigger's {@code onElement} to do nothing. |
| */ |
| private void injectElements( |
| ReduceFnTester<Integer, ?, IntervalWindow> tester, Iterable<Integer> values) |
| throws Exception { |
| doNothing().when(mockTriggerStateMachine).onElement(anyElementContext()); |
| List<TimestampedValue<Integer>> stamped = new ArrayList<>(); |
| values.forEach( |
| boxed -> { |
| // Unbox first so the Instant(long) constructor is selected. |
| int value = boxed; |
| stamped.add(TimestampedValue.of(value, new Instant(value))); |
| }); |
| tester.injectElements(stamped); |
| } |
| |
| /** Varargs convenience overload that forwards to the {@code Iterable} variant. */ |
| private void injectElements(ReduceFnTester<Integer, ?, IntervalWindow> tester, Integer... values) |
| throws Exception { |
| List<Integer> valueList = Arrays.asList(values); |
| injectElements(tester, valueList); |
| } |
| |
| /** |
| * Stubs {@code mockTrigger.onFire} so that each invocation marks the trigger in the supplied |
| * context as finished. |
| */ |
| private void triggerShouldFinish(TriggerStateMachine mockTrigger) throws Exception { |
| doAnswer( |
| invocation -> { |
| // The first (only) argument of onFire is the trigger context. |
| Object firstArgument = invocation.getArguments()[0]; |
| ((TriggerStateMachine.TriggerContext) firstArgument).trigger().setFinished(true); |
| return null; |
| }) |
| .when(mockTrigger) |
| .onFire(anyTriggerContext()); |
| } |
| |
| /** |
| * Tests that a processing time timer does not cause window GC: after the timer fires, state for |
| * the window is still present and a single non-final EARLY pane has been emitted. |
| */ |
| @Test |
| public void testProcessingTimeTimerDoesNotGc() throws Exception { |
| Trigger processingTimeTrigger = |
| Repeatedly.forever( |
| AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))); |
| WindowingStrategy<?, IntervalWindow> windowingStrategy = |
| WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100))) |
| .withTimestampCombiner(TimestampCombiner.EARLIEST) |
| .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) |
| .withAllowedLateness(Duration.ZERO) |
| .withTrigger(processingTimeTrigger); |
| |
| ReduceFnTester<Integer, Integer, IntervalWindow> tester = |
| ReduceFnTester.combining(windowingStrategy, Sum.ofIntegers(), VarIntCoder.of()); |
| |
| tester.advanceProcessingTime(new Instant(5000)); |
| // processing timer @ 5000 + 10; EOW timer @ 100 |
| injectElement(tester, 2); |
| injectElement(tester, 5); |
| |
| tester.advanceProcessingTime(new Instant(10000)); |
| |
| IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); |
| tester.assertHasOnlyGlobalAndStateFor(window); |
| |
| PaneInfo expectedPane = PaneInfo.createPane(true, false, Timing.EARLY, 0, 0); |
| List<WindowedValue<Integer>> output = tester.extractOutput(); |
| assertThat(output, contains(isSingleWindowedValue(equalTo(7), 2, 0, 100, expectedPane))); |
| } |
| |
| /** |
| * Drives a mocked trigger over a buffering (non-combining) tester in discarding mode: each fired |
| * pane holds only the elements injected since the previous firing, and an element injected after |
| * the trigger has finished is counted as dropped. |
| */ |
| @Test |
| public void testOnElementBufferingDiscarding() throws Exception { |
| MetricsContainerImpl metrics = new MetricsContainerImpl("any"); |
| MetricsEnvironment.setCurrentContainer(metrics); |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| FixedWindows.of(Duration.millis(10)), |
| mockTriggerStateMachine, |
| AccumulationMode.DISCARDING_FIRED_PANES, |
| Duration.millis(100), |
| ClosingBehavior.FIRE_IF_NON_EMPTY); |
| |
| // First firing: pane of {1, 2}. |
| injectElement(tester, 1); |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| injectElement(tester, 2); |
| List<WindowedValue<Iterable<Integer>>> firstPane = tester.extractOutput(); |
| assertThat(firstPane, contains(isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10))); |
| |
| // Second firing: pane of just 3, after which the trigger finishes. |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| triggerShouldFinish(mockTriggerStateMachine); |
| injectElement(tester, 3); |
| List<WindowedValue<Iterable<Integer>>> secondPane = tester.extractOutput(); |
| assertThat(secondPane, contains(isSingleWindowedValue(containsInAnyOrder(3), 3, 0, 10))); |
| assertTrue(tester.isMarkedFinished(firstWindow)); |
| tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow); |
| |
| // This element shouldn't be seen, because the trigger has finished. |
| injectElement(tester, 4); |
| |
| MetricName droppedCounterName = |
| MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW); |
| long droppedElements = metrics.getCounter(droppedCounterName).getCumulative(); |
| assertEquals(1, droppedElements); |
| } |
| |
| /** |
| * Drives a mocked trigger over a buffering (non-combining) tester in accumulating mode: the |
| * second pane repeats the elements of the first, and an element injected after the trigger has |
| * finished produces no further output. |
| */ |
| @Test |
| public void testOnElementBufferingAccumulating() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| FixedWindows.of(Duration.millis(10)), |
| mockTriggerStateMachine, |
| AccumulationMode.ACCUMULATING_FIRED_PANES, |
| Duration.millis(100), |
| ClosingBehavior.FIRE_IF_NON_EMPTY); |
| |
| injectElement(tester, 1); |
| |
| // First firing emits {1, 2}. |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| injectElement(tester, 2); |
| |
| // Second firing emits {1, 2, 3} because we are in accumulating mode; the trigger then finishes. |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| triggerShouldFinish(mockTriggerStateMachine); |
| injectElement(tester, 3); |
| |
| // This element shouldn't be seen, because the trigger has finished. |
| injectElement(tester, 4); |
| |
| List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); |
| assertThat( |
| output, |
| contains( |
| isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10), |
| isSingleWindowedValue(containsInAnyOrder(1, 2, 3), 3, 0, 10))); |
| assertTrue(tester.isMarkedFinished(firstWindow)); |
| tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow); |
| } |
| |
| /** |
| * Checks that nothing crashes when one watermark update carries a session window past both its |
| * end-of-window and its expiration time, and that exactly one ON_TIME pane is produced. |
| */ |
| @Test |
| public void testSessionEowAndGcTogether() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| Sessions.withGapDuration(Duration.millis(10)), |
| DefaultTriggerStateMachine.of(), |
| AccumulationMode.ACCUMULATING_FIRED_PANES, |
| Duration.millis(50), |
| ClosingBehavior.FIRE_ALWAYS); |
| |
| tester.setAutoAdvanceOutputWatermark(true); |
| |
| tester.advanceInputWatermark(new Instant(0)); |
| injectElement(tester, 1); |
| // A single jump from 0 to 100, well beyond the expected window [1, 11) plus 50ms lateness. |
| tester.advanceInputWatermark(new Instant(100)); |
| |
| PaneInfo expectedPane = PaneInfo.createPane(true, true, Timing.ON_TIME); |
| List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); |
| assertThat(output, contains(isSingleWindowedValue(contains(1), 1, 1, 11, expectedPane))); |
| } |
| |
| /** |
| * Checks that nothing crashes when one watermark update carries a fixed window past both its |
| * end-of-window and its expiration time (closing behavior FIRE_ALWAYS), and that exactly one |
| * ON_TIME pane is produced. |
| */ |
| @Test |
| public void testFixedWindowsEowAndGcTogether() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| FixedWindows.of(Duration.millis(10)), |
| DefaultTriggerStateMachine.of(), |
| AccumulationMode.ACCUMULATING_FIRED_PANES, |
| Duration.millis(50), |
| ClosingBehavior.FIRE_ALWAYS); |
| |
| tester.setAutoAdvanceOutputWatermark(true); |
| |
| tester.advanceInputWatermark(new Instant(0)); |
| injectElement(tester, 1); |
| // A single jump from 0 to 100, well beyond the window [0, 10) plus 50ms lateness. |
| tester.advanceInputWatermark(new Instant(100)); |
| |
| PaneInfo expectedPane = PaneInfo.createPane(true, true, Timing.ON_TIME); |
| List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); |
| assertThat(output, contains(isSingleWindowedValue(contains(1), 1, 0, 10, expectedPane))); |
| } |
| |
| /** |
| * Same scenario as the FIRE_ALWAYS variant, but with closing behavior FIRE_IF_NON_EMPTY: one |
| * watermark update passes both end-of-window and expiration, and exactly one ON_TIME pane is |
| * expected. |
| */ |
| @Test |
| public void testFixedWindowsEowAndGcTogetherFireIfNonEmpty() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| FixedWindows.of(Duration.millis(10)), |
| DefaultTriggerStateMachine.of(), |
| AccumulationMode.ACCUMULATING_FIRED_PANES, |
| Duration.millis(50), |
| ClosingBehavior.FIRE_IF_NON_EMPTY); |
| |
| tester.setAutoAdvanceOutputWatermark(true); |
| |
| tester.advanceInputWatermark(new Instant(0)); |
| injectElement(tester, 1); |
| tester.advanceInputWatermark(new Instant(100)); |
| |
| PaneInfo expectedPane = PaneInfo.createPane(true, true, Timing.ON_TIME); |
| assertThat( |
| tester.extractOutput(), |
| contains(isSingleWindowedValue(contains(1), 1, 0, 10, expectedPane))); |
| } |
| |
| /** |
| * Tests that with the default trigger we will not produce two ON_TIME panes, even if there are |
| * two outputs that are both candidates: the second pane is expected to be labeled LATE. |
| */ |
| @Test |
| public void testOnlyOneOnTimePane() throws Exception { |
| WindowingStrategy<?, IntervalWindow> windowingStrategy = |
| WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) |
| .withTrigger(DefaultTrigger.of()) |
| .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) |
| .withAllowedLateness(Duration.millis(100)); |
| |
| ReduceFnTester<Integer, Integer, IntervalWindow> tester = |
| ReduceFnTester.combining(windowingStrategy, Sum.ofIntegers(), VarIntCoder.of()); |
| |
| tester.advanceInputWatermark(new Instant(0)); |
| |
| int onTimeValue = 1; |
| int lateValue = 3; |
| |
| // A single element that should be in the ON_TIME output. |
| tester.injectElements(TimestampedValue.of(onTimeValue, new Instant(1))); |
| |
| // Should fire ON_TIME. |
| tester.advanceInputWatermark(new Instant(10)); |
| |
| // The DefaultTrigger should cause output labeled LATE, even though it does not have to be |
| // labeled as such. |
| tester.injectElements(TimestampedValue.of(lateValue, new Instant(3))); |
| |
| List<WindowedValue<Integer>> output = tester.extractOutput(); |
| assertEquals(2, output.size()); |
| WindowedValue<Integer> firstPane = output.get(0); |
| WindowedValue<Integer> secondPane = output.get(1); |
| |
| // Accumulating mode: the second pane carries the sum of both values. |
| assertThat(firstPane, isWindowedValue(equalTo(onTimeValue))); |
| assertThat(secondPane, isWindowedValue(equalTo(onTimeValue + lateValue))); |
| |
| PaneInfo expectedOnTimePane = PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0); |
| PaneInfo expectedLatePane = PaneInfo.createPane(false, false, Timing.LATE, 1, 1); |
| assertThat(firstPane, WindowMatchers.valueWithPaneInfo(expectedOnTimePane)); |
| assertThat(secondPane, WindowMatchers.valueWithPaneInfo(expectedLatePane)); |
| } |
| |
| /** |
| * Drives a mocked trigger over a combining (integer sum) tester in discarding mode: the second |
| * pane holds only the value injected after the first firing, and an element injected after the |
| * trigger has finished produces no further output. |
| */ |
| @Test |
| public void testOnElementCombiningDiscarding() throws Exception { |
| WindowingStrategy<?, IntervalWindow> windowingStrategy = |
| WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) |
| .withTimestampCombiner(TimestampCombiner.EARLIEST) |
| .withMode(AccumulationMode.DISCARDING_FIRED_PANES) |
| .withAllowedLateness(Duration.millis(100)); |
| |
| ReduceFnTester<Integer, Integer, IntervalWindow> tester = |
| ReduceFnTester.combining( |
| windowingStrategy, mockTriggerStateMachine, Sum.ofIntegers(), VarIntCoder.of()); |
| |
| injectElement(tester, 2); |
| |
| // First firing: 2 + 3. |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| injectElement(tester, 3); |
| |
| // Second firing: just 4, after which the trigger finishes. |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| triggerShouldFinish(mockTriggerStateMachine); |
| injectElement(tester, 4); |
| |
| // This element shouldn't be seen, because the trigger has finished. |
| injectElement(tester, 6); |
| |
| List<WindowedValue<Integer>> output = tester.extractOutput(); |
| assertThat( |
| output, |
| contains( |
| isSingleWindowedValue(equalTo(5), 2, 0, 10), |
| isSingleWindowedValue(equalTo(4), 4, 0, 10))); |
| assertTrue(tester.isMarkedFinished(firstWindow)); |
| tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow); |
| } |
| |
| /** |
| * Tests that when a processing time timer comes in after a window is expired it is just ignored: |
| * no output is expected. |
| */ |
| @Test |
| public void testLateProcessingTimeTimer() throws Exception { |
| Trigger processingTimeTrigger = |
| Repeatedly.forever( |
| AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))); |
| WindowingStrategy<?, IntervalWindow> windowingStrategy = |
| WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100))) |
| .withTimestampCombiner(TimestampCombiner.EARLIEST) |
| .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) |
| .withAllowedLateness(Duration.ZERO) |
| .withTrigger(processingTimeTrigger); |
| |
| ReduceFnTester<Integer, Integer, IntervalWindow> tester = |
| ReduceFnTester.combining(windowingStrategy, Sum.ofIntegers(), VarIntCoder.of()); |
| |
| tester.advanceProcessingTime(new Instant(5000)); |
| // processing timer @ 5000 + 10; EOW timer @ 100 |
| injectElement(tester, 2); |
| injectElement(tester, 5); |
| |
| // After this advancement, the window is expired and only the GC process |
| // should be allowed to touch it. |
| tester.advanceInputWatermarkNoTimers(new Instant(100)); |
| |
| // This should not output. |
| tester.advanceProcessingTime(new Instant(6000)); |
| |
| List<WindowedValue<Integer>> output = tester.extractOutput(); |
| assertThat(output, emptyIterable()); |
| } |
| |
| /** |
| * Tests that when a processing time timer comes in after a window is expired but in the same |
| * bundle it does not cause a spurious output: only one ON_TIME pane is expected. |
| */ |
| @Test |
| public void testCombiningAccumulatingProcessingTime() throws Exception { |
| Trigger processingTimeTrigger = |
| Repeatedly.forever( |
| AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))); |
| WindowingStrategy<?, IntervalWindow> windowingStrategy = |
| WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100))) |
| .withTimestampCombiner(TimestampCombiner.EARLIEST) |
| .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) |
| .withAllowedLateness(Duration.ZERO) |
| .withTrigger(processingTimeTrigger); |
| |
| ReduceFnTester<Integer, Integer, IntervalWindow> tester = |
| ReduceFnTester.combining(windowingStrategy, Sum.ofIntegers(), VarIntCoder.of()); |
| |
| tester.advanceProcessingTime(new Instant(5000)); |
| // processing timer @ 5000 + 10; EOW timer @ 100 |
| injectElement(tester, 2); |
| injectElement(tester, 5); |
| |
| // Move both clocks forward without firing anything yet. |
| tester.advanceInputWatermarkNoTimers(new Instant(100)); |
| tester.advanceProcessingTimeNoTimers(new Instant(5010)); |
| |
| // Fires the GC/EOW timer at the same time as the processing time timer. |
| IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(100)); |
| TimestampedValue<TimeDomain> eventTimeTimer = |
| TimestampedValue.of(TimeDomain.EVENT_TIME, new Instant(100)); |
| TimestampedValue<TimeDomain> processingTimeTimer = |
| TimestampedValue.of(TimeDomain.PROCESSING_TIME, new Instant(5010)); |
| tester.fireTimers(window, eventTimeTimer, processingTimeTimer); |
| |
| PaneInfo expectedPane = PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0); |
| List<WindowedValue<Integer>> output = tester.extractOutput(); |
| assertThat(output, contains(isSingleWindowedValue(equalTo(7), 2, 0, 100, expectedPane))); |
| } |
| |
| /** |
| * Tests that the garbage collection time for a fixed window does not overflow the end of time. |
| */ |
| @Test |
| public void testFixedWindowEndOfTimeGarbageCollection() throws Exception { |
| Duration allowedLateness = Duration.standardDays(365); |
| Duration windowSize = Duration.millis(10); |
| WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(windowSize); |
| |
| // This timestamp falls into a window where the end of the window is before the end of the |
| // global window - the "end of time" - yet its expiration time is after. |
| final Instant elementTimestamp = |
| GlobalWindow.INSTANCE.maxTimestamp().minus(allowedLateness).plus(1); |
| |
| IntervalWindow window = |
| Iterables.getOnlyElement( |
| windowFn.assignWindows( |
| windowFn.new AssignContext() { |
| @Override |
| public Object element() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public Instant timestamp() { |
| return elementTimestamp; |
| } |
| |
| @Override |
| public BoundedWindow window() { |
| throw new UnsupportedOperationException(); |
| } |
| })); |
| |
| assertTrue(window.maxTimestamp().isBefore(GlobalWindow.INSTANCE.maxTimestamp())); |
| assertTrue( |
| window.maxTimestamp().plus(allowedLateness).isAfter(GlobalWindow.INSTANCE.maxTimestamp())); |
| |
| // Test basic execution of a trigger using a non-combining window set and accumulating mode. |
| |
| WindowingStrategy<?, IntervalWindow> strategy = |
| WindowingStrategy.of((WindowFn<?, IntervalWindow>) windowFn) |
| .withTimestampCombiner(TimestampCombiner.EARLIEST) |
| .withTrigger(AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever())) |
| .withMode(AccumulationMode.DISCARDING_FIRED_PANES) |
| .withAllowedLateness(allowedLateness); |
| |
| ReduceFnTester<Integer, Integer, IntervalWindow> tester = |
| ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); |
| |
| tester.injectElements(TimestampedValue.of(13, elementTimestamp)); |
| |
| // Should fire ON_TIME pane and there will be a checkState that the cleanup time |
| // is prior to timestamp max value |
| tester.advanceInputWatermark(window.maxTimestamp()); |
| |
| // Nothing in the ON_TIME pane (not governed by triggers, but by ReduceFnRunner) |
| assertThat(tester.extractOutput(), emptyIterable()); |
| |
| tester.injectElements(TimestampedValue.of(42, elementTimestamp)); |
| |
| // Now the final pane should fire, demonstrating that the GC time was truncated |
| tester.advanceInputWatermark(GlobalWindow.INSTANCE.maxTimestamp()); |
| assertThat(tester.extractOutput(), contains(isWindowedValue(equalTo(55)))); |
| } |
| |
| /** |
| * Tests that when a processing time timer comes in after a window is expired and GC'd it does |
| * not cause a spurious output: only one ON_TIME pane is expected. |
| */ |
| @Test |
| public void testCombiningAccumulatingProcessingTimeSeparateBundles() throws Exception { |
| Trigger processingTimeTrigger = |
| Repeatedly.forever( |
| AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(10))); |
| WindowingStrategy<?, IntervalWindow> windowingStrategy = |
| WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100))) |
| .withTimestampCombiner(TimestampCombiner.EARLIEST) |
| .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) |
| .withAllowedLateness(Duration.ZERO) |
| .withTrigger(processingTimeTrigger); |
| |
| ReduceFnTester<Integer, Integer, IntervalWindow> tester = |
| ReduceFnTester.combining(windowingStrategy, Sum.ofIntegers(), VarIntCoder.of()); |
| |
| tester.advanceProcessingTime(new Instant(5000)); |
| // processing timer @ 5000 + 10; EOW timer @ 100 |
| injectElement(tester, 2); |
| injectElement(tester, 5); |
| |
| // The watermark advance and the processing time advance are two separate steps here. |
| tester.advanceInputWatermark(new Instant(100)); |
| tester.advanceProcessingTime(new Instant(5011)); |
| |
| PaneInfo expectedPane = PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0); |
| List<WindowedValue<Integer>> output = tester.extractOutput(); |
| assertThat(output, contains(isSingleWindowedValue(equalTo(7), 2, 0, 100, expectedPane))); |
| } |
| |
| /** |
| * Tests that if end-of-window and GC timers come in together, that the pane is correctly marked |
| * as final. |
| */ |
| @Test |
| public void testCombiningAccumulatingEventTime() throws Exception { |
| Trigger endOfWindowTrigger = Repeatedly.forever(AfterWatermark.pastEndOfWindow()); |
| WindowingStrategy<?, IntervalWindow> windowingStrategy = |
| WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(100))) |
| .withTimestampCombiner(TimestampCombiner.EARLIEST) |
| .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) |
| .withAllowedLateness(Duration.millis(1)) |
| .withTrigger(endOfWindowTrigger); |
| |
| ReduceFnTester<Integer, Integer, IntervalWindow> tester = |
| ReduceFnTester.combining(windowingStrategy, Sum.ofIntegers(), VarIntCoder.of()); |
| |
| // Both elements are expected in window [0, 100). |
| injectElement(tester, 2); |
| injectElement(tester, 5); |
| |
| // One advance to 1000, far beyond the window end plus the 1ms allowed lateness. |
| tester.advanceInputWatermark(new Instant(1000)); |
| |
| PaneInfo expectedPane = PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0); |
| List<WindowedValue<Integer>> output = tester.extractOutput(); |
| assertThat(output, contains(isSingleWindowedValue(equalTo(7), 2, 0, 100, expectedPane))); |
| } |
| |
| /** |
| * Drives a mocked trigger over a combining (integer sum) tester in accumulating mode: the second |
| * pane carries the running sum of all elements so far, and an element injected after the trigger |
| * has finished produces no further output. |
| */ |
| @Test |
| public void testOnElementCombiningAccumulating() throws Exception { |
| WindowingStrategy<?, IntervalWindow> windowingStrategy = |
| WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) |
| .withTimestampCombiner(TimestampCombiner.EARLIEST) |
| .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) |
| .withAllowedLateness(Duration.millis(100)); |
| |
| ReduceFnTester<Integer, Integer, IntervalWindow> tester = |
| ReduceFnTester.combining( |
| windowingStrategy, mockTriggerStateMachine, Sum.ofIntegers(), VarIntCoder.of()); |
| |
| injectElement(tester, 1); |
| |
| // First firing: 1 + 2. |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| injectElement(tester, 2); |
| |
| // Second firing: 1 + 2 + 3, after which the trigger finishes. |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| triggerShouldFinish(mockTriggerStateMachine); |
| injectElement(tester, 3); |
| |
| // This element shouldn't be seen, because the trigger has finished. |
| injectElement(tester, 4); |
| |
| List<WindowedValue<Integer>> output = tester.extractOutput(); |
| assertThat( |
| output, |
| contains( |
| isSingleWindowedValue(equalTo(3), 1, 0, 10), |
| isSingleWindowedValue(equalTo(6), 3, 0, 10))); |
| assertTrue(tester.isMarkedFinished(firstWindow)); |
| tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow); |
| } |
| |
| @Test |
| public void testOnElementCombiningWithContext() throws Exception { |
| // Create values at timestamps 0 .. 8, windowed into fixed windows of 2. |
| // Side input windowed into fixed windows of 4: |
| // main: [ 0 1 ] [ 2 3 ] [ 4 5 ] [ 6 7 ] |
| // side: [ 100 ] [ 104 ] |
| // Combine using a CombineFn "side input + sum(main inputs)". |
| final int firstWindowSideInput = 100; |
| final int secondWindowSideInput = 104; |
| final Integer expectedValue = firstWindowSideInput; |
| WindowingStrategy<?, IntervalWindow> mainInputWindowingStrategy = |
| WindowingStrategy.of(FixedWindows.of(Duration.millis(2))) |
| .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES); |
| |
| WindowMappingFn<?> sideInputWindowMappingFn = |
| FixedWindows.of(Duration.millis(4)).getDefaultWindowMappingFn(); |
| when(mockView.getWindowMappingFn()).thenReturn((WindowMappingFn) sideInputWindowMappingFn); |
| |
| TestOptions options = PipelineOptionsFactory.as(TestOptions.class); |
| options.setValue(expectedValue); |
| |
| when(mockSideInputReader.contains(any(PCollectionView.class))).thenReturn(true); |
| when(mockSideInputReader.get(any(PCollectionView.class), any(BoundedWindow.class))) |
| .then( |
| invocation -> { |
| IntervalWindow sideInputWindow = (IntervalWindow) invocation.getArguments()[1]; |
| long startMs = sideInputWindow.start().getMillis(); |
| long endMs = sideInputWindow.end().getMillis(); |
| // Window should have been produced by sideInputWindowingStrategy. |
| assertThat(startMs, anyOf(equalTo(0L), equalTo(4L))); |
| assertThat(endMs - startMs, equalTo(4L)); |
| // If startMs == 4 (second window), equal to secondWindowSideInput. |
| return firstWindowSideInput + (int) startMs; |
| }); |
| |
| SumAndVerifyContextFn combineFn = new SumAndVerifyContextFn(mockView, expectedValue); |
| ReduceFnTester<Integer, Integer, IntervalWindow> tester = |
| ReduceFnTester.combining( |
| mainInputWindowingStrategy, |
| mockTriggerStateMachine, |
| combineFn, |
| VarIntCoder.of(), |
| options, |
| mockSideInputReader); |
| |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| for (int i = 0; i < 8; ++i) { |
| injectElement(tester, i); |
| } |
| |
| assertThat( |
| tester.extractOutput(), |
| contains( |
| isSingleWindowedValue(equalTo(0 + firstWindowSideInput), 1, 0, 2), |
| isSingleWindowedValue(equalTo(0 + 1 + firstWindowSideInput), 1, 0, 2), |
| isSingleWindowedValue(equalTo(2 + firstWindowSideInput), 3, 2, 4), |
| isSingleWindowedValue(equalTo(2 + 3 + firstWindowSideInput), 3, 2, 4), |
| isSingleWindowedValue(equalTo(4 + secondWindowSideInput), 5, 4, 6), |
| isSingleWindowedValue(equalTo(4 + 5 + secondWindowSideInput), 5, 4, 6), |
| isSingleWindowedValue(equalTo(6 + secondWindowSideInput), 7, 6, 8), |
| isSingleWindowedValue(equalTo(6 + 7 + secondWindowSideInput), 7, 6, 8))); |
| } |
| |
| @Test |
| public void testWatermarkHoldAndLateData() throws Exception { |
| MetricsContainerImpl container = new MetricsContainerImpl("any"); |
| MetricsEnvironment.setCurrentContainer(container); |
| // Test handling of late data. Specifically, ensure the watermark hold is correct. |
| Duration allowedLateness = Duration.millis(10); |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| FixedWindows.of(Duration.millis(10)), |
| mockTriggerStateMachine, |
| AccumulationMode.ACCUMULATING_FIRED_PANES, |
| allowedLateness, |
| ClosingBehavior.FIRE_IF_NON_EMPTY); |
| |
| // Input watermark -> null |
| assertEquals(null, tester.getWatermarkHold()); |
| assertEquals(null, tester.getOutputWatermark()); |
| |
| // All on time data, verify watermark hold. |
| IntervalWindow expectedWindow = new IntervalWindow(new Instant(0), new Instant(10)); |
| injectElement(tester, 1); |
| injectElement(tester, 3); |
| assertEquals(new Instant(1), tester.getWatermarkHold()); |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| injectElement(tester, 2); |
| List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); |
| assertThat( |
| output, |
| contains( |
| isSingleWindowedValue( |
| containsInAnyOrder(1, 2, 3), |
| equalTo(new Instant(1)), |
| equalTo((BoundedWindow) expectedWindow)))); |
| assertThat( |
| output.get(0).getPane(), equalTo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))); |
| |
| // There is no end-of-window hold, but the timer set by the trigger holds the watermark |
| assertThat(tester.getWatermarkHold(), nullValue()); |
| |
| // Nothing dropped. |
| long droppedElements = |
| container |
| .getCounter( |
| MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) |
| .getCumulative(); |
| assertEquals(0, droppedElements); |
| |
| // Input watermark -> 4, output watermark should advance that far as well |
| tester.advanceInputWatermark(new Instant(4)); |
| assertEquals(new Instant(4), tester.getOutputWatermark()); |
| |
| // Some late, some on time. Verify that we only hold to the minimum of on-time. |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); |
| tester.advanceInputWatermark(new Instant(4)); |
| injectElement(tester, 2); |
| injectElement(tester, 3); |
| |
| // Late data has arrived behind the _output_ watermark. The ReduceFnRunner sets a GC hold |
| // since this data is not permitted to hold up the output watermark. |
| assertThat( |
| tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp().plus(allowedLateness))); |
| |
| // Now data just ahead of the output watermark arrives and sets an earlier "element" hold |
| injectElement(tester, 5); |
| assertEquals(new Instant(5), tester.getWatermarkHold()); |
| |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| injectElement(tester, 4); |
| output = tester.extractOutput(); |
| assertThat( |
| output, |
| contains( |
| isSingleWindowedValue( |
| containsInAnyOrder( |
| 1, 2, 3, // earlier firing |
| 2, 3, 4, 5), // new elements |
| 4, // timestamp |
| 0, // window start |
| 10))); // window end |
| assertThat( |
| output.get(0).getPane(), equalTo(PaneInfo.createPane(false, false, Timing.EARLY, 1, -1))); |
| |
| // Since the element hold is cleared, there is no hold remaining |
| assertThat(tester.getWatermarkHold(), nullValue()); |
| |
| // All behind the output watermark -- hold is at GC time (if we imagine the |
| // trigger sets a timer for ON_TIME firing, that is actually when they'll be emitted) |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); |
| tester.advanceInputWatermark(new Instant(8)); |
| injectElement(tester, 6); |
| injectElement(tester, 5); |
| assertThat( |
| tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp().plus(allowedLateness))); |
| |
| injectElement(tester, 4); |
| |
| // Fire the ON_TIME pane |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| |
| // To get an ON_TIME pane, we need the output watermark to be held back a little; this would |
| // be done by way of the timers set by the trigger, which are mocked here |
| tester.setAutoAdvanceOutputWatermark(false); |
| |
| tester.advanceInputWatermark(expectedWindow.maxTimestamp().plus(1)); |
| tester.fireTimer(expectedWindow, expectedWindow.maxTimestamp(), TimeDomain.EVENT_TIME); |
| |
| // Output time is end of the window, because all the new data was late, but the pane |
| // is the ON_TIME pane. |
| output = tester.extractOutput(); |
| assertThat( |
| output, |
| contains( |
| isSingleWindowedValue( |
| containsInAnyOrder( |
| 1, 2, 3, // earlier firing |
| 2, 3, 4, 5, // earlier firing |
| 4, 5, 6), // new elements |
| 9, // timestamp |
| 0, // window start |
| 10))); // window end |
| assertThat( |
| output.get(0).getPane(), equalTo(PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0))); |
| |
| tester.setAutoAdvanceOutputWatermark(true); |
| |
| // This is "pending" at the time the watermark makes it way-late. |
| // Because we're about to expire the window, we output it. |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); |
| injectElement(tester, 8); |
| droppedElements = |
| container |
| .getCounter( |
| MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) |
| .getCumulative(); |
| assertEquals(0, droppedElements); |
| |
| // Exceed the GC limit, triggering the last pane to be fired |
| tester.advanceInputWatermark(new Instant(50)); |
| output = tester.extractOutput(); |
| // Output time is still end of the window, because the new data (8) was behind |
| // the output watermark. |
| assertThat( |
| output, |
| contains( |
| isSingleWindowedValue( |
| containsInAnyOrder( |
| 1, 2, 3, // earlier firing |
| 2, 3, 4, 5, // earlier firing |
| 4, 5, 6, // earlier firing |
| 8), // new element prior to window becoming expired |
| 9, // timestamp |
| 0, // window start |
| 10))); // window end |
| assertThat( |
| output.get(0).getPane(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 3, 1))); |
| assertEquals(new Instant(50), tester.getOutputWatermark()); |
| assertEquals(null, tester.getWatermarkHold()); |
| |
| // Late timers are ignored |
| tester.fireTimer( |
| new IntervalWindow(new Instant(0), new Instant(10)), |
| new Instant(12), |
| TimeDomain.EVENT_TIME); |
| |
| // And because we're past the end of window + allowed lateness, everything should be cleaned up. |
| assertFalse(tester.isMarkedFinished(firstWindow)); |
| tester.assertHasOnlyGlobalAndFinishedSetsFor(); |
| } |
| |
| @Test |
| public void testWatermarkHoldForLateNewWindow() throws Exception { |
| Duration allowedLateness = Duration.standardMinutes(1); |
| Duration gapDuration = Duration.millis(10); |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| WindowingStrategy.of(Sessions.withGapDuration(gapDuration)) |
| .withMode(AccumulationMode.DISCARDING_FIRED_PANES) |
| .withTrigger( |
| Repeatedly.forever( |
| AfterWatermark.pastEndOfWindow() |
| .withLateFirings(AfterPane.elementCountAtLeast(1)))) |
| .withAllowedLateness(allowedLateness)); |
| tester.setAutoAdvanceOutputWatermark(false); |
| |
| assertEquals(null, tester.getWatermarkHold()); |
| assertEquals(null, tester.getOutputWatermark()); |
| tester.advanceInputWatermark(new Instant(40)); |
| injectElements(tester, 1); |
| assertThat(tester.getWatermarkHold(), nullValue()); |
| injectElements(tester, 10); |
| assertThat(tester.getWatermarkHold(), nullValue()); |
| } |
| |
| @Test |
| public void testMergingWatermarkHoldLateNewWindowMerged() throws Exception { |
| Duration allowedLateness = Duration.standardMinutes(1); |
| Duration gapDuration = Duration.millis(10); |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| WindowingStrategy.of(Sessions.withGapDuration(gapDuration)) |
| .withMode(AccumulationMode.DISCARDING_FIRED_PANES) |
| .withTrigger( |
| Repeatedly.forever( |
| AfterWatermark.pastEndOfWindow() |
| .withLateFirings(AfterPane.elementCountAtLeast(1)))) |
| .withAllowedLateness(allowedLateness)); |
| tester.setAutoAdvanceOutputWatermark(false); |
| |
| assertEquals(null, tester.getWatermarkHold()); |
| assertEquals(null, tester.getOutputWatermark()); |
| tester.advanceInputWatermark(new Instant(24)); |
| injectElements(tester, 1); |
| assertThat(tester.getWatermarkHold(), nullValue()); |
| injectElements(tester, 14); |
| assertThat(tester.getWatermarkHold(), nullValue()); |
| injectElements(tester, 6, 16); |
| // There should now be a watermark hold since the window has extended past the input watermark. |
| // The hold should be for the end of the window (last element + gapDuration - 1). |
| assertEquals(tester.getWatermarkHold(), new Instant(25)); |
| injectElements(tester, 6, 21); |
| // The hold should be extended with the window. |
| assertEquals(tester.getWatermarkHold(), new Instant(30)); |
| // Advancing the watermark should remove the hold. |
| tester.advanceInputWatermark(new Instant(31)); |
| assertThat(tester.getWatermarkHold(), nullValue()); |
| // Late elements added to the window should not generate a hold. |
| injectElements(tester, 0); |
| assertThat(tester.getWatermarkHold(), nullValue()); |
| // Generate a new window that is ontime. |
| injectElements(tester, 32, 40); |
| assertEquals(tester.getWatermarkHold(), new Instant(49)); |
| // Join the closed window with the new window. |
| injectElements(tester, 24); |
| assertEquals(tester.getWatermarkHold(), new Instant(49)); |
| tester.advanceInputWatermark(new Instant(50)); |
| assertThat(tester.getWatermarkHold(), nullValue()); |
| } |
| |
| @Test |
| public void testMergingLateWatermarkHolds() throws Exception { |
| MetricsContainerImpl container = new MetricsContainerImpl("any"); |
| MetricsEnvironment.setCurrentContainer(container); |
| Duration gapDuration = Duration.millis(10); |
| Duration allowedLateness = Duration.standardMinutes(100); |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| WindowingStrategy.of(Sessions.withGapDuration(gapDuration)) |
| .withMode(AccumulationMode.DISCARDING_FIRED_PANES) |
| .withTrigger( |
| Repeatedly.forever( |
| AfterWatermark.pastEndOfWindow() |
| .withLateFirings(AfterPane.elementCountAtLeast(10)))) |
| .withAllowedLateness(allowedLateness)); |
| tester.setAutoAdvanceOutputWatermark(false); |
| |
| // Input watermark -> null |
| assertEquals(null, tester.getWatermarkHold()); |
| assertEquals(null, tester.getOutputWatermark()); |
| |
| tester.advanceInputWatermark(new Instant(20)); |
| // Add two late elements that cause a window to merge. |
| injectElements(tester, Arrays.asList(3)); |
| assertThat(tester.getWatermarkHold(), nullValue()); |
| injectElements(tester, Arrays.asList(4)); |
| Instant endOfWindow = new Instant(4).plus(gapDuration); |
| // We expect a GC hold to be one less than the end of window plus the allowed lateness. |
| Instant expectedGcHold = endOfWindow.plus(allowedLateness).minus(1); |
| assertEquals(expectedGcHold, tester.getWatermarkHold()); |
| tester.advanceInputWatermark(new Instant(1000)); |
| assertEquals(expectedGcHold, tester.getWatermarkHold()); |
| } |
| |
| @Test |
| public void testMergingWatermarkHoldAndLateDataFuzz() throws Exception { |
| MetricsContainerImpl container = new MetricsContainerImpl("any"); |
| MetricsEnvironment.setCurrentContainer(container); |
| // Test handling of late data. Specifically, ensure the watermark hold is correct. |
| Duration allowedLateness = Duration.standardMinutes(100); |
| long seed = ThreadLocalRandom.current().nextLong(); |
| LOG.info("Random seed: {}", seed); |
| Random r = new Random(seed); |
| |
| Duration gapDuration = Duration.millis(10 + r.nextInt(40)); |
| LOG.info("Gap duration {}", gapDuration); |
| |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| WindowingStrategy.of(Sessions.withGapDuration(gapDuration)) |
| .withMode(AccumulationMode.DISCARDING_FIRED_PANES) |
| .withTrigger( |
| Repeatedly.forever( |
| AfterWatermark.pastEndOfWindow() |
| .withLateFirings(AfterPane.elementCountAtLeast(1)))) |
| .withAllowedLateness(allowedLateness)); |
| tester.setAutoAdvanceOutputWatermark(true); |
| |
| // Input watermark -> null |
| assertEquals(null, tester.getWatermarkHold()); |
| assertEquals(null, tester.getOutputWatermark()); |
| |
| // All on time data, verify watermark hold. |
| List<Integer> times = new ArrayList<>(); |
| |
| int numTs = 3 + r.nextInt(100); |
| int maxTs = 1 + r.nextInt(400); |
| LOG.info("Num ts {}", numTs); |
| LOG.info("Max ts {}", maxTs); |
| for (int i = numTs; i >= 0; --i) { |
| times.add(r.nextInt(maxTs)); |
| } |
| LOG.info("Times: {}", times); |
| |
| int split = 0; |
| long watermark = 0; |
| while (split < times.size()) { |
| int nextSplit = split + r.nextInt(times.size()); |
| if (nextSplit > times.size()) { |
| nextSplit = times.size(); |
| } |
| LOG.info("nextSplit {}", nextSplit); |
| injectElements(tester, times.subList(split, nextSplit)); |
| if (r.nextInt(3) == 0) { |
| int nextWatermark = r.nextInt((int) (maxTs + gapDuration.getMillis())); |
| if (nextWatermark > watermark) { |
| Boolean enabled = r.nextBoolean(); |
| LOG.info("nextWatermark {} {}", nextWatermark, enabled); |
| watermark = nextWatermark; |
| tester.setAutoAdvanceOutputWatermark(enabled); |
| tester.advanceInputWatermark(new Instant(watermark)); |
| } |
| } |
| split = nextSplit; |
| Instant hold = tester.getWatermarkHold(); |
| if (hold != null) { |
| assertThat(hold, greaterThanOrEqualTo(new Instant(watermark))); |
| assertThat(watermark, lessThan(maxTs + gapDuration.getMillis())); |
| } |
| } |
| tester.setAutoAdvanceOutputWatermark(true); |
| watermark = gapDuration.getMillis() + maxTs; |
| tester.advanceInputWatermark(new Instant(watermark)); |
| LOG.info("Output {}", tester.extractOutput()); |
| if (tester.getWatermarkHold() != null) { |
| assertThat(tester.getWatermarkHold(), equalTo(new Instant(watermark).plus(allowedLateness))); |
| } |
| // Nothing dropped. |
| long droppedElements = |
| container |
| .getCounter( |
| MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) |
| .getCumulative() |
| .longValue(); |
| assertEquals(0, droppedElements); |
| } |
| |
| /** Make sure that if data comes in too late to make it on time, the hold is the GC time. */ |
| @Test |
| public void dontSetHoldIfTooLateForEndOfWindowTimer() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| FixedWindows.of(Duration.millis(10)), |
| mockTriggerStateMachine, |
| AccumulationMode.ACCUMULATING_FIRED_PANES, |
| Duration.millis(10), |
| ClosingBehavior.FIRE_ALWAYS); |
| tester.setAutoAdvanceOutputWatermark(false); |
| |
| // Case: Unobservably "late" relative to input watermark, but on time for output watermark |
| tester.advanceInputWatermark(new Instant(15)); |
| tester.advanceOutputWatermark(new Instant(11)); |
| |
| IntervalWindow expectedWindow = new IntervalWindow(new Instant(10), new Instant(20)); |
| injectElement(tester, 14); |
| // Hold was applied, waiting for end-of-window timer. |
| assertEquals(new Instant(14), tester.getWatermarkHold()); |
| |
| // Trigger the end-of-window timer, fire a timer as though the mock trigger set it |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| tester.advanceInputWatermark(new Instant(20)); |
| tester.fireTimer(expectedWindow, expectedWindow.maxTimestamp(), TimeDomain.EVENT_TIME); |
| |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); |
| // Hold has been replaced with garbage collection hold. Waiting for garbage collection. |
| assertEquals(new Instant(29), tester.getWatermarkHold()); |
| assertEquals(new Instant(29), tester.getNextTimer(TimeDomain.EVENT_TIME)); |
| |
| // Case: Maybe late 1 |
| injectElement(tester, 13); |
| // No change to hold or timers. |
| assertEquals(new Instant(29), tester.getWatermarkHold()); |
| assertEquals(new Instant(29), tester.getNextTimer(TimeDomain.EVENT_TIME)); |
| |
| // Trigger the garbage collection timer. |
| tester.advanceInputWatermark(new Instant(30)); |
| |
| // Everything should be cleaned up. |
| assertFalse(tester.isMarkedFinished(new IntervalWindow(new Instant(10), new Instant(20)))); |
| tester.assertHasOnlyGlobalAndFinishedSetsFor(); |
| } |
| |
| @Test |
| public void testPaneInfoAllStates() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| FixedWindows.of(Duration.millis(10)), |
| mockTriggerStateMachine, |
| AccumulationMode.DISCARDING_FIRED_PANES, |
| Duration.millis(100), |
| ClosingBehavior.FIRE_IF_NON_EMPTY); |
| |
| IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(10)); |
| |
| tester.advanceInputWatermark(new Instant(0)); |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| injectElement(tester, 1); |
| assertThat( |
| tester.extractOutput(), |
| contains(WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY)))); |
| |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| injectElement(tester, 2); |
| assertThat( |
| tester.extractOutput(), |
| contains( |
| WindowMatchers.valueWithPaneInfo( |
| PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)))); |
| |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); |
| tester.setAutoAdvanceOutputWatermark(false); |
| tester.advanceInputWatermark(new Instant(15)); |
| |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| injectElement(tester, 3); |
| assertThat( |
| tester.extractOutput(), |
| contains( |
| WindowMatchers.valueWithPaneInfo( |
| PaneInfo.createPane(false, false, Timing.ON_TIME, 2, 0)))); |
| |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| tester.setAutoAdvanceOutputWatermark(true); |
| injectElement(tester, 4); |
| assertThat( |
| tester.extractOutput(), |
| contains( |
| WindowMatchers.valueWithPaneInfo( |
| PaneInfo.createPane(false, false, Timing.LATE, 3, 1)))); |
| |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| triggerShouldFinish(mockTriggerStateMachine); |
| injectElement(tester, 5); |
| assertThat( |
| tester.extractOutput(), |
| contains( |
| WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 4, 2)))); |
| } |
| |
| @Test |
| public void testPaneInfoAllStatesAfterWatermark() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) |
| .withTrigger( |
| Repeatedly.forever( |
| AfterFirst.of( |
| AfterPane.elementCountAtLeast(2), AfterWatermark.pastEndOfWindow()))) |
| .withMode(AccumulationMode.DISCARDING_FIRED_PANES) |
| .withAllowedLateness(Duration.millis(100)) |
| .withTimestampCombiner(TimestampCombiner.EARLIEST) |
| .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)); |
| |
| tester.advanceInputWatermark(new Instant(0)); |
| tester.injectElements( |
| TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2))); |
| |
| List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); |
| assertThat( |
| output, |
| contains( |
| WindowMatchers.valueWithPaneInfo( |
| PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)))); |
| assertThat(output, contains(isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10))); |
| |
| tester.advanceInputWatermark(new Instant(50)); |
| |
| // We should get the ON_TIME pane even though it is empty, |
| // because we have an AfterWatermark.pastEndOfWindow() trigger. |
| output = tester.extractOutput(); |
| assertThat( |
| output, |
| contains( |
| WindowMatchers.valueWithPaneInfo( |
| PaneInfo.createPane(false, false, Timing.ON_TIME, 1, 0)))); |
| assertThat(output, contains(isSingleWindowedValue(emptyIterable(), 9, 0, 10))); |
| |
| // We should get the final pane even though it is empty. |
| tester.advanceInputWatermark(new Instant(150)); |
| output = tester.extractOutput(); |
| assertThat( |
| output, |
| contains( |
| WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 2, 1)))); |
| assertThat(output, contains(isSingleWindowedValue(emptyIterable(), 9, 0, 10))); |
| } |
| |
| @Test |
| public void noEmptyPanesFinalIfNonEmpty() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) |
| .withTrigger( |
| Repeatedly.forever( |
| AfterFirst.of( |
| AfterPane.elementCountAtLeast(2), AfterWatermark.pastEndOfWindow()))) |
| .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) |
| .withAllowedLateness(Duration.millis(100)) |
| .withTimestampCombiner(TimestampCombiner.EARLIEST) |
| .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)); |
| |
| tester.advanceInputWatermark(new Instant(0)); |
| tester.injectElements( |
| TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2))); |
| tester.advanceInputWatermark(new Instant(20)); |
| tester.advanceInputWatermark(new Instant(250)); |
| |
| List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); |
| assertThat( |
| output, |
| contains( |
| // Trigger with 2 elements |
| isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10), |
| // Trigger for the empty on time pane |
| isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10))); |
| } |
| |
| @Test |
| public void noEmptyPanesFinalAlways() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) |
| .withTrigger( |
| Repeatedly.forever( |
| AfterFirst.of( |
| AfterPane.elementCountAtLeast(2), AfterWatermark.pastEndOfWindow()))) |
| .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) |
| .withAllowedLateness(Duration.millis(100)) |
| .withTimestampCombiner(TimestampCombiner.EARLIEST) |
| .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)); |
| |
| tester.advanceInputWatermark(new Instant(0)); |
| tester.injectElements( |
| TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2))); |
| tester.advanceInputWatermark(new Instant(20)); |
| tester.advanceInputWatermark(new Instant(250)); |
| |
| List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); |
| assertThat( |
| output, |
| contains( |
| // Trigger with 2 elements |
| isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10), |
| // Trigger for the empty on time pane |
| isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10), |
| // Trigger for the final pane |
| isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10))); |
| } |
| |
| /** |
| * If the trigger does not care about the watermark, the ReduceFnRunner should still emit an |
| * element for the ON_TIME pane. |
| */ |
| @Test |
| public void testNoWatermarkTriggerNoHold() throws Exception { |
| Duration allowedLateness = Duration.standardDays(1); |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) |
| .withTrigger( |
| Repeatedly.forever( |
| AfterProcessingTime.pastFirstElementInPane() |
| .plusDelayOf(Duration.standardSeconds(5)))) |
| .withAllowedLateness(allowedLateness)); |
| |
| // First, an element comes in on time in [0, 10) but ReduceFnRunner should |
| // not set a hold or timer for 9. That is the trigger's job. |
| IntervalWindow expectedWindow = new IntervalWindow(new Instant(0), new Instant(10)); |
| tester.advanceInputWatermark(new Instant(0)); |
| tester.advanceProcessingTime(new Instant(0)); |
| |
| tester.injectElements(TimestampedValue.of(1, new Instant(1))); |
| |
| // Since some data arrived, the element hold will be the end of the window. |
| assertThat(tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp())); |
| |
| tester.advanceProcessingTime(new Instant(6000)); |
| |
| // Sanity check; we aren't trying to verify output in this test |
| assertThat(tester.getOutputSize(), equalTo(1)); |
| |
| // Since we did not request empty final panes, no hold |
| assertThat(tester.getWatermarkHold(), nullValue()); |
| |
| // So when the input watermark advanced, the output advances with it (automated by tester) |
| tester.advanceInputWatermark( |
| new Instant(expectedWindow.maxTimestamp().plus(Duration.standardHours(1)))); |
| |
| // Now late data arrives |
| tester.injectElements(TimestampedValue.of(3, new Instant(3))); |
| |
| // The ReduceFnRunner should set a GC hold since the element was too late and its timestamp |
| // will be ignored for the purposes of the watermark hold |
| assertThat( |
| tester.getWatermarkHold(), equalTo(expectedWindow.maxTimestamp().plus(allowedLateness))); |
| } |
| |
| @Test |
| public void testPaneInfoAllStatesAfterWatermarkAccumulating() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) |
| .withTrigger( |
| Repeatedly.forever( |
| AfterFirst.of( |
| AfterPane.elementCountAtLeast(2), AfterWatermark.pastEndOfWindow()))) |
| .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) |
| .withAllowedLateness(Duration.millis(100)) |
| .withTimestampCombiner(TimestampCombiner.EARLIEST) |
| .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)); |
| |
| tester.advanceInputWatermark(new Instant(0)); |
| tester.injectElements( |
| TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2))); |
| |
| List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); |
| assertThat( |
| output, |
| contains( |
| WindowMatchers.valueWithPaneInfo( |
| PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)))); |
| assertThat(output, contains(isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10))); |
| |
| tester.advanceInputWatermark(new Instant(50)); |
| |
| // We should get the ON_TIME pane even though it is empty, |
| // because we have an AfterWatermark.pastEndOfWindow() trigger. |
| output = tester.extractOutput(); |
| assertThat( |
| output, |
| contains( |
| WindowMatchers.valueWithPaneInfo( |
| PaneInfo.createPane(false, false, Timing.ON_TIME, 1, 0)))); |
| assertThat(output, contains(isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10))); |
| |
| // We should get the final pane even though it is empty. |
| tester.advanceInputWatermark(new Instant(150)); |
| output = tester.extractOutput(); |
| assertThat( |
| output, |
| contains( |
| WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 2, 1)))); |
| assertThat(output, contains(isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10))); |
| } |
| |
| @Test |
| public void testPaneInfoFinalAndOnTime() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) |
| .withTrigger( |
| Repeatedly.forever(AfterPane.elementCountAtLeast(2)) |
| .orFinally(AfterWatermark.pastEndOfWindow())) |
| .withMode(AccumulationMode.DISCARDING_FIRED_PANES) |
| .withAllowedLateness(Duration.millis(100)) |
| .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)); |
| |
| tester.advanceInputWatermark(new Instant(0)); |
| |
| // Should trigger due to element count |
| tester.injectElements( |
| TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2))); |
| |
| assertThat( |
| tester.extractOutput(), |
| contains( |
| WindowMatchers.valueWithPaneInfo( |
| PaneInfo.createPane(true, false, Timing.EARLY, 0, -1)))); |
| |
| tester.advanceInputWatermark(new Instant(150)); |
| assertThat( |
| tester.extractOutput(), |
| contains( |
| WindowMatchers.valueWithPaneInfo( |
| PaneInfo.createPane(false, true, Timing.ON_TIME, 1, 0)))); |
| } |
| |
| @Test |
| public void testPaneInfoSkipToFinish() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| FixedWindows.of(Duration.millis(10)), |
| mockTriggerStateMachine, |
| AccumulationMode.DISCARDING_FIRED_PANES, |
| Duration.millis(100), |
| ClosingBehavior.FIRE_IF_NON_EMPTY); |
| |
| tester.advanceInputWatermark(new Instant(0)); |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| triggerShouldFinish(mockTriggerStateMachine); |
| injectElement(tester, 1); |
| assertThat( |
| tester.extractOutput(), |
| contains(WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, true, Timing.EARLY)))); |
| } |
| |
| @Test |
| public void testPaneInfoSkipToNonSpeculativeAndFinish() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| FixedWindows.of(Duration.millis(10)), |
| mockTriggerStateMachine, |
| AccumulationMode.DISCARDING_FIRED_PANES, |
| Duration.millis(100), |
| ClosingBehavior.FIRE_IF_NON_EMPTY); |
| |
| tester.advanceInputWatermark(new Instant(15)); |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| triggerShouldFinish(mockTriggerStateMachine); |
| injectElement(tester, 1); |
| assertThat( |
| tester.extractOutput(), |
| contains(WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, true, Timing.LATE)))); |
| } |
| |
| @Test |
| public void testMergeBeforeFinalizing() throws Exception { |
| // Verify that we merge windows before producing output so users don't see undesired |
| // unmerged windows. |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| Sessions.withGapDuration(Duration.millis(10)), |
| mockTriggerStateMachine, |
| AccumulationMode.DISCARDING_FIRED_PANES, |
| Duration.ZERO, |
| ClosingBehavior.FIRE_IF_NON_EMPTY); |
| |
| // All on time data, verify watermark hold. |
| // These two windows should pre-merge immediately to [1, 20) |
| tester.injectElements( |
| TimestampedValue.of(1, new Instant(1)), // in [1, 11) |
| TimestampedValue.of(10, new Instant(10))); // in [10, 20) |
| |
| // And this should fire the end-of-window timer |
| tester.advanceInputWatermark(new Instant(100)); |
| |
| List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); |
| assertThat(output.size(), equalTo(1)); |
| assertThat( |
| output.get(0), |
| isSingleWindowedValue( |
| containsInAnyOrder(1, 10), |
| 1, // timestamp |
| 1, // window start |
| 20)); // window end |
| assertThat( |
| output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))); |
| } |
| |
| /** |
| * It is possible for a session window's trigger to be closed at the point at which the (merged) |
| * session window is garbage collected. Make sure we don't accidentally assume the window is still |
| * active. |
| */ |
| @Test |
| public void testMergingWithCloseBeforeGC() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| Sessions.withGapDuration(Duration.millis(10)), |
| mockTriggerStateMachine, |
| AccumulationMode.DISCARDING_FIRED_PANES, |
| Duration.millis(50), |
| ClosingBehavior.FIRE_IF_NON_EMPTY); |
| |
| // Two elements in two overlapping session windows. |
| tester.injectElements( |
| TimestampedValue.of(1, new Instant(1)), // in [1, 11) |
| TimestampedValue.of(10, new Instant(10))); // in [10, 20) |
| |
| // Close the trigger, but the gargbage collection timer is still pending. |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| triggerShouldFinish(mockTriggerStateMachine); |
| tester.advanceInputWatermark(new Instant(30)); |
| |
| // Now the garbage collection timer will fire, finding the trigger already closed. |
| tester.advanceInputWatermark(new Instant(100)); |
| |
| List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); |
| assertThat(output.size(), equalTo(1)); |
| assertThat( |
| output.get(0), |
| isSingleWindowedValue( |
| containsInAnyOrder(1, 10), |
| 1, // timestamp |
| 1, // window start |
| 20)); // window end |
| assertThat( |
| output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0))); |
| } |
| |
| /** |
| * Verifies that once the trigger of a merged session window has finished, the finished state is |
| * recorded against the merge result window and is still there after the window is revisited. |
| */ |
| @Test |
| public void testMergingWithCloseTrigger() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> sessionTester = |
| ReduceFnTester.nonCombining( |
| Sessions.withGapDuration(Duration.millis(10)), |
| mockTriggerStateMachine, |
| AccumulationMode.DISCARDING_FIRED_PANES, |
| Duration.millis(50), // allowed lateness |
| ClosingBehavior.FIRE_IF_NON_EMPTY); |
| |
| // The two elements below are expected to end up in one merged session window, [1, 12). |
| IntervalWindow sessionWindow = new IntervalWindow(new Instant(1), new Instant(12)); |
| Instant endOfSessionTimer = sessionWindow.maxTimestamp(); |
| sessionTester.injectElements( |
| TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2))); |
| |
| // From now on the mocked trigger both fires and finishes, closing the merged window. |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| triggerShouldFinish(mockTriggerStateMachine); |
| |
| // Fire an end-of-window timer as though the trigger had set it. |
| sessionTester.advanceInputWatermark(new Instant(13)); |
| sessionTester.fireTimer(sessionWindow, endOfSessionTimer, TimeDomain.EVENT_TIME); |
| |
| // The trigger for the merged window is now closed. |
| assertTrue(sessionTester.isMarkedFinished(sessionWindow)); |
| |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); |
| |
| // Send the same elements again so the same session window is revisited. |
| sessionTester.injectElements( |
| TimestampedValue.of(1, new Instant(1)), TimestampedValue.of(2, new Instant(2))); |
| |
| // The closed state must not have been lost. |
| assertTrue(sessionTester.isMarkedFinished(sessionWindow)); |
| } |
| |
| /** |
| * An element that lands in an earlier session window whose trigger has already closed must be |
| * rejected, rather than causing a failure because that window is no longer active. |
| */ |
| @Test |
| public void testMergingWithReusedWindow() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> sessionTester = |
| ReduceFnTester.nonCombining( |
| Sessions.withGapDuration(Duration.millis(10)), |
| mockTriggerStateMachine, |
| AccumulationMode.DISCARDING_FIRED_PANES, |
| Duration.millis(50), // allowed lateness |
| ClosingBehavior.FIRE_IF_NON_EMPTY); |
| |
| IntervalWindow sessionWindow = new IntervalWindow(new Instant(1), new Instant(11)); |
| Instant endOfSessionTimer = sessionWindow.maxTimestamp(); |
| PaneInfo expectedPane = PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0); |
| |
| // One element in one session window. |
| sessionTester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21. |
| |
| // Close the trigger while the garbage collection timer is still pending. |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| triggerShouldFinish(mockTriggerStateMachine); |
| sessionTester.advanceInputWatermark(new Instant(15)); |
| sessionTester.fireTimer(sessionWindow, endOfSessionTimer, TimeDomain.EVENT_TIME); |
| |
| // A second element for the very same session window. |
| // It should be discarded with 'window closed'. |
| sessionTester.injectElements(TimestampedValue.of(1, new Instant(1))); // in [1, 11), gc at 21. |
| |
| // No window may remain in the active window state. |
| assertTrue(sessionTester.hasNoActiveWindows()); |
| |
| // The garbage collection timer now fires and finds the trigger already closed. |
| sessionTester.advanceInputWatermark(new Instant(100)); |
| |
| List<WindowedValue<Iterable<Integer>>> panes = sessionTester.extractOutput(); |
| assertThat(panes.size(), equalTo(1)); |
| assertThat( |
| panes.get(0), |
| isSingleWindowedValue( |
| containsInAnyOrder(1), |
| equalTo(new Instant(1)), // timestamp |
| equalTo((BoundedWindow) sessionWindow))); |
| |
| assertThat(panes.get(0).getPane(), equalTo(expectedPane)); |
| } |
| |
| /** |
| * When a merged window's trigger is closed we record that state using the merged window rather |
| * than the original windows. |
| */ |
| @Test |
| public void testMergingWithClosedRepresentative() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| Sessions.withGapDuration(Duration.millis(10)), |
| mockTriggerStateMachine, |
| AccumulationMode.DISCARDING_FIRED_PANES, |
| Duration.millis(50), |
| ClosingBehavior.FIRE_IF_NON_EMPTY); |
| |
| // 2 elements into merged session window. |
| // Close the trigger, but the garbage collection timer is still pending. |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| triggerShouldFinish(mockTriggerStateMachine); |
| tester.injectElements( |
| TimestampedValue.of(1, new Instant(1)), // in [1, 11), gc at 21. |
| TimestampedValue.of(8, new Instant(8))); // in [8, 18), gc at 28. |
| |
| // More elements into the same merged session window. |
| // It has not yet been gced. |
| // Should be discarded with 'window closed'. |
| tester.injectElements( |
| TimestampedValue.of(1, new Instant(1)), // in [1, 11), gc at 21. |
| TimestampedValue.of(2, new Instant(2)), // in [2, 12), gc at 22. |
| TimestampedValue.of(8, new Instant(8))); // in [8, 18), gc at 28. |
| |
| // Now the garbage collection timer will fire, finding the trigger already closed. |
| tester.advanceInputWatermark(new Instant(100)); |
| |
| List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); |
| |
| assertThat(output.size(), equalTo(1)); |
| assertThat( |
| output.get(0), |
| isSingleWindowedValue( |
| containsInAnyOrder(1, 8), |
| 1, // timestamp |
| 1, // window start |
| 18)); // window end |
| assertThat( |
| output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0))); |
| } |
| |
| /** |
| * When an element belonging to a closed session window is merged together with session windows |
| * that are still open, the resulting merged session window must not be 'poisoned' by it. |
| */ |
| @Test |
| public void testMergingWithClosedDoesNotPoison() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> sessionTester = |
| ReduceFnTester.nonCombining( |
| Sessions.withGapDuration(Duration.millis(10)), |
| mockTriggerStateMachine, |
| AccumulationMode.DISCARDING_FIRED_PANES, |
| Duration.millis(50), |
| ClosingBehavior.FIRE_IF_NON_EMPTY); |
| PaneInfo expectedClosedPane = PaneInfo.createPane(true, true, Timing.EARLY, 0, 0); |
| PaneInfo expectedMergedPane = PaneInfo.createPane(true, true, Timing.ON_TIME, 0, 0); |
| |
| // A single element whose trigger is forced to close. |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| triggerShouldFinish(mockTriggerStateMachine); |
| sessionTester.injectElements(TimestampedValue.of(2, new Instant(2))); |
| |
| // Three more elements; the one at timestamp 2 maps to the already closed window. |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(false); |
| sessionTester.injectElements( |
| TimestampedValue.of(1, new Instant(1)), |
| TimestampedValue.of(2, new Instant(2)), |
| TimestampedValue.of(3, new Instant(3))); |
| |
| sessionTester.advanceInputWatermark(new Instant(100)); |
| |
| List<WindowedValue<Iterable<Integer>>> panes = sessionTester.extractOutput(); |
| assertThat(panes.size(), equalTo(2)); |
| WindowedValue<Iterable<Integer>> closedPane = panes.get(0); |
| WindowedValue<Iterable<Integer>> mergedPane = panes.get(1); |
| |
| // First output: the lone element from the window whose trigger was closed. |
| assertThat( |
| closedPane, |
| isSingleWindowedValue( |
| containsInAnyOrder(2), |
| 2, // timestamp |
| 2, // window start |
| 12)); // window end |
| assertThat(closedPane.getPane(), equalTo(expectedClosedPane)); |
| |
| // Second output: all three later elements in the merged window. |
| assertThat( |
| mergedPane, |
| isSingleWindowedValue( |
| containsInAnyOrder(1, 2, 3), |
| 1, // timestamp |
| 1, // window start |
| 13)); // window end |
| assertThat(mergedPane.getPane(), equalTo(expectedMergedPane)); |
| } |
| |
| /** |
| * With sliding windows an element is assigned to several windows at once. This checks that the |
| * element is dropped, and the drop counter incremented, once per window whose trigger has |
| * already finished. |
| */ |
| @Test |
| public void testDropDataMultipleWindowsFinishedTrigger() throws Exception { |
| MetricsContainerImpl metrics = new MetricsContainerImpl("any"); |
| MetricsEnvironment.setCurrentContainer(metrics); |
| MetricName droppedCounterName = |
| MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW); |
| ReduceFnTester<Integer, Integer, IntervalWindow> slidingTester = |
| ReduceFnTester.combining( |
| WindowingStrategy.of(SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30))) |
| .withTrigger(AfterWatermark.pastEndOfWindow()) |
| .withAllowedLateness(Duration.millis(1000)), |
| Sum.ofIntegers(), |
| VarIntCoder.of()); |
| |
| slidingTester.injectElements( |
| // assigned to [-60, 40), [-30, 70), [0, 100) |
| TimestampedValue.of(10, new Instant(23)), |
| // assigned to [-30, 70), [0, 100), [30, 130) |
| TimestampedValue.of(12, new Instant(40))); |
| |
| // Nothing has been dropped so far. |
| long droppedSoFar = metrics.getCounter(droppedCounterName).getCumulative(); |
| assertEquals(0, droppedSoFar); |
| |
| slidingTester.advanceInputWatermark(new Instant(70)); |
| slidingTester.injectElements( |
| // assigned to [-30, 70), [0, 100), [30, 130) |
| // but [-30, 70) is closed by the trigger |
| TimestampedValue.of(14, new Instant(60))); |
| |
| // One of the three target windows was closed: one drop. |
| droppedSoFar = metrics.getCounter(droppedCounterName).getCumulative(); |
| assertEquals(1, droppedSoFar); |
| |
| slidingTester.advanceInputWatermark(new Instant(130)); |
| // assigned to [-30, 70), [0, 100), [30, 130) |
| // but they are all closed |
| slidingTester.injectElements(TimestampedValue.of(16, new Instant(40))); |
| |
| // Three more drops on top of the earlier one. |
| droppedSoFar = metrics.getCounter(droppedCounterName).getCumulative(); |
| assertEquals(4, droppedSoFar); |
| } |
| |
| @Test |
| public void testIdempotentEmptyPanesDiscarding() throws Exception { |
| MetricsContainerImpl container = new MetricsContainerImpl("any"); |
| MetricsEnvironment.setCurrentContainer(container); |
| // Test uninteresting (empty) panes don't increment the index or otherwise |
| // modify PaneInfo. |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester = |
| ReduceFnTester.nonCombining( |
| FixedWindows.of(Duration.millis(10)), |
| mockTriggerStateMachine, |
| AccumulationMode.DISCARDING_FIRED_PANES, |
| Duration.millis(100), |
| ClosingBehavior.FIRE_IF_NON_EMPTY); |
| |
| // Inject a couple of on-time elements and fire at the window end. |
| injectElement(tester, 1); |
| injectElement(tester, 2); |
| tester.advanceInputWatermark(new Instant(12)); |
| |
| // Fire the on-time pane |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME); |
| |
| // Fire another timer (with no data, so it's an uninteresting pane that should not be output). |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| tester.fireTimer(firstWindow, new Instant(9), TimeDomain.EVENT_TIME); |
| |
| // Finish it off with another datum. |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| triggerShouldFinish(mockTriggerStateMachine); |
| injectElement(tester, 3); |
| |
| // The intermediate trigger firing shouldn't result in any output. |
| List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); |
| assertThat(output.size(), equalTo(2)); |
| |
| // The on-time pane is as expected. |
| assertThat(output.get(0), isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10)); |
| |
| // The late pane has the correct indices. |
| assertThat(output.get(1).getValue(), contains(3)); |
| assertThat( |
| output.get(1).getPane(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 1, 1))); |
| |
| assertTrue(tester.isMarkedFinished(firstWindow)); |
| tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow); |
| |
| long droppedElements = |
| container |
| .getCounter( |
| MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW)) |
| .getCumulative(); |
| assertEquals(0, droppedElements); |
| } |
| |
| /** |
| * In accumulating mode, uninteresting (empty) panes must not be emitted and must not increment |
| * the pane index or otherwise modify the PaneInfo of later panes. |
| */ |
| @Test |
| public void testIdempotentEmptyPanesAccumulating() throws Exception { |
| MetricsContainerImpl metrics = new MetricsContainerImpl("any"); |
| MetricsEnvironment.setCurrentContainer(metrics); |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> fixedTester = |
| ReduceFnTester.nonCombining( |
| FixedWindows.of(Duration.millis(10)), |
| mockTriggerStateMachine, |
| AccumulationMode.ACCUMULATING_FIRED_PANES, |
| Duration.millis(100), |
| ClosingBehavior.FIRE_IF_NON_EMPTY); |
| Instant endOfWindowTimer = new Instant(9); |
| |
| // Two on-time elements, then move the watermark past the end of the window. |
| injectElement(fixedTester, 1); |
| injectElement(fixedTester, 2); |
| fixedTester.advanceInputWatermark(new Instant(12)); |
| |
| // This timer produces the on-time pane. |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| fixedTester.fireTimer(firstWindow, endOfWindowTimer, TimeDomain.EVENT_TIME); |
| List<WindowedValue<Iterable<Integer>>> panes = fixedTester.extractOutput(); |
| assertThat(panes.size(), equalTo(1)); |
| assertThat(panes.get(0), isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10)); |
| assertThat( |
| panes.get(0).getPane(), equalTo(PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0))); |
| |
| // A second timer with no new data: even though the trigger is ready to fire, the empty |
| // pane should not be output. |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| fixedTester.fireTimer(firstWindow, endOfWindowTimer, TimeDomain.EVENT_TIME); |
| assertThat(fixedTester.extractOutput().size(), equalTo(0)); |
| |
| // One more datum, which is late, with the trigger finishing as it fires. |
| when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); |
| triggerShouldFinish(mockTriggerStateMachine); |
| injectElement(fixedTester, 3); |
| panes = fixedTester.extractOutput(); |
| assertThat(panes.size(), equalTo(1)); |
| |
| // The late pane accumulates all elements and has indices unaffected by the empty firing. |
| WindowedValue<Iterable<Integer>> latePane = panes.get(0); |
| assertThat(latePane.getValue(), containsInAnyOrder(1, 2, 3)); |
| assertThat(latePane.getPane(), equalTo(PaneInfo.createPane(false, true, Timing.LATE, 1, 1))); |
| |
| assertTrue(fixedTester.isMarkedFinished(firstWindow)); |
| fixedTester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow); |
| |
| // No element was dropped because of a closed window. |
| MetricName droppedCounterName = |
| MetricName.named(ReduceFnRunner.class, ReduceFnRunner.DROPPED_DUE_TO_CLOSED_WINDOW); |
| long droppedElements = metrics.getCounter(droppedCounterName).getCumulative(); |
| assertEquals(0, droppedElements); |
| } |
| |
| /** |
| * Test that we receive an empty on-time pane when an or-finally waiting for the watermark fires. |
| * Specifically, verify the proper triggerings and pane-info of a typical speculative/on-time/late |
| * when the on-time pane is empty. |
| */ |
| @Test |
| public void testEmptyOnTimeFromOrFinally() throws Exception { |
| WindowingStrategy<?, IntervalWindow> strategy = |
| WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) |
| .withTimestampCombiner(TimestampCombiner.EARLIEST) |
| .withTrigger( |
| AfterEach.inOrder( |
| Repeatedly.forever( |
| AfterProcessingTime.pastFirstElementInPane() |
| .plusDelayOf(new Duration(5))) |
| .orFinally(AfterWatermark.pastEndOfWindow()), |
| Repeatedly.forever( |
| AfterProcessingTime.pastFirstElementInPane() |
| .plusDelayOf(new Duration(25))))) |
| .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) |
| .withAllowedLateness(Duration.millis(100)); |
| |
| ReduceFnTester<Integer, Integer, IntervalWindow> tester = |
| ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); |
| |
| tester.advanceInputWatermark(new Instant(0)); |
| tester.advanceProcessingTime(new Instant(0)); |
| |
| // Processing time timer for 5 |
| tester.injectElements( |
| TimestampedValue.of(1, new Instant(1)), |
| TimestampedValue.of(1, new Instant(3)), |
| TimestampedValue.of(1, new Instant(7)), |
| TimestampedValue.of(1, new Instant(5))); |
| |
| // Should fire early pane |
| tester.advanceProcessingTime(new Instant(6)); |
| |
| // Should fire empty on time pane |
| tester.advanceInputWatermark(new Instant(11)); |
| List<WindowedValue<Integer>> output = tester.extractOutput(); |
| assertEquals(2, output.size()); |
| |
| assertThat(output.get(0), isSingleWindowedValue(4, 1, 0, 10)); |
| assertThat(output.get(1), isSingleWindowedValue(4, 9, 0, 10)); |
| |
| assertThat( |
| output.get(0), |
| WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))); |
| assertThat( |
| output.get(1), |
| WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.ON_TIME, 1, 0))); |
| } |
| |
| /** |
| * With OnTimeBehavior set to FIRE_IF_NON_EMPTY, an empty on-time pane must not be fired; the |
| * only panes are the early one and the final one produced when the window is closed. |
| */ |
| @Test |
| public void testEmptyOnTimeWithOnTimeBehaviorFireIfNonEmpty() throws Exception { |
| WindowingStrategy<?, IntervalWindow> windowing = |
| WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) |
| .withTimestampCombiner(TimestampCombiner.EARLIEST) |
| .withTrigger( |
| AfterEach.inOrder( |
| Repeatedly.forever( |
| AfterProcessingTime.pastFirstElementInPane() |
| .plusDelayOf(Duration.millis(5))) |
| .orFinally(AfterWatermark.pastEndOfWindow()), |
| Repeatedly.forever( |
| AfterProcessingTime.pastFirstElementInPane() |
| .plusDelayOf(Duration.millis(25))))) |
| .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) |
| .withAllowedLateness(Duration.millis(100)) |
| .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS) |
| .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY); |
| |
| ReduceFnTester<Integer, Integer, IntervalWindow> sumTester = |
| ReduceFnTester.combining(windowing, Sum.ofIntegers(), VarIntCoder.of()); |
| |
| sumTester.advanceInputWatermark(new Instant(0)); |
| sumTester.advanceProcessingTime(new Instant(0)); |
| |
| // Four elements at processing time 0, so the early timer is for processing time 5. |
| sumTester.injectElements( |
| TimestampedValue.of(1, new Instant(1)), |
| TimestampedValue.of(1, new Instant(3)), |
| TimestampedValue.of(1, new Instant(7)), |
| TimestampedValue.of(1, new Instant(5))); |
| |
| // Processing time passes the delay: the early pane should fire. |
| sumTester.advanceProcessingTime(new Instant(6)); |
| |
| // Watermark passes the end of the window: no empty on-time pane should fire. |
| sumTester.advanceInputWatermark(new Instant(11)); |
| |
| // Watermark reaches end of window (10) plus allowed lateness (100): final GC pane fires. |
| sumTester.advanceInputWatermark(new Instant(10 + 100)); |
| List<WindowedValue<Integer>> panes = sumTester.extractOutput(); |
| assertEquals(2, panes.size()); |
| WindowedValue<Integer> earlyPane = panes.get(0); |
| WindowedValue<Integer> finalPane = panes.get(1); |
| |
| assertThat(earlyPane, isSingleWindowedValue(4, 1, 0, 10)); |
| assertThat(finalPane, isSingleWindowedValue(4, 9, 0, 10)); |
| |
| assertThat( |
| earlyPane, |
| WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))); |
| assertThat( |
| finalPane, |
| WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 1, 0))); |
| } |
| |
| /** |
| * Backward-compatibility check: with ClosingBehavior FIRE_IF_NON_EMPTY and no OnTimeBehavior |
| * configured (the FIRE_ALWAYS case this test is written for), an empty on-time pane that is |
| * also the last pane is still fired. |
| */ |
| @Test |
| public void testEmptyOnTimeWithOnTimeBehaviorBackwardCompatibility() throws Exception { |
| WindowingStrategy<?, IntervalWindow> windowing = |
| WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) |
| .withTimestampCombiner(TimestampCombiner.EARLIEST) |
| .withTrigger( |
| AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1))) |
| .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) |
| .withAllowedLateness(Duration.ZERO) |
| .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY); |
| |
| ReduceFnTester<Integer, Integer, IntervalWindow> sumTester = |
| ReduceFnTester.combining(windowing, Sum.ofIntegers(), VarIntCoder.of()); |
| |
| sumTester.advanceInputWatermark(new Instant(0)); |
| sumTester.advanceProcessingTime(new Instant(0)); |
| |
| // A single element is enough to satisfy the early firing (element count at least 1). |
| sumTester.injectElements(TimestampedValue.of(1, new Instant(1))); |
| |
| // Watermark passes the end of the window: the empty, finishing on-time pane should fire. |
| sumTester.advanceInputWatermark(new Instant(11)); |
| |
| List<WindowedValue<Integer>> panes = sumTester.extractOutput(); |
| assertEquals(2, panes.size()); |
| WindowedValue<Integer> earlyPane = panes.get(0); |
| WindowedValue<Integer> onTimePane = panes.get(1); |
| |
| assertThat( |
| earlyPane, |
| WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))); |
| assertThat( |
| onTimePane, |
| WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.ON_TIME, 1, 0))); |
| } |
| |
| /** |
| * With OnTimeBehavior set to FIRE_IF_NON_EMPTY, no empty on-time pane is fired, and late data |
| * arriving afterwards still produces a late pane. |
| */ |
| @Test |
| public void testEmptyOnTimeWithOnTimeBehaviorFireIfNonEmptyAndLateData() throws Exception { |
| WindowingStrategy<?, IntervalWindow> windowing = |
| WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) |
| .withTimestampCombiner(TimestampCombiner.EARLIEST) |
| .withTrigger( |
| AfterEach.inOrder( |
| Repeatedly.forever( |
| AfterProcessingTime.pastFirstElementInPane() |
| .plusDelayOf(Duration.millis(5))) |
| .orFinally(AfterWatermark.pastEndOfWindow()), |
| Repeatedly.forever( |
| AfterProcessingTime.pastFirstElementInPane() |
| .plusDelayOf(Duration.millis(25))))) |
| .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) |
| .withAllowedLateness(Duration.millis(100)) |
| .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY); |
| |
| ReduceFnTester<Integer, Integer, IntervalWindow> sumTester = |
| ReduceFnTester.combining(windowing, Sum.ofIntegers(), VarIntCoder.of()); |
| |
| sumTester.advanceInputWatermark(new Instant(0)); |
| sumTester.advanceProcessingTime(new Instant(0)); |
| |
| // Four elements at processing time 0, so the early timer is for processing time 5. |
| sumTester.injectElements( |
| TimestampedValue.of(1, new Instant(1)), |
| TimestampedValue.of(1, new Instant(3)), |
| TimestampedValue.of(1, new Instant(7)), |
| TimestampedValue.of(1, new Instant(5))); |
| |
| // Processing time passes the delay: the early pane should fire. |
| sumTester.advanceProcessingTime(new Instant(6)); |
| |
| // Watermark passes the end of the window: no empty on-time pane should fire. |
| sumTester.advanceInputWatermark(new Instant(11)); |
| |
| // A late element arrives; once the 25ms late delay has elapsed a late pane should fire. |
| sumTester.injectElements(TimestampedValue.of(1, new Instant(9))); |
| sumTester.advanceProcessingTime(new Instant(6 + 25 + 1)); |
| |
| List<WindowedValue<Integer>> panes = sumTester.extractOutput(); |
| assertEquals(2, panes.size()); |
| WindowedValue<Integer> earlyPane = panes.get(0); |
| WindowedValue<Integer> latePane = panes.get(1); |
| |
| assertThat(earlyPane, isSingleWindowedValue(4, 1, 0, 10)); |
| assertThat(latePane, isSingleWindowedValue(5, 9, 0, 10)); |
| |
| assertThat( |
| earlyPane, |
| WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))); |
| assertThat( |
| latePane, |
| WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.LATE, 1, 0))); |
| } |
| |
| /** |
| * Tests for processing time firings after the watermark passes the end of the window. |
| * Specifically, verify the proper triggerings and pane-info of a typical speculative/on-time/late |
| * when the on-time pane is non-empty. |
| * |
| * <p>The strategy fires early panes 5ms of processing time after the first element in a pane |
| * until the watermark passes the end of the window, then late panes 25ms of processing time |
| * after the first element in a pane. Four panes are expected: EARLY, ON_TIME, LATE and a final |
| * LATE pane marked as last. |
| */ |
| @Test |
| public void testProcessingTime() throws Exception { |
| WindowingStrategy<?, IntervalWindow> strategy = |
| WindowingStrategy.of((WindowFn<?, IntervalWindow>) FixedWindows.of(Duration.millis(10))) |
| .withTimestampCombiner(TimestampCombiner.EARLIEST) |
| .withTrigger( |
| AfterEach.inOrder( |
| Repeatedly.forever( |
| AfterProcessingTime.pastFirstElementInPane() |
| .plusDelayOf(new Duration(5))) |
| .orFinally(AfterWatermark.pastEndOfWindow()), |
| Repeatedly.forever( |
| AfterProcessingTime.pastFirstElementInPane() |
| .plusDelayOf(new Duration(25))))) |
| .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) |
| .withAllowedLateness(Duration.millis(100)); |
| |
| ReduceFnTester<Integer, Integer, IntervalWindow> tester = |
| ReduceFnTester.combining(strategy, Sum.ofIntegers(), VarIntCoder.of()); |
| |
| tester.advanceInputWatermark(new Instant(0)); |
| tester.advanceProcessingTime(new Instant(0)); |
| |
| tester.injectElements( |
| TimestampedValue.of(1, new Instant(1)), |
| TimestampedValue.of(1, new Instant(3)), |
| TimestampedValue.of(1, new Instant(7)), |
| TimestampedValue.of(1, new Instant(5))); |
| // 4 elements all at processing time 0 |
| |
| tester.advanceProcessingTime(new Instant(6)); // fire [1,3,7,5] since 6 > 0 + 5 |
| tester.injectElements( |
| TimestampedValue.of(1, new Instant(8)), TimestampedValue.of(1, new Instant(4))); |
| // 6 elements |
| |
| tester.advanceInputWatermark(new Instant(11)); // fire [1,3,7,5,8,4] since 11 > 9 |
| tester.injectElements( |
| TimestampedValue.of(1, new Instant(8)), |
| TimestampedValue.of(1, new Instant(4)), |
| TimestampedValue.of(1, new Instant(5))); |
| // 9 elements; these arrive at processing time 6, after the watermark passed the window end |
| |
| tester.advanceInputWatermark(new Instant(12)); |
| tester.injectElements(TimestampedValue.of(1, new Instant(3))); |
| // 10 elements |
| |
| tester.advanceProcessingTime(new Instant(15)); |
| tester.injectElements(TimestampedValue.of(1, new Instant(5))); |
| // 11 elements |
| tester.advanceProcessingTime(new Instant(32)); // fire since 32 > 6 + 25 |
| |
| tester.injectElements(TimestampedValue.of(1, new Instant(3))); |
| // 12 elements |
| // fire [1,3,7,5,8,4,8,4,5,3,5,3] since 125 > 6 + 25 |
| // NOTE(review): 125 is an input watermark, not a processing time; this final pane (asserted |
| // below as isLast) is presumably produced by window expiry, i.e. the watermark passing end |
| // of window (10) + allowed lateness (100), rather than by the 25ms delay -- confirm. |
| tester.advanceInputWatermark(new Instant(125)); |
| |
| List<WindowedValue<Integer>> output = tester.extractOutput(); |
| assertEquals(4, output.size()); |
| |
| // Values are running sums because the strategy accumulates fired panes. |
| assertThat(output.get(0), isSingleWindowedValue(4, 1, 0, 10)); |
| assertThat(output.get(1), isSingleWindowedValue(6, 4, 0, 10)); |
| assertThat(output.get(2), isSingleWindowedValue(11, 9, 0, 10)); |
| assertThat(output.get(3), isSingleWindowedValue(12, 9, 0, 10)); |
| |
| // Pane sequence: EARLY (first), ON_TIME, LATE, LATE (last). |
| assertThat( |
| output.get(0), |
| WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(true, false, Timing.EARLY, 0, -1))); |
| assertThat( |
| output.get(1), |
| WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.ON_TIME, 1, 0))); |
| assertThat( |
| output.get(2), |
| WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, false, Timing.LATE, 2, 1))); |
| assertThat( |
| output.get(3), |
| WindowMatchers.valueWithPaneInfo(PaneInfo.createPane(false, true, Timing.LATE, 3, 2))); |
| } |
| |
| /** |
| * We should fire a non-empty ON_TIME pane in the GlobalWindow when the watermark moves to |
| * end-of-time. |
| */ |
| @Test |
| public void fireNonEmptyOnDrainInGlobalWindow() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, GlobalWindow> tester = |
| ReduceFnTester.nonCombining( |
| WindowingStrategy.of(new GlobalWindows()) |
| .withTrigger(Repeatedly.forever(AfterPane.elementCountAtLeast(3))) |
| .withMode(AccumulationMode.DISCARDING_FIRED_PANES)); |
| |
| tester.advanceInputWatermark(new Instant(0)); |
| |
| final int n = 20; |
| for (int i = 0; i < n; i++) { |
| tester.injectElements(TimestampedValue.of(i, new Instant(i))); |
| } |
| |
| List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput(); |
| assertEquals(n / 3, output.size()); |
| for (int i = 0; i < output.size(); i++) { |
| assertEquals(Timing.EARLY, output.get(i).getPane().getTiming()); |
| assertEquals(i, output.get(i).getPane().getIndex()); |
| assertEquals(3, Iterables.size(output.get(i).getValue())); |
| } |
| |
| tester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE); |
| |
| output = tester.extractOutput(); |
| assertEquals(1, output.size()); |
| assertEquals(Timing.ON_TIME, output.get(0).getPane().getTiming()); |
| assertEquals(n / 3, output.get(0).getPane().getIndex()); |
| assertEquals(n - ((n / 3) * 3), Iterables.size(output.get(0).getValue())); |
| } |
| |
| /** |
| * When the watermark moves to end-of-time, the GlobalWindow should fire an ON_TIME pane even if |
| * that pane is empty. |
| */ |
| @Test |
| public void fireEmptyOnDrainInGlobalWindowIfRequested() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, GlobalWindow> globalTester = |
| ReduceFnTester.nonCombining( |
| WindowingStrategy.of(new GlobalWindows()) |
| .withTrigger( |
| Repeatedly.forever( |
| AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.millis(3)))) |
| .withMode(AccumulationMode.DISCARDING_FIRED_PANES)); |
| |
| final int elementCount = 20; |
| final int expectedEarlyPanes = (elementCount + 3) / 4; |
| // One element per millisecond of processing time. |
| for (int tick = 0; tick < elementCount; tick++) { |
| globalTester.advanceProcessingTime(new Instant(tick)); |
| globalTester.injectElements(TimestampedValue.of(tick, new Instant(tick))); |
| } |
| globalTester.advanceProcessingTime(new Instant(elementCount + 4)); |
| |
| // The elements are expected to have been emitted in EARLY panes of four. |
| List<WindowedValue<Iterable<Integer>>> panes = globalTester.extractOutput(); |
| assertEquals(expectedEarlyPanes, panes.size()); |
| for (int paneIndex = 0; paneIndex < panes.size(); paneIndex++) { |
| WindowedValue<Iterable<Integer>> earlyPane = panes.get(paneIndex); |
| assertEquals(Timing.EARLY, earlyPane.getPane().getTiming()); |
| assertEquals(paneIndex, earlyPane.getPane().getIndex()); |
| assertEquals(4, Iterables.size(earlyPane.getValue())); |
| } |
| |
| globalTester.advanceInputWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE); |
| |
| // Draining emits one ON_TIME pane that contains no elements. |
| panes = globalTester.extractOutput(); |
| assertEquals(1, panes.size()); |
| WindowedValue<Iterable<Integer>> drainPane = panes.get(0); |
| assertEquals(Timing.ON_TIME, drainPane.getPane().getTiming()); |
| assertEquals(expectedEarlyPanes, drainPane.getPane().getIndex()); |
| assertEquals(0, Iterables.size(drainPane.getValue())); |
| } |
| |
| /** |
| * A late element must still get a garbage collection hold, so that it ends up in a late pane |
| * instead of being dropped due to lateness. |
| */ |
| @Test |
| public void setGarbageCollectionHoldOnLateElements() throws Exception { |
| ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> fixedTester = |
| ReduceFnTester.nonCombining( |
| WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) |
| .withTrigger( |
| AfterWatermark.pastEndOfWindow() |
| .withLateFirings(AfterPane.elementCountAtLeast(2))) |
| .withMode(AccumulationMode.DISCARDING_FIRED_PANES) |
| .withAllowedLateness(Duration.millis(100)) |
| .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)); |
| |
| fixedTester.advanceInputWatermark(new Instant(0)); |
| fixedTester.advanceOutputWatermark(new Instant(0)); |
| fixedTester.injectElements(TimestampedValue.of(1, new Instant(1))); |
| |
| // Moving the watermarks to 109 fires the ON_TIME pane @ 9 with element 1. |
| fixedTester.advanceInputWatermark(new Instant(109)); |
| fixedTester.advanceOutputWatermark(new Instant(109)); |
| fixedTester.injectElements(TimestampedValue.of(2, new Instant(2))); |
| |
| // The late element must have set a garbage collection hold for the final pane. |
| assertEquals(new Instant(109), fixedTester.getWatermarkHold()); |
| |
| // Moving the watermarks to 110 fires the final LATE pane @ 9 with element 2. |
| fixedTester.advanceInputWatermark(new Instant(110)); |
| fixedTester.advanceOutputWatermark(new Instant(110)); |
| |
| List<WindowedValue<Iterable<Integer>>> panes = fixedTester.extractOutput(); |
| assertEquals(2, panes.size()); |
| } |
| |
| /** |
| * A summing {@link CombineFnWithContext} that, on every callback, verifies the pipeline options |
| * and the side input reachable through the {@code Context}. The extracted output is the |
| * accumulated sum plus the side input value. |
| */ |
| private static class SumAndVerifyContextFn |
| extends CombineFnWithContext<Integer, Integer, Integer> { |
| |
| /** Side input view read in {@link #verifyContext} and {@link #extractOutput}. */ |
| private final PCollectionView<Integer> view; |
| /** Value the {@link TestOptions#getValue()} option is expected to carry. */ |
| private final int expectedValue; |
| |
| private SumAndVerifyContextFn(PCollectionView<Integer> view, int expectedValue) { |
| this.view = view; |
| this.expectedValue = expectedValue; |
| } |
| |
| /** |
| * Asserts that the context exposes the expected option value and a side input of at least 100. |
| */ |
| private void verifyContext(Context c) { |
| // Actual value first, matcher second, so a failure message reports the observed option |
| // value as the actual one rather than swapping it with the expectation. |
| assertThat(c.getPipelineOptions().as(TestOptions.class).getValue(), equalTo(expectedValue)); |
| assertThat(c.sideInput(view), greaterThanOrEqualTo(100)); |
| } |
| |
| /** Starts every accumulation at zero. */ |
| @Override |
| public Integer createAccumulator(Context c) { |
| verifyContext(c); |
| return 0; |
| } |
| |
| /** Adds {@code input} to the running sum. */ |
| @Override |
| public Integer addInput(Integer accumulator, Integer input, Context c) { |
| verifyContext(c); |
| return accumulator + input; |
| } |
| |
| /** Sums all partial accumulators into one. */ |
| @Override |
| public Integer mergeAccumulators(Iterable<Integer> accumulators, Context c) { |
| verifyContext(c); |
| int res = 0; |
| for (Integer accum : accumulators) { |
| res += accum; |
| } |
| return res; |
| } |
| |
| /** Returns the accumulated sum plus the current side input value. */ |
| @Override |
| public Integer extractOutput(Integer accumulator, Context c) { |
| verifyContext(c); |
| return accumulator + c.sideInput(view); |
| } |
| } |
| |
| /** |
| * A {@link PipelineOptions} to test combining with context. |
| * |
| * <p>{@code SumAndVerifyContextFn} reads {@link #getValue()} through the combine context and |
| * compares it with the value it was constructed with. |
| */ |
| public interface TestOptions extends PipelineOptions { |
| |
| /** Returns the integer option value checked by the context-verifying combine function. */ |
| int getValue(); |
| |
| /** Sets the integer option value returned by {@link #getValue()}. */ |
| void setValue(int value); |
| } |
| } |