// blob: a76176d5c41f45ae512307bae1869eab662bfc97 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.testing;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Ordering;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
/** A utility class for testing {@link WindowFn}s. */
public class WindowFnTestUtils {
/**
 * Builds the expected-output set for {@link #runWindowFn}: one formatted element per supplied
 * timestamp (in milliseconds). Repeated timestamps collapse into a single entry.
 */
public static Set<String> set(long... timestamps) {
  Set<String> expected = new HashSet<>();
  for (int i = 0; i < timestamps.length; i++) {
    expected.add(timestampValue(timestamps[i]));
  }
  return expected;
}
/**
 * Runs the {@link WindowFn} over the provided input, returning a map of windows to the timestamps
 * in those windows.
 *
 * <p>Every element is created with a {@code null} value; use {@link #runWindowFnWithValue} when
 * the {@link WindowFn} needs the element value to assign windows.
 *
 * @param windowFn the {@link WindowFn} under test
 * @param timestamps element timestamps in milliseconds; must not contain {@code null}
 * @return the windows left after merging, each mapped to the formatted timestamps it contains
 * @throws NullPointerException if {@code timestamps} contains a {@code null} entry
 */
public static <T, W extends BoundedWindow> Map<W, Set<String>> runWindowFn(
    WindowFn<T, W> windowFn, List<Long> timestamps) throws Exception {
  List<TimestampedValue<T>> timestampedValues = new ArrayList<>(timestamps.size());
  // Iterate as primitive long: a boxed Long would bind to the Instant(Object) constructor, which
  // turns a null entry into "now" instead of failing.
  for (long timestampMillis : timestamps) {
    timestampedValues.add(TimestampedValue.of((T) null, new Instant(timestampMillis)));
  }
  return runWindowFnWithValue(windowFn, timestampedValues);
}
/**
 * Feeds each {@link TimestampedValue} through the {@link WindowFn}: the element is placed in every
 * window it is assigned to, and {@link WindowFn#mergeWindows} is invoked after each element.
 * Returns the resulting windows mapped to the formatted timestamps they hold. Use this variant
 * when the element value takes part in window assignment.
 */
public static <T, W extends BoundedWindow> Map<W, Set<String>> runWindowFnWithValue(
    WindowFn<T, W> windowFn, List<TimestampedValue<T>> timestampedValues) throws Exception {
  TestWindowSet<W, String> state = new TestWindowSet<>();
  for (TimestampedValue<T> input : timestampedValues) {
    Collection<W> assigned = assignedWindowsWithValue(windowFn, input);
    for (W target : assigned) {
      state.put(target, timestampValue(input.getTimestamp().getMillis()));
    }
    // Merge eagerly so the next element sees the already-merged window set.
    windowFn.mergeWindows(new TestMergeContext<T, W>(state, windowFn));
  }

  Map<W, Set<String>> contentsByWindow = new HashMap<>();
  state.windows().forEach(remaining -> contentsByWindow.put(remaining, state.get(remaining)));
  return contentsByWindow;
}
/**
 * Runs {@link WindowFn#assignWindows(WindowFn.AssignContext)} for an element with a {@code null}
 * value and the given timestamp, returning the windows it was assigned to.
 */
public static <T, W extends BoundedWindow> Collection<W> assignedWindows(
    WindowFn<T, W> windowFn, long timestamp) throws Exception {
  TimestampedValue<T> valueless = TimestampedValue.of((T) null, new Instant(timestamp));
  return assignedWindowsWithValue(windowFn, valueless);
}
/**
 * Runs {@link WindowFn#assignWindows(WindowFn.AssignContext)} for the given {@link
 * TimestampedValue} and returns the assigned windows. Use this variant when the element value is
 * needed to assign windows.
 */
public static <T, W extends BoundedWindow> Collection<W> assignedWindowsWithValue(
    WindowFn<T, W> windowFn, TimestampedValue<T> timestampedValue) throws Exception {
  return windowFn.assignWindows(new TestAssignContext<T, W>(timestampedValue, windowFn));
}
/** Formats a millisecond timestamp as the element label stored in the result sets. */
private static String timestampValue(long timestamp) {
  Instant instant = new Instant(timestamp);
  return "T" + instant;
}
/**
 * {@code AssignContext} for tests, backed by a single {@link TimestampedValue}. The element is
 * always reported as being in the {@link GlobalWindow}.
 */
private static class TestAssignContext<T, W extends BoundedWindow>
    extends WindowFn<T, W>.AssignContext {
  private final TimestampedValue<T> input;

  public TestAssignContext(TimestampedValue<T> timestampedValue, WindowFn<T, W> windowFn) {
    windowFn.super();
    input = timestampedValue;
  }

  @Override
  public T element() {
    return input.getValue();
  }

  @Override
  public Instant timestamp() {
    return input.getTimestamp();
  }

  @Override
  public BoundedWindow window() {
    return GlobalWindow.INSTANCE;
  }
}
/** {@code MergeContext} for tests that forwards every call to a {@link TestWindowSet}. */
private static class TestMergeContext<T, W extends BoundedWindow>
    extends WindowFn<T, W>.MergeContext {
  private final TestWindowSet<W, ?> backingSet;

  public TestMergeContext(TestWindowSet<W, ?> windowSet, WindowFn<T, W> windowFn) {
    windowFn.super();
    backingSet = windowSet;
  }

  @Override
  public Collection<W> windows() {
    return backingSet.windows();
  }

  @Override
  public void merge(Collection<W> toBeMerged, W mergeResult) {
    backingSet.merge(toBeMerged, mergeResult);
  }
}
/**
 * A window set for testing {@link WindowFn}s that collects the elements placed in each window
 * into a {@link Set}; a value added to the same window more than once is kept only once.
 */
private static class TestWindowSet<W extends BoundedWindow, V> {
  private final Map<W, Set<V>> elements = new HashMap<>();

  /** Adds {@code value} to {@code window}, registering the window if it is not present yet. */
  public void put(W window, V value) {
    Set<V> all = elements.computeIfAbsent(window, k -> new HashSet<>());
    all.add(value);
  }

  /**
   * Replaces {@code otherWindows} with {@code window}, which receives the union of their
   * elements. Elements already stored under {@code window} are kept. Does nothing when {@code
   * otherWindows} is empty.
   *
   * @throws IllegalArgumentException if one of {@code otherWindows} is not in this set; windows
   *     visited before the missing one have already been removed at that point
   */
  public void merge(Collection<W> otherWindows, W window) {
    if (otherWindows.isEmpty()) {
      return;
    }
    Set<V> merged = new HashSet<>();
    // Keep what the target already holds, unless it is itself a merge source, in which case the
    // loop below picks its elements up.
    if (elements.containsKey(window) && !otherWindows.contains(window)) {
      merged.addAll(elements.get(window));
    }
    for (W w : otherWindows) {
      if (!elements.containsKey(w)) {
        throw new IllegalArgumentException("Tried to merge a non-existent window:" + w);
      }
      merged.addAll(elements.get(w));
      elements.remove(w);
    }
    elements.put(window, merged);
  }

  /**
   * Returns a snapshot of the current windows. A copy is returned, not the live key set, so
   * callers may invoke {@link #merge} while iterating over the result.
   */
  public Collection<W> windows() {
    return new ArrayList<>(elements.keySet());
  }

  /** Returns the elements stored for {@code window}, or {@code null} if it is not present. */
  public Set<V> get(W window) {
    return elements.get(window);
  }
}
/**
 * Assigns an element with a {@code null} value and the given {@code timestamp} to windows using
 * {@code windowFn}, and verifies that the result of {@link WindowFn#getOutputTime
 * windowFn.getOutputTime} for each window is within the proper bound. Delegates to {@link
 * #validateNonInterferingOutputTimesWithValue}.
 */
public static <T, W extends BoundedWindow> void validateNonInterferingOutputTimes(
    WindowFn<T, W> windowFn, long timestamp) throws Exception {
  TimestampedValue<T> valueless = TimestampedValue.of((T) null, new Instant(timestamp));
  validateNonInterferingOutputTimesWithValue(windowFn, valueless);
}
/**
 * Assigns the given {@code timestampedValue} to windows using {@code windowFn}, and verifies for
 * each assigned window that {@link WindowFn#getOutputTime windowFn.getOutputTime} is neither
 * before the input timestamp nor after the window's maximum timestamp. Use this variant when the
 * element value is needed to assign windows.
 */
public static <T, W extends BoundedWindow> void validateNonInterferingOutputTimesWithValue(
    WindowFn<T, W> windowFn, TimestampedValue<T> timestampedValue) throws Exception {
  Collection<W> assigned = assignedWindowsWithValue(windowFn, timestampedValue);
  Instant inputTime = timestampedValue.getTimestamp();
  for (W candidate : assigned) {
    Instant outputTime = windowFn.getOutputTime(inputTime, candidate);
    boolean atOrAfterInput = !outputTime.isBefore(inputTime);
    assertTrue("getOutputTime must be greater than or equal to input timestamp", atOrAfterInput);
    boolean withinWindow = !outputTime.isAfter(candidate.maxTimestamp());
    assertTrue("getOutputTime must be less than or equal to the max timestamp", withinWindow);
  }
}
/**
 * Assigns an element with a {@code null} value and the given {@code timestamp} to windows using
 * {@code windowFn}, and verifies that the result of {@link WindowFn#getOutputTime
 * windowFn.getOutputTime} for later windows (as defined by {@code maxTimestamp}) won't prevent
 * the watermark from passing the end of earlier windows.
 *
 * <p>This verifies that overlapping windows don't interfere at all. Depending on the {@code
 * windowFn} this may be stricter than desired. Delegates to {@link
 * #validateGetOutputTimestampWithValue}.
 */
public static <T, W extends BoundedWindow> void validateGetOutputTimestamp(
    WindowFn<T, W> windowFn, long timestamp) throws Exception {
  TimestampedValue<T> valueless = TimestampedValue.of((T) null, new Instant(timestamp));
  validateGetOutputTimestampWithValue(windowFn, valueless);
}
/**
 * Assigns the given {@code timestampedValue} to windows using {@code windowFn}, and verifies that
 * the result of {@link WindowFn#getOutputTime windowFn.getOutputTime} for later windows (as
 * defined by {@code maxTimestamp}) won't prevent the watermark from passing the end of earlier
 * windows.
 *
 * <p>Windows are visited in ascending order of maximum timestamp. The first window's output time
 * must not be before the input timestamp; every later window's output time must be strictly
 * after the previous window's maximum timestamp; and no output time may exceed its own window's
 * maximum timestamp.
 *
 * <p>This verifies that overlapping windows don't interfere at all. Depending on the {@code
 * windowFn} this may be stricter than desired. Use this variant when the element value is needed
 * to assign windows.
 */
public static <T, W extends BoundedWindow> void validateGetOutputTimestampWithValue(
    WindowFn<T, W> windowFn, TimestampedValue<T> timestampedValue) throws Exception {
  List<W> byMaxTimestamp = new ArrayList<>(assignedWindowsWithValue(windowFn, timestampedValue));
  byMaxTimestamp.sort(Comparator.comparing(BoundedWindow::maxTimestamp));

  Instant inputTime = timestampedValue.getTimestamp();
  Instant previousEnd = null;
  for (W current : byMaxTimestamp) {
    Instant outputTime = windowFn.getOutputTime(inputTime, current);
    if (previousEnd == null) {
      // First window: any output time in the valid range is acceptable.
      assertFalse(
          "getOutputTime must be greater than or equal to input timestamp",
          outputTime.isBefore(inputTime));
    } else {
      // Later window: the output time must clear the end of the window before it.
      assertTrue(
          "getOutputTime must be greater than the end of the previous window",
          outputTime.isAfter(previousEnd));
    }
    // The upper bound applies to every window.
    assertFalse(
        "getOutputTime must be less than or equal to the max timestamp",
        outputTime.isAfter(current.maxTimestamp()));
    previousEnd = current.maxTimestamp();
  }
}
/**
 * Verifies that merged windows built from the given timestamps do not hold each other up via the
 * watermark, using the provided {@link WindowFn} and {@link TimestampCombiner}.
 *
 * <p>Given a list of lists of timestamps, where each list is expected to merge into a single
 * window with end times in ascending order, assigns and merges windows for each list (as though
 * each were a separate key/user session). Then combines each timestamp in the list according to
 * the provided {@link TimestampCombiner}.
 *
 * <p>Every element is created with a {@code null} value; use {@link
 * #validateGetOutputTimestampsWithValue} when the value is needed to assign windows.
 *
 * @param windowFn the {@link WindowFn} under test
 * @param timestampCombiner how the timestamps of one window are combined
 * @param timestampsPerWindow one list of millisecond timestamps per expected window; must not
 *     contain {@code null} entries
 * @throws NullPointerException if a timestamp entry is {@code null}
 */
public static <T, W extends IntervalWindow> void validateGetOutputTimestamps(
    WindowFn<T, W> windowFn,
    TimestampCombiner timestampCombiner,
    List<List<Long>> timestampsPerWindow)
    throws Exception {
  List<List<TimestampedValue<T>>> timestampValuesPerWindow =
      new ArrayList<>(timestampsPerWindow.size());
  for (List<Long> timestamps : timestampsPerWindow) {
    List<TimestampedValue<T>> timestampedValues = new ArrayList<>(timestamps.size());
    // Iterate as primitive long: a boxed Long would bind to the Instant(Object) constructor,
    // which turns a null entry into "now" instead of failing.
    for (long timestampMillis : timestamps) {
      timestampedValues.add(TimestampedValue.of((T) null, new Instant(timestampMillis)));
    }
    timestampValuesPerWindow.add(timestampedValues);
  }
  validateGetOutputTimestampsWithValue(windowFn, timestampCombiner, timestampValuesPerWindow);
}
/**
 * Verifies that merged windows built from the given timestamped values do not hold each other up
 * via the watermark, using the provided {@link WindowFn} and {@link TimestampCombiner}.
 *
 * <p>Given a list of lists of timestamped values, where each list is expected to end up in a
 * single window with end times in ascending order, assigns and merges windows for each list (as
 * though each were a separate key/user session). Then combines each timestamp in the list
 * according to the provided {@link TimestampCombiner}, and asserts that each window's combined
 * output timestamp is strictly after the maximum timestamp of the window before it.
 *
 * <p>This version allows passing {@link TimestampedValue} in case the value is needed to assign
 * windows.
 *
 * @param windowFn the {@link WindowFn} under test
 * @param timestampCombiner how the timestamps of one window are combined
 * @param timestampValuesPerWindow one list of elements per expected window
 * @throws IllegalArgumentException if a list does not end up in exactly one window after merging
 */
public static <T, W extends IntervalWindow> void validateGetOutputTimestampsWithValue(
    WindowFn<T, W> windowFn,
    TimestampCombiner timestampCombiner,
    List<List<TimestampedValue<T>>> timestampValuesPerWindow)
    throws Exception {
  // Assign windows to each timestamp, then merge them, storing the single surviving window of
  // every group in a list in corresponding order to timestampValuesPerWindow.
  final List<W> windows = new ArrayList<>();
  for (List<TimestampedValue<T>> timestampValuesForWindow : timestampValuesPerWindow) {
    final Set<W> windowsToMerge = new HashSet<>();
    for (TimestampedValue<T> element : timestampValuesForWindow) {
      windowsToMerge.addAll(assignedWindowsWithValue(windowFn, element));
    }

    // Track the windows that survive merging separately from the set handed to the WindowFn.
    // A group that needs no merge (merge() never called) or that is merged in several steps
    // still yields exactly one entry, keeping `windows` aligned with the input lists.
    final Set<W> remaining = new HashSet<>(windowsToMerge);
    windowFn.mergeWindows(
        windowFn.new MergeContext() {
          @Override
          public Collection<W> windows() {
            return windowsToMerge;
          }

          @Override
          public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
            remaining.removeAll(toBeMerged);
            remaining.add(mergeResult);
          }
        });

    checkArgument(
        remaining.size() == 1,
        "Expected timestamped values %s to end up in exactly one window, but got %s",
        timestampValuesForWindow,
        remaining);
    windows.add(remaining.iterator().next());
  }

  // Map every list of input timestamped values to a combined output timestamp.
  final List<Instant> combinedOutputTimestamps = new ArrayList<>();
  for (int i = 0; i < timestampValuesPerWindow.size(); ++i) {
    List<TimestampedValue<T>> timestampValuesForWindow = timestampValuesPerWindow.get(i);
    W window = windows.get(i);
    List<Instant> outputInstants = new ArrayList<>();
    for (TimestampedValue<T> element : timestampValuesForWindow) {
      outputInstants.add(assignOutputTime(timestampCombiner, element.getTimestamp(), window));
    }
    combinedOutputTimestamps.add(combineOutputTimes(timestampCombiner, outputInstants));
  }

  // Walk the windows in input order (expected to be ascending by max timestamp); ensure the
  // output timestamp is after the max timestamp of the previous window.
  @Nullable W earlierEndingWindow = null;
  for (int i = 0; i < windows.size(); ++i) {
    W window = windows.get(i);
    ReadableInstant outputTimestamp = combinedOutputTimestamps.get(i);
    if (earlierEndingWindow != null) {
      assertThat(
          outputTimestamp, greaterThan((ReadableInstant) earlierEndingWindow.maxTimestamp()));
    }
    earlierEndingWindow = window;
  }
}
/**
 * Returns the output time a single element contributes under {@code timestampCombiner}: the
 * window's maximum timestamp for {@code END_OF_WINDOW}, otherwise the element's own timestamp.
 *
 * @throws IllegalArgumentException if the combiner is not one of the handled constants
 */
private static Instant assignOutputTime(
    TimestampCombiner timestampCombiner, Instant inputTimestamp, BoundedWindow window) {
  final Instant assigned;
  switch (timestampCombiner) {
    case END_OF_WINDOW:
      assigned = window.maxTimestamp();
      break;
    case EARLIEST:
    case LATEST:
      assigned = inputTimestamp;
      break;
    default:
      throw new IllegalArgumentException(
          String.format("Unknown %s: %s", TimestampCombiner.class, timestampCombiner));
  }
  return assigned;
}
/**
 * Collapses the per-element output times of one window into a single instant: the minimum for
 * {@code EARLIEST}, the maximum for {@code LATEST}, and the first one for {@code END_OF_WINDOW}.
 *
 * @throws IllegalArgumentException if {@code outputInstants} is empty or the combiner is not one
 *     of the handled constants
 */
private static Instant combineOutputTimes(
    TimestampCombiner timestampCombiner, Iterable<Instant> outputInstants) {
  boolean hasInstants = !Iterables.isEmpty(outputInstants);
  checkArgument(hasInstants, "Cannot combine zero instants with %s", timestampCombiner);

  final Instant combined;
  switch (timestampCombiner) {
    case END_OF_WINDOW:
      combined = outputInstants.iterator().next();
      break;
    case LATEST:
      combined = Ordering.natural().max(outputInstants);
      break;
    case EARLIEST:
      combined = Ordering.natural().min(outputInstants);
      break;
    default:
      throw new IllegalArgumentException(
          String.format("Unknown %s: %s", TimestampCombiner.class, timestampCombiner));
  }
  return combined;
}
}