blob: 4e51525481263f7df6786c02c43eb2bcafb90179 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nemo.compiler.frontend.beam.transform;
import org.apache.beam.runners.core.SystemReduceFn;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.nemo.common.ir.vertex.transform.Transform;
import org.apache.nemo.common.punctuation.Watermark;
import org.apache.nemo.compiler.frontend.beam.NemoPipelineOptions;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
import static org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing.*;
import static org.apache.beam.sdk.values.WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
public final class GroupByKeyAndWindowDoFnTransformTest {
private static final Logger LOG = LoggerFactory.getLogger(GroupByKeyAndWindowDoFnTransformTest.class.getName());
private final static Coder NULL_INPUT_CODER = null;
private final static Map<TupleTag<?>, Coder<?>> NULL_OUTPUT_CODERS = null;
private void checkOutput(final KV<String, List<String>> expected, final KV<String, Iterable<String>> result) {
// check key
assertEquals(expected.getKey(), result.getKey());
// check value
final List<String> resultValue = new ArrayList<>();
final List<String> expectedValue = new ArrayList<>(expected.getValue());
result.getValue().iterator().forEachRemaining(resultValue::add);
Collections.sort(resultValue);
Collections.sort(expectedValue);
assertEquals(expectedValue, resultValue);
}
// window size: 2 sec
// interval size: 1 sec
//
// [--------------window2------------------------------]
// [----------------------- window1 --------------------------]
// [-------window0-------]
// ts1 -- ts2 -- ts3 -- w -- ts4 -- w2 -- ts5 --ts6 --ts7 -- w3 -- ts8 --ts9 - --w4
// (1, "hello")
// (1, "world")
// (2, "hello")
// ==> window1: {(1,["hello","world"]), (2, ["hello"])}
// (1, "a")
// (2,"a")
// (3,"a")
// (2,"b")
// => window2: {(1,"a"), (2,["a","b"]), (3,"a")}
@Test
@SuppressWarnings("unchecked")
public void test() {
final TupleTag<String> outputTag = new TupleTag<>("main-output");
final SlidingWindows slidingWindows = SlidingWindows.of(Duration.standardSeconds(2))
.every(Duration.standardSeconds(1));
final GroupByKeyAndWindowDoFnTransform<String, String> doFnTransform =
new GroupByKeyAndWindowDoFnTransform(
NULL_OUTPUT_CODERS,
outputTag,
WindowingStrategy.of(slidingWindows),
PipelineOptionsFactory.as(NemoPipelineOptions.class),
SystemReduceFn.buffering(NULL_INPUT_CODER),
DisplayData.none());
final Instant ts1 = new Instant(1);
final Instant ts2 = new Instant(100);
final Instant ts3 = new Instant(300);
final Watermark watermark = new Watermark(1003);
final Instant ts4 = new Instant(1200);
final Watermark watermark2 = new Watermark(1400);
final Instant ts5 = new Instant(1600);
final Instant ts6 = new Instant(1800);
final Instant ts7 = new Instant(1900);
final Watermark watermark3 = new Watermark(2100);
final Instant ts8 = new Instant(2200);
final Instant ts9 = new Instant(2300);
final Watermark watermark4 = new Watermark(3000);
List<IntervalWindow> sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts1));
Collections.sort(sortedWindows, IntervalWindow::compareTo);
// [0---1000)
final IntervalWindow window0 = sortedWindows.get(0);
// [0---2000)
final IntervalWindow window1 = sortedWindows.get(1);
sortedWindows.clear();
sortedWindows = new ArrayList<>(slidingWindows.assignWindows(ts4));
Collections.sort(sortedWindows, IntervalWindow::compareTo);
// [1000--3000)
final IntervalWindow window2 = sortedWindows.get(1);
final Transform.Context context = mock(Transform.Context.class);
final TestOutputCollector<KV<String, Iterable<String>>> oc = new TestOutputCollector();
doFnTransform.prepare(context, oc);
doFnTransform.onData(WindowedValue.of(
KV.of("1", "hello"), ts1, slidingWindows.assignWindows(ts1), PaneInfo.NO_FIRING));
doFnTransform.onData(WindowedValue.of(
KV.of("1", "world"), ts2, slidingWindows.assignWindows(ts2), PaneInfo.NO_FIRING));
doFnTransform.onData(WindowedValue.of(
KV.of("2", "hello"), ts3, slidingWindows.assignWindows(ts3), PaneInfo.NO_FIRING));
doFnTransform.onWatermark(watermark);
// output
// 1: ["hello", "world"]
// 2: ["hello"]
Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
// windowed result for key 1
assertEquals(Arrays.asList(window0), oc.outputs.get(0).getWindows());
checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue());
// windowed result for key 2
assertEquals(Arrays.asList(window0), oc.outputs.get(1).getWindows());
checkOutput(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue());
assertEquals(2, oc.outputs.size());
assertEquals(1, oc.watermarks.size());
// check output watermark
assertEquals(1000,
oc.watermarks.get(0).getTimestamp());
oc.outputs.clear();
oc.watermarks.clear();
doFnTransform.onData(WindowedValue.of(
KV.of("1", "a"), ts4, slidingWindows.assignWindows(ts4), PaneInfo.NO_FIRING));
doFnTransform.onWatermark(watermark2);
assertEquals(0, oc.outputs.size()); // do not emit anything
assertEquals(1, oc.watermarks.size());
// check output watermark
assertEquals(1400,
oc.watermarks.get(0).getTimestamp());
oc.watermarks.clear();
doFnTransform.onData(WindowedValue.of(
KV.of("3", "a"), ts5, slidingWindows.assignWindows(ts5), PaneInfo.NO_FIRING));
doFnTransform.onData(WindowedValue.of(
KV.of("3", "a"), ts6, slidingWindows.assignWindows(ts6), PaneInfo.NO_FIRING));
doFnTransform.onData(WindowedValue.of(
KV.of("3", "b"), ts7, slidingWindows.assignWindows(ts7), PaneInfo.NO_FIRING));
// emit window1
doFnTransform.onWatermark(watermark3);
// output
// 1: ["hello", "world", "a"]
// 2: ["hello"]
// 3: ["a", "a", "b"]
Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
// windowed result for key 1
assertEquals(Arrays.asList(window1), oc.outputs.get(0).getWindows());
checkOutput(KV.of("1", Arrays.asList("hello", "world", "a")), oc.outputs.get(0).getValue());
// windowed result for key 2
assertEquals(Arrays.asList(window1), oc.outputs.get(1).getWindows());
checkOutput(KV.of("2", Arrays.asList("hello")), oc.outputs.get(1).getValue());
// windowed result for key 3
assertEquals(Arrays.asList(window1), oc.outputs.get(2).getWindows());
checkOutput(KV.of("3", Arrays.asList("a", "a", "b")), oc.outputs.get(2).getValue());
// check output watermark
assertEquals(2000,
oc.watermarks.get(0).getTimestamp());
oc.outputs.clear();
oc.watermarks.clear();
doFnTransform.onData(WindowedValue.of(
KV.of("1", "a"), ts8, slidingWindows.assignWindows(ts8), PaneInfo.NO_FIRING));
doFnTransform.onData(WindowedValue.of(
KV.of("3", "b"), ts9, slidingWindows.assignWindows(ts9), PaneInfo.NO_FIRING));
// emit window2
doFnTransform.onWatermark(watermark4);
// output
// 1: ["a", "a"]
// 3: ["a", "a", "b", "b"]
Collections.sort(oc.outputs, (o1, o2) -> o1.getValue().getKey().compareTo(o2.getValue().getKey()));
assertEquals(2, oc.outputs.size());
// windowed result for key 1
assertEquals(Arrays.asList(window2), oc.outputs.get(0).getWindows());
checkOutput(KV.of("1", Arrays.asList("a", "a")), oc.outputs.get(0).getValue());
// windowed result for key 3
assertEquals(Arrays.asList(window2), oc.outputs.get(1).getWindows());
checkOutput(KV.of("3", Arrays.asList("a", "a", "b", "b")), oc.outputs.get(1).getValue());
// check output watermark
assertEquals(3000,
oc.watermarks.get(0).getTimestamp());
doFnTransform.close();
}
/**
* Test complex triggers that emit early and late firing.
*/
@Test
public void eventTimeTriggerTest() {
final Duration lateness = Duration.standardSeconds(1);
final AfterWatermark.AfterWatermarkEarlyAndLate trigger = AfterWatermark.pastEndOfWindow()
// early firing
.withEarlyFirings(
AfterProcessingTime
.pastFirstElementInPane()
// early firing 1 sec after receiving an element
.plusDelayOf(Duration.millis(1000)))
// late firing: Fire on any late data.
.withLateFirings(AfterPane.elementCountAtLeast(1));
final FixedWindows window = (FixedWindows) Window.into(
FixedWindows.of(Duration.standardSeconds(5)))
// lateness
.withAllowedLateness(lateness)
.triggering(trigger)
// TODO #308: Test discarding of refinements
.accumulatingFiredPanes().getWindowFn();
final TupleTag<String> outputTag = new TupleTag<>("main-output");
final GroupByKeyAndWindowDoFnTransform<String, String> doFnTransform =
new GroupByKeyAndWindowDoFnTransform(
NULL_OUTPUT_CODERS,
outputTag,
WindowingStrategy.of(window).withTrigger(trigger)
.withMode(ACCUMULATING_FIRED_PANES)
.withAllowedLateness(lateness),
PipelineOptionsFactory.as(NemoPipelineOptions.class),
SystemReduceFn.buffering(NULL_INPUT_CODER),
DisplayData.none());
final Transform.Context context = mock(Transform.Context.class);
final TestOutputCollector<KV<String, Iterable<String>>> oc = new TestOutputCollector();
doFnTransform.prepare(context, oc);
doFnTransform.onData(WindowedValue.of(
KV.of("1", "hello"), new Instant(1), window.assignWindow(new Instant(1)), PaneInfo.NO_FIRING));
// early firing is not related to the watermark progress
doFnTransform.onWatermark(new Watermark(2));
assertEquals(1, oc.outputs.size());
assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
LOG.info("Output: {}", oc.outputs.get(0));
oc.outputs.clear();
doFnTransform.onData(WindowedValue.of(
KV.of("1", "world"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING));
// EARLY firing... waiting >= 1 sec
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// GBKTransform emits data when receiving watermark
// TODO #250: element-wise processing
doFnTransform.onWatermark(new Watermark(5));
assertEquals(1, oc.outputs.size());
assertEquals(EARLY, oc.outputs.get(0).getPane().getTiming());
// ACCUMULATION MODE
checkOutput(KV.of("1", Arrays.asList("hello", "world")), oc.outputs.get(0).getValue());
LOG.info("Output: {}", oc.outputs.get(0));
oc.outputs.clear();
// ON TIME
doFnTransform.onData(WindowedValue.of(
KV.of("1", "!!"), new Instant(3), window.assignWindow(new Instant(3)), PaneInfo.NO_FIRING));
doFnTransform.onWatermark(new Watermark(5001));
assertEquals(1, oc.outputs.size());
assertEquals(ON_TIME, oc.outputs.get(0).getPane().getTiming());
LOG.info("Output: {}", oc.outputs.get(0));
// ACCUMULATION MODE
checkOutput(KV.of("1", Arrays.asList("hello", "world", "!!")), oc.outputs.get(0).getValue());
oc.outputs.clear();
// LATE DATA
// actual window: [0-5000)
// allowed lateness: 1000 (ms)
// current watermark: 5001
// data: 4500
// the data timestamp + allowed lateness > current watermark,
// so it should be accumulated to the prev window
doFnTransform.onData(WindowedValue.of(
KV.of("1", "bye!"), new Instant(4500),
window.assignWindow(new Instant(4500)), PaneInfo.NO_FIRING));
doFnTransform.onWatermark(new Watermark(6000));
assertEquals(1, oc.outputs.size());
assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
LOG.info("Output: {}", oc.outputs.get(0));
// The data should be accumulated to the previous window because it allows 1 second lateness
checkOutput(KV.of("1", Arrays.asList("hello", "world", "!!", "bye!")), oc.outputs.get(0).getValue());
oc.outputs.clear();
// LATE DATA
// data timestamp: 4800
// current watermark: 6000
// data timestamp + allowed lateness < current watermark
// It should not be accumulated to the prev window
doFnTransform.onData(WindowedValue.of(
KV.of("1", "hello again!"), new Instant(4800),
window.assignWindow(new Instant(4800)), PaneInfo.NO_FIRING));
doFnTransform.onWatermark(new Watermark(6300));
assertEquals(1, oc.outputs.size());
assertEquals(LATE, oc.outputs.get(0).getPane().getTiming());
LOG.info("Output: {}", oc.outputs.get(0));
checkOutput(KV.of("1", Arrays.asList("hello again!")), oc.outputs.get(0).getValue());
oc.outputs.clear();
doFnTransform.close();
}
}