blob: 6ab4c0fd3135671984649aa3360f75e7a15268df [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.testing;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;

import com.google.auto.value.AutoValue;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DurationCoder;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StructuredCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.joda.time.Duration;
import org.joda.time.Instant;
/**
 * A testing input that generates an unbounded {@link PCollection} of elements, advancing the
 * watermark and processing time as elements are emitted. After all of the specified elements are
 * emitted, ceases to produce output.
 *
 * <p>The effect of each call to a {@link TestStream.Builder} method is reflected in the state of
 * the {@link Pipeline} only after every earlier call has taken effect and the {@link Pipeline} can
 * make no further progress. A {@link PipelineRunner} must ensure that no more progress can be made
 * in the {@link Pipeline} before advancing the state of the {@link TestStream}.
 */
public final class TestStream<T> extends PTransform<PBegin, PCollection<T>> {
  /** The events to replay, in order. Immutable after construction. */
  private final List<Event<T>> events;

  /** The coder for the elements of the produced {@link PCollection}. */
  private final Coder<T> coder;

  /**
   * Create a new {@link TestStream.Builder} with no elements and watermark equal to {@link
   * BoundedWindow#TIMESTAMP_MIN_VALUE}.
   *
   * @param coder the coder for the elements of the produced {@link PCollection}
   */
  public static <T> Builder<T> create(Coder<T> coder) {
    return new Builder<>(coder);
  }

  /**
   * Create a new {@link TestStream.Builder} whose elements are encoded with a {@link SchemaCoder}
   * built from the given {@link Schema} and conversion functions.
   *
   * @param schema the schema of the elements
   * @param toRowFunction converts an element into a {@link Row}
   * @param fromRowFunction converts a {@link Row} back into an element
   */
  public static <T> Builder<T> create(
      Schema schema,
      SerializableFunction<T, Row> toRowFunction,
      SerializableFunction<Row, T> fromRowFunction) {
    return create(SchemaCoder.of(schema, toRowFunction, fromRowFunction));
  }

  private TestStream(Coder<T> coder, List<Event<T>> events) {
    this.coder = checkNotNull(coder, "coder");
    // Defensive copy: fromRawEvents may be handed a mutable list, and getEvents() exposes this
    // field directly. Guava typically avoids copying when the input is already an ImmutableList,
    // which is what the Builder produces.
    this.events = ImmutableList.copyOf(checkNotNull(events, "events"));
  }

  /**
   * An incomplete {@link TestStream}. Elements added to this builder will be produced in sequence
   * when the pipeline created by the {@link TestStream} is run.
   *
   * <p>Builders are immutable: every method returns a new builder and leaves this one unchanged.
   */
  public static class Builder<T> {
    private final Coder<T> coder;
    private final ImmutableList<Event<T>> events;
    private final Instant currentWatermark;

    private Builder(Coder<T> coder) {
      this(coder, ImmutableList.of(), BoundedWindow.TIMESTAMP_MIN_VALUE);
    }

    private Builder(Coder<T> coder, ImmutableList<Event<T>> events, Instant currentWatermark) {
      this.coder = coder;
      this.events = events;
      this.currentWatermark = currentWatermark;
    }

    /**
     * Adds the specified elements to the source with timestamp equal to the current watermark.
     *
     * @return A {@link TestStream.Builder} like this one that will add the provided elements after
     *     all earlier events have completed.
     */
    @SafeVarargs
    public final Builder<T> addElements(T element, T... elements) {
      TimestampedValue<T> firstElement = TimestampedValue.of(element, currentWatermark);
      @SuppressWarnings({"unchecked", "rawtypes"})
      TimestampedValue<T>[] remainingElements = new TimestampedValue[elements.length];
      for (int i = 0; i < elements.length; i++) {
        remainingElements[i] = TimestampedValue.of(elements[i], currentWatermark);
      }
      return addElements(firstElement, remainingElements);
    }

    /**
     * Adds the specified elements to the source with the provided timestamps.
     *
     * @return A {@link TestStream.Builder} like this one that will add the provided elements after
     *     all earlier events have completed.
     * @throws IllegalArgumentException if any element's timestamp is not before {@link
     *     BoundedWindow#TIMESTAMP_MAX_VALUE}
     */
    @SafeVarargs
    public final Builder<T> addElements(
        TimestampedValue<T> element, TimestampedValue<T>... elements) {
      checkTimestampBeforeMax(element);
      for (TimestampedValue<T> multiElement : elements) {
        checkTimestampBeforeMax(multiElement);
      }
      return new Builder<>(coder, append(ElementEvent.add(element, elements)), currentWatermark);
    }

    /**
     * Advance the watermark of this source to the specified instant.
     *
     * <p>The watermark must advance monotonically and cannot advance to {@link
     * BoundedWindow#TIMESTAMP_MAX_VALUE} or beyond.
     *
     * @return A {@link TestStream.Builder} like this one that will advance the watermark to the
     *     specified point after all earlier events have completed.
     * @throws IllegalArgumentException if {@code newWatermark} is before the current watermark or
     *     not before {@link BoundedWindow#TIMESTAMP_MAX_VALUE}
     */
    public Builder<T> advanceWatermarkTo(Instant newWatermark) {
      checkArgument(
          !newWatermark.isBefore(currentWatermark), "The watermark must monotonically advance");
      checkArgument(
          newWatermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
          "The Watermark cannot progress beyond the maximum. Got: %s. Maximum: %s",
          newWatermark,
          BoundedWindow.TIMESTAMP_MAX_VALUE);
      return new Builder<>(coder, append(WatermarkEvent.advanceTo(newWatermark)), newWatermark);
    }

    /**
     * Advance the processing time by the specified amount.
     *
     * @return A {@link TestStream.Builder} like this one that will advance the processing time by
     *     the specified amount after all earlier events have completed.
     * @throws IllegalArgumentException if {@code amount} is not strictly positive
     */
    public Builder<T> advanceProcessingTime(Duration amount) {
      checkArgument(
          amount.getMillis() > 0,
          "Must advance the processing time by a positive amount. Got: %s",
          amount);
      return new Builder<>(
          coder, append(ProcessingTimeEvent.advanceBy(amount)), currentWatermark);
    }

    /**
     * Advance the watermark to infinity, completing this {@link TestStream}. Future calls to the
     * same builder will not affect the returned {@link TestStream}.
     */
    public TestStream<T> advanceWatermarkToInfinity() {
      return new TestStream<>(
          coder, append(WatermarkEvent.advanceTo(BoundedWindow.TIMESTAMP_MAX_VALUE)));
    }

    /** Returns a new list holding this builder's events followed by {@code event}. */
    private ImmutableList<Event<T>> append(Event<T> event) {
      return ImmutableList.<Event<T>>builder().addAll(events).add(event).build();
    }

    /** Rejects an element whose timestamp is not before {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. */
    private static void checkTimestampBeforeMax(TimestampedValue<?> value) {
      checkArgument(
          value.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
          "Elements must have timestamps before %s. Got: %s",
          BoundedWindow.TIMESTAMP_MAX_VALUE,
          value.getTimestamp());
    }
  }

  /**
   * An event in a {@link TestStream}. Every event that happens while evaluating a {@link
   * TestStream} implements this interface and reports its {@link EventType}.
   */
  public interface Event<T> {
    EventType getType();
  }

  /**
   * The types of {@link Event} that are supported by {@link TestStream}.
   *
   * <p>{@link TestStreamCoder} writes each event's ordinal to the wire and decodes it by position
   * in {@code values()}, so constants must not be reordered or inserted before existing ones.
   */
  public enum EventType {
    ELEMENT,
    WATERMARK,
    PROCESSING_TIME
  }

  /** A {@link Event} that produces elements. */
  @AutoValue
  public abstract static class ElementEvent<T> implements Event<T> {
    public abstract Iterable<TimestampedValue<T>> getElements();

    @SafeVarargs
    static <T> Event<T> add(TimestampedValue<T> element, TimestampedValue<T>... elements) {
      return add(ImmutableList.<TimestampedValue<T>>builder().add(element).add(elements).build());
    }

    /** <b>For internal use only: no backwards compatibility guarantees.</b> */
    @Internal
    public static <T> Event<T> add(Iterable<TimestampedValue<T>> elements) {
      return new AutoValue_TestStream_ElementEvent<>(EventType.ELEMENT, elements);
    }
  }

  /** A {@link Event} that advances the watermark. */
  @AutoValue
  public abstract static class WatermarkEvent<T> implements Event<T> {
    public abstract Instant getWatermark();

    /** <b>For internal use only: no backwards compatibility guarantees.</b> */
    @Internal
    public static <T> Event<T> advanceTo(Instant newWatermark) {
      return new AutoValue_TestStream_WatermarkEvent<>(EventType.WATERMARK, newWatermark);
    }
  }

  /** A {@link Event} that advances the processing time clock. */
  @AutoValue
  public abstract static class ProcessingTimeEvent<T> implements Event<T> {
    public abstract Duration getProcessingTimeAdvance();

    /** <b>For internal use only: no backwards compatibility guarantees.</b> */
    @Internal
    public static <T> Event<T> advanceBy(Duration amount) {
      return new AutoValue_TestStream_ProcessingTimeEvent<>(EventType.PROCESSING_TIME, amount);
    }
  }

  /**
   * Declares an unbounded, globally-windowed output {@link PCollection} encoded with this stream's
   * value coder. This is a primitive output; replaying the {@link #getEvents() events} is left to
   * the {@link PipelineRunner}.
   */
  @Override
  public PCollection<T> expand(PBegin input) {
    return PCollection.createPrimitiveOutputInternal(
        input.getPipeline(), WindowingStrategy.globalDefault(), IsBounded.UNBOUNDED, coder);
  }

  /** Returns the coder for the elements of the produced {@link PCollection}. */
  public Coder<T> getValueCoder() {
    return coder;
  }

  /**
   * Returns the sequence of {@link Event Events} in this {@link TestStream}. The returned list is
   * immutable.
   *
   * <p>For use by {@link PipelineRunner} authors.
   */
  public List<Event<T>> getEvents() {
    return events;
  }

  /**
   * <b>For internal use only. No backwards-compatibility guarantees.</b>
   *
   * <p>Builds a test stream directly from events. No validation is performed on watermark
   * monotonicity, etc. This is assumed to be a previously-serialized {@link TestStream} transform
   * that is correct by construction. The events are copied, so later changes to {@code events}
   * do not affect the returned stream.
   */
  @Internal
  public static <T> TestStream<T> fromRawEvents(Coder<T> coder, List<Event<T>> events) {
    return new TestStream<>(coder, events);
  }

  @Override
  public boolean equals(Object other) {
    if (!(other instanceof TestStream)) {
      return false;
    }
    TestStream<?> that = (TestStream<?>) other;
    return getValueCoder().equals(that.getValueCoder()) && getEvents().equals(that.getEvents());
  }

  @Override
  public int hashCode() {
    return Objects.hash(TestStream.class, getValueCoder(), getEvents());
  }

  /**
   * Coder for {@link TestStream}.
   *
   * <p>Encoding: the number of events via {@link VarIntCoder}, then for each event one byte
   * holding its {@link EventType} ordinal followed by the payload. For {@code ELEMENT} the payload
   * is the element count via {@link VarIntCoder} and each element via a {@link
   * TimestampedValue.TimestampedValueCoder}. For {@code WATERMARK} it is the watermark via {@link
   * InstantCoder}, and for {@code PROCESSING_TIME} the advance via {@link DurationCoder}.
   */
  public static class TestStreamCoder<T> extends StructuredCoder<TestStream<T>> {
    private final TimestampedValue.TimestampedValueCoder<T> elementCoder;

    public static <T> TestStreamCoder<T> of(Coder<T> valueCoder) {
      return new TestStreamCoder<>(valueCoder);
    }

    private TestStreamCoder(Coder<T> valueCoder) {
      this.elementCoder = TimestampedValue.TimestampedValueCoder.of(valueCoder);
    }

    /**
     * @throws IllegalArgumentException if the stream contains an {@link Event} that is not an
     *     {@link ElementEvent}, {@link WatermarkEvent} or {@link ProcessingTimeEvent}
     */
    @Override
    public void encode(TestStream<T> value, OutputStream outStream) throws IOException {
      List<Event<T>> events = value.getEvents();
      VarIntCoder.of().encode(events.size(), outStream);
      for (Event<T> event : events) {
        if (event instanceof ElementEvent) {
          outStream.write(event.getType().ordinal());
          Iterable<TimestampedValue<T>> elements = ((ElementEvent<T>) event).getElements();
          VarIntCoder.of().encode(Iterables.size(elements), outStream);
          for (TimestampedValue<T> element : elements) {
            elementCoder.encode(element, outStream);
          }
        } else if (event instanceof WatermarkEvent) {
          outStream.write(event.getType().ordinal());
          InstantCoder.of().encode(((WatermarkEvent<T>) event).getWatermark(), outStream);
        } else if (event instanceof ProcessingTimeEvent) {
          outStream.write(event.getType().ordinal());
          DurationCoder.of()
              .encode(((ProcessingTimeEvent<T>) event).getProcessingTimeAdvance(), outStream);
        } else {
          // The event count has already been written, so skipping this event would leave a
          // stream whose declared length does not match its contents.
          throw new IllegalArgumentException(
              "Cannot encode TestStream event of unsupported class " + event.getClass().getName());
        }
      }
    }

    /**
     * @throws EOFException if the stream ends before an event type byte could be read
     * @throws IOException if the input holds a negative count or an unknown event type
     */
    @Override
    public TestStream<T> decode(InputStream inStream) throws IOException {
      int numberOfEvents = readCount(inStream, "event");
      // Look the constants up once rather than calling values() for every event.
      EventType[] eventTypes = EventType.values();
      List<Event<T>> events = new ArrayList<>(numberOfEvents);
      for (int i = 0; i < numberOfEvents; i++) {
        EventType eventType = readEventType(inStream, eventTypes);
        switch (eventType) {
          case ELEMENT:
            {
              int numElements = readCount(inStream, "element");
              List<TimestampedValue<T>> elements = new ArrayList<>(numElements);
              for (int j = 0; j < numElements; j++) {
                elements.add(elementCoder.decode(inStream));
              }
              events.add(ElementEvent.add(elements));
              break;
            }
          case WATERMARK:
            {
              Instant watermark = InstantCoder.of().decode(inStream);
              events.add(WatermarkEvent.advanceTo(watermark));
              break;
            }
          case PROCESSING_TIME:
            {
              Duration duration = DurationCoder.of().decode(inStream).toDuration();
              events.add(ProcessingTimeEvent.advanceBy(duration));
              break;
            }
          default:
            throw new IllegalStateException("Unknown event type " + eventType);
        }
      }
      return TestStream.fromRawEvents(elementCoder.getValueCoder(), events);
    }

    /** Reads a VarInt-encoded count, rejecting negative values, which only corrupt input yields. */
    private static int readCount(InputStream inStream, String what) throws IOException {
      int count = VarIntCoder.of().decode(inStream);
      if (count < 0) {
        throw new IOException("Invalid TestStream " + what + " count: " + count);
      }
      return count;
    }

    /** Reads the single ordinal byte that precedes every encoded event. */
    private static EventType readEventType(InputStream inStream, EventType[] eventTypes)
        throws IOException {
      // InputStream.read() returns -1 at end of stream rather than throwing.
      int ordinal = inStream.read();
      if (ordinal < 0) {
        throw new EOFException("Unexpected end of stream while reading a TestStream event type");
      }
      if (ordinal >= eventTypes.length) {
        throw new IOException("Unknown TestStream event type ordinal " + ordinal);
      }
      return eventTypes[ordinal];
    }

    @Override
    public List<? extends Coder<?>> getCoderArguments() {
      return Collections.singletonList(elementCoder);
    }

    // NOTE(review): this reports deterministic unconditionally, although the encoding includes
    // elements encoded with the value coder, which may itself be non-deterministic. Confirm
    // whether any caller relies on this before tightening it.
    @Override
    public void verifyDeterministic() throws NonDeterministicException {}
  }
}