blob: 1e7ca9364a4b1e5cd69d8610c9c43420af29875e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isA;
import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.beam.runners.core.DoFnRunners.OutputManager;
import org.apache.beam.runners.core.TimerInternals.TimerData;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ListMultimap;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.PeriodFormat;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/** Tests for {@link SimpleDoFnRunner}. */
@RunWith(JUnit4.class)
public class SimpleDoFnRunnerTest {
@Rule public ExpectedException thrown = ExpectedException.none();
@Mock StepContext mockStepContext;
@Mock TimerInternals mockTimerInternals;
@Before
public void setup() {
MockitoAnnotations.initMocks(this);
when(mockStepContext.timerInternals()).thenReturn(mockTimerInternals);
}
@Test
public void testProcessElementExceptionsWrappedAsUserCodeException() {
ThrowingDoFn fn = new ThrowingDoFn();
DoFnRunner<String, String> runner =
new SimpleDoFnRunner<>(
null,
fn,
NullSideInputReader.empty(),
null,
null,
Collections.emptyList(),
mockStepContext,
null,
Collections.emptyMap(),
WindowingStrategy.of(new GlobalWindows()),
DoFnSchemaInformation.create());
thrown.expect(UserCodeException.class);
thrown.expectCause(is(fn.exceptionToThrow));
runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
}
@Test
public void testOnTimerExceptionsWrappedAsUserCodeException() {
ThrowingDoFn fn = new ThrowingDoFn();
DoFnRunner<String, String> runner =
new SimpleDoFnRunner<>(
null,
fn,
NullSideInputReader.empty(),
null,
null,
Collections.emptyList(),
mockStepContext,
null,
Collections.emptyMap(),
WindowingStrategy.of(new GlobalWindows()),
DoFnSchemaInformation.create());
thrown.expect(UserCodeException.class);
thrown.expectCause(is(fn.exceptionToThrow));
runner.onTimer(
ThrowingDoFn.TIMER_ID, GlobalWindow.INSTANCE, new Instant(0), TimeDomain.EVENT_TIME);
}
/**
* Tests that a users call to set a timer gets properly dispatched to the timer internals. From
* there on, it is the duty of the runner & step context to set it in whatever way is right for
* that runner.
*/
@Test
public void testTimerSet() {
WindowFn<?, ?> windowFn = new GlobalWindows();
DoFnWithTimers<GlobalWindow> fn = new DoFnWithTimers(windowFn.windowCoder());
DoFnRunner<String, String> runner =
new SimpleDoFnRunner<>(
null,
fn,
NullSideInputReader.empty(),
null,
null,
Collections.emptyList(),
mockStepContext,
null,
Collections.emptyMap(),
WindowingStrategy.of(new GlobalWindows()),
DoFnSchemaInformation.create());
// Setting the timer needs the current time, as it is set relative
Instant currentTime = new Instant(42);
when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(currentTime);
runner.processElement(WindowedValue.valueInGlobalWindow("anyValue"));
verify(mockTimerInternals)
.setTimer(
StateNamespaces.window(new GlobalWindows().windowCoder(), GlobalWindow.INSTANCE),
DoFnWithTimers.TIMER_ID,
currentTime.plus(DoFnWithTimers.TIMER_OFFSET),
TimeDomain.EVENT_TIME);
}
@Test
public void testStartBundleExceptionsWrappedAsUserCodeException() {
ThrowingDoFn fn = new ThrowingDoFn();
DoFnRunner<String, String> runner =
new SimpleDoFnRunner<>(
null,
fn,
NullSideInputReader.empty(),
null,
null,
Collections.emptyList(),
mockStepContext,
null,
Collections.emptyMap(),
WindowingStrategy.of(new GlobalWindows()),
DoFnSchemaInformation.create());
thrown.expect(UserCodeException.class);
thrown.expectCause(is(fn.exceptionToThrow));
runner.startBundle();
}
@Test
public void testFinishBundleExceptionsWrappedAsUserCodeException() {
ThrowingDoFn fn = new ThrowingDoFn();
DoFnRunner<String, String> runner =
new SimpleDoFnRunner<>(
null,
fn,
NullSideInputReader.empty(),
null,
null,
Collections.emptyList(),
mockStepContext,
null,
Collections.emptyMap(),
WindowingStrategy.of(new GlobalWindows()),
DoFnSchemaInformation.create());
thrown.expect(UserCodeException.class);
thrown.expectCause(is(fn.exceptionToThrow));
runner.finishBundle();
}
/**
* Tests that {@link SimpleDoFnRunner#onTimer} properly dispatches to the underlying {@link DoFn}.
*/
@Test
public void testOnTimerCalled() {
WindowFn<?, GlobalWindow> windowFn = new GlobalWindows();
DoFnWithTimers<GlobalWindow> fn = new DoFnWithTimers(windowFn.windowCoder());
DoFnRunner<String, String> runner =
new SimpleDoFnRunner<>(
null,
fn,
NullSideInputReader.empty(),
null,
null,
Collections.emptyList(),
mockStepContext,
null,
Collections.emptyMap(),
WindowingStrategy.of(windowFn),
DoFnSchemaInformation.create());
Instant currentTime = new Instant(42);
Duration offset = Duration.millis(37);
// Mocking is not easily compatible with annotation analysis, so we manually record
// the method call.
runner.onTimer(
DoFnWithTimers.TIMER_ID,
GlobalWindow.INSTANCE,
currentTime.plus(offset),
TimeDomain.EVENT_TIME);
assertThat(
fn.onTimerInvocations,
contains(
TimerData.of(
DoFnWithTimers.TIMER_ID,
StateNamespaces.window(windowFn.windowCoder(), GlobalWindow.INSTANCE),
currentTime.plus(offset),
TimeDomain.EVENT_TIME)));
}
/**
* Demonstrates that attempting to output an element before the timestamp of the current element
* with zero {@link DoFn#getAllowedTimestampSkew() allowed timestamp skew} throws.
*/
@Test
public void testBackwardsInTimeNoSkew() {
SkewingDoFn fn = new SkewingDoFn(Duration.ZERO);
DoFnRunner<Duration, Duration> runner =
new SimpleDoFnRunner<>(
null,
fn,
NullSideInputReader.empty(),
new ListOutputManager(),
new TupleTag<>(),
Collections.emptyList(),
mockStepContext,
null,
Collections.emptyMap(),
WindowingStrategy.of(new GlobalWindows()),
DoFnSchemaInformation.create());
runner.startBundle();
// An element output at the current timestamp is fine.
runner.processElement(
WindowedValue.timestampedValueInGlobalWindow(Duration.ZERO, new Instant(0)));
thrown.expect(UserCodeException.class);
thrown.expectCause(isA(IllegalArgumentException.class));
thrown.expectMessage("must be no earlier");
thrown.expectMessage(
String.format("timestamp of the current input (%s)", new Instant(0).toString()));
thrown.expectMessage(
String.format(
"the allowed skew (%s)", PeriodFormat.getDefault().print(Duration.ZERO.toPeriod())));
// An element output before (current time - skew) is forbidden
runner.processElement(
WindowedValue.timestampedValueInGlobalWindow(Duration.millis(1L), new Instant(0)));
}
/**
* Demonstrates that attempting to output an element before the timestamp of the current element
* plus the value of {@link DoFn#getAllowedTimestampSkew()} throws, but between that value and the
* current timestamp succeeds.
*/
@Test
public void testSkew() {
SkewingDoFn fn = new SkewingDoFn(Duration.standardMinutes(10L));
DoFnRunner<Duration, Duration> runner =
new SimpleDoFnRunner<>(
null,
fn,
NullSideInputReader.empty(),
new ListOutputManager(),
new TupleTag<>(),
Collections.emptyList(),
mockStepContext,
null,
Collections.emptyMap(),
WindowingStrategy.of(new GlobalWindows()),
DoFnSchemaInformation.create());
runner.startBundle();
// Outputting between "now" and "now - allowed skew" succeeds.
runner.processElement(
WindowedValue.timestampedValueInGlobalWindow(Duration.standardMinutes(5L), new Instant(0)));
thrown.expect(UserCodeException.class);
thrown.expectCause(isA(IllegalArgumentException.class));
thrown.expectMessage("must be no earlier");
thrown.expectMessage(
String.format("timestamp of the current input (%s)", new Instant(0).toString()));
thrown.expectMessage(
String.format(
"the allowed skew (%s)",
PeriodFormat.getDefault().print(Duration.standardMinutes(10L).toPeriod())));
// Outputting before "now - allowed skew" fails.
runner.processElement(
WindowedValue.timestampedValueInGlobalWindow(Duration.standardHours(1L), new Instant(0)));
}
/**
* Demonstrates that attempting to output an element with a timestamp before the current one
* always succeeds when {@link DoFn#getAllowedTimestampSkew()} is equal to {@link Long#MAX_VALUE}
* milliseconds.
*/
@Test
public void testInfiniteSkew() {
SkewingDoFn fn = new SkewingDoFn(Duration.millis(Long.MAX_VALUE));
DoFnRunner<Duration, Duration> runner =
new SimpleDoFnRunner<>(
null,
fn,
NullSideInputReader.empty(),
new ListOutputManager(),
new TupleTag<>(),
Collections.emptyList(),
mockStepContext,
null,
Collections.emptyMap(),
WindowingStrategy.of(new GlobalWindows()),
DoFnSchemaInformation.create());
runner.startBundle();
runner.processElement(
WindowedValue.timestampedValueInGlobalWindow(Duration.millis(1L), new Instant(0)));
runner.processElement(
WindowedValue.timestampedValueInGlobalWindow(
Duration.millis(1L), BoundedWindow.TIMESTAMP_MIN_VALUE.plus(Duration.millis(1))));
runner.processElement(
WindowedValue.timestampedValueInGlobalWindow(
// This is the maximum amount a timestamp in beam can move (from the maximum timestamp
// to the minimum timestamp).
Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())
.minus(Duration.millis(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis())),
BoundedWindow.TIMESTAMP_MAX_VALUE));
}
static class ThrowingDoFn extends DoFn<String, String> {
final Exception exceptionToThrow = new UnsupportedOperationException("Expected exception");
static final String TIMER_ID = "throwingTimerId";
@TimerId(TIMER_ID)
private static final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@StartBundle
public void startBundle() throws Exception {
throw exceptionToThrow;
}
@FinishBundle
public void finishBundle() throws Exception {
throw exceptionToThrow;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
throw exceptionToThrow;
}
@OnTimer(TIMER_ID)
public void onTimer(OnTimerContext context) throws Exception {
throw exceptionToThrow;
}
}
private static class DoFnWithTimers<W extends BoundedWindow> extends DoFn<String, String> {
static final String TIMER_ID = "testTimerId";
static final Duration TIMER_OFFSET = Duration.millis(100);
private final Coder<W> windowCoder;
// Mutable
List<TimerData> onTimerInvocations;
DoFnWithTimers(Coder<W> windowCoder) {
this.windowCoder = windowCoder;
this.onTimerInvocations = new ArrayList<>();
}
@TimerId(TIMER_ID)
private static final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
@ProcessElement
public void process(ProcessContext context, @TimerId(TIMER_ID) Timer timer) {
timer.offset(TIMER_OFFSET).setRelative();
}
@OnTimer(TIMER_ID)
public void onTimer(OnTimerContext context) {
onTimerInvocations.add(
TimerData.of(
DoFnWithTimers.TIMER_ID,
StateNamespaces.window(windowCoder, (W) context.window()),
context.timestamp(),
context.timeDomain()));
}
}
/**
* A {@link DoFn} that outputs elements with timestamp equal to the input timestamp minus the
* input element.
*/
private static class SkewingDoFn extends DoFn<Duration, Duration> {
private final Duration allowedSkew;
private SkewingDoFn(Duration allowedSkew) {
this.allowedSkew = allowedSkew;
}
@ProcessElement
public void processElement(ProcessContext context) {
context.outputWithTimestamp(context.element(), context.timestamp().minus(context.element()));
}
@Override
public Duration getAllowedTimestampSkew() {
return allowedSkew;
}
}
private static class ListOutputManager implements OutputManager {
private ListMultimap<TupleTag<?>, WindowedValue<?>> outputs = ArrayListMultimap.create();
@Override
public <T> void output(TupleTag<T> tag, WindowedValue<T> output) {
outputs.put(tag, output);
}
}
}