blob: 0ce5de8792f47b22b746d4554440eaf49a3cf852 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.direct;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.TestStreamTranslation;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testing.TestStream.ElementEvent;
import org.apache.beam.sdk.testing.TestStream.Event;
import org.apache.beam.sdk.testing.TestStream.EventType;
import org.apache.beam.sdk.testing.TestStream.ProcessingTimeEvent;
import org.apache.beam.sdk.testing.TestStream.WatermarkEvent;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.joda.time.Duration;
import org.joda.time.Instant;
/** The {@link TransformEvaluatorFactory} for the {@link TestStream} primitive. */
class TestStreamEvaluatorFactory implements TransformEvaluatorFactory {
private final EvaluationContext evaluationContext;
TestStreamEvaluatorFactory(EvaluationContext evaluationContext) {
this.evaluationContext = evaluationContext;
}
@Nullable
@Override
public <InputT> TransformEvaluator<InputT> forApplication(
AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) {
return createEvaluator((AppliedPTransform) application);
}
@Override
public void cleanup() throws Exception {}
/**
* Returns the evaluator for the provided application of {@link TestStream}, or null if it is
* already in use.
*
* <p>The documented behavior of {@link TestStream} requires the output of one event to travel
* completely through the pipeline before any additional event, so additional instances that have
* a separate collection of events cannot be created.
*/
private <InputT, OutputT> TransformEvaluator<? super InputT> createEvaluator(
AppliedPTransform<PBegin, PCollection<OutputT>, TestStream<OutputT>> application) {
return (TransformEvaluator<InputT>) new Evaluator<>(application, evaluationContext);
}
private static class Evaluator<T> implements TransformEvaluator<TestStreamIndex<T>> {
private final AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application;
private final EvaluationContext context;
private final StepTransformResult.Builder resultBuilder;
private Evaluator(
AppliedPTransform<PBegin, PCollection<T>, TestStream<T>> application,
EvaluationContext context) {
this.application = application;
this.context = context;
this.resultBuilder = StepTransformResult.withoutHold(application);
}
@Override
public void processElement(WindowedValue<TestStreamIndex<T>> element) throws Exception {
TestStreamIndex<T> streamIndex = element.getValue();
List<Event<T>> events = streamIndex.getTestStream().getEvents();
int index = streamIndex.getIndex();
Instant watermark = element.getTimestamp();
Event<T> event = events.get(index);
if (event.getType().equals(EventType.ELEMENT)) {
UncommittedBundle<T> bundle =
context.createBundle(
(PCollection<T>) Iterables.getOnlyElement(application.getOutputs().values()));
for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) {
bundle.add(
WindowedValue.timestampedValueInGlobalWindow(elem.getValue(), elem.getTimestamp()));
}
resultBuilder.addOutput(bundle);
}
if (event.getType().equals(EventType.WATERMARK)) {
watermark = ((WatermarkEvent<T>) event).getWatermark();
}
if (event.getType().equals(EventType.PROCESSING_TIME)) {
((TestClock) context.getClock())
.advance(((ProcessingTimeEvent<T>) event).getProcessingTimeAdvance());
}
TestStreamIndex<T> next = streamIndex.next();
if (next.getIndex() < events.size()) {
resultBuilder.addUnprocessedElements(
Collections.singleton(WindowedValue.timestampedValueInGlobalWindow(next, watermark)));
}
}
@Override
public TransformResult<TestStreamIndex<T>> finishBundle() throws Exception {
return resultBuilder.build();
}
}
@VisibleForTesting
static class TestClock implements Clock {
private final AtomicReference<Instant> currentTime =
new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);
public void advance(Duration amount) {
Instant now = currentTime.get();
currentTime.compareAndSet(now, now.plus(amount));
}
@Override
public Instant now() {
return currentTime.get();
}
}
private static class TestClockSupplier implements Supplier<Clock> {
@Override
public Clock get() {
return new TestClock();
}
}
static class DirectTestStreamFactory<T>
implements PTransformOverrideFactory<
PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> {
private final DirectRunner runner;
DirectTestStreamFactory(DirectRunner runner) {
this.runner = runner;
}
@Override
public PTransformReplacement<PBegin, PCollection<T>> getReplacementTransform(
AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform) {
try {
return PTransformReplacement.of(
transform.getPipeline().begin(),
new DirectTestStream<>(runner, TestStreamTranslation.getTestStream(transform)));
} catch (IOException exc) {
throw new RuntimeException(
String.format(
"Transform could not be converted to %s", TestStream.class.getSimpleName()),
exc);
}
}
@Override
public Map<PValue, ReplacementOutput> mapOutputs(
Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) {
return ReplacementOutputs.singleton(outputs, newOutput);
}
static final String DIRECT_TEST_STREAM_URN = "urn:beam:directrunner:transforms:test_stream:v1";
static class DirectTestStream<T> extends PTransform<PBegin, PCollection<T>> {
private final transient DirectRunner runner;
private final TestStream<T> original;
@VisibleForTesting
DirectTestStream(DirectRunner runner, TestStream<T> transform) {
this.runner = runner;
this.original = transform;
}
@Override
public PCollection<T> expand(PBegin input) {
runner.setClockSupplier(new TestClockSupplier());
return PCollection.createPrimitiveOutputInternal(
input.getPipeline(),
WindowingStrategy.globalDefault(),
IsBounded.UNBOUNDED,
original.getValueCoder());
}
}
}
static class InputProvider<T> implements RootInputProvider<T, TestStreamIndex<T>, PBegin> {
private final EvaluationContext evaluationContext;
InputProvider(EvaluationContext evaluationContext) {
this.evaluationContext = evaluationContext;
}
@Override
public Collection<CommittedBundle<TestStreamIndex<T>>> getInitialInputs(
AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform,
int targetParallelism) {
// This will always be run on an execution-time transform, so it can be downcast
DirectTestStreamFactory.DirectTestStream<T> testStream =
(DirectTestStreamFactory.DirectTestStream<T>) transform.getTransform();
CommittedBundle<TestStreamIndex<T>> initialBundle =
evaluationContext
.<TestStreamIndex<T>>createRootBundle()
.add(WindowedValue.valueInGlobalWindow(TestStreamIndex.of(testStream.original)))
.commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
return Collections.singleton(initialBundle);
}
}
@AutoValue
abstract static class TestStreamIndex<T> {
static <T> TestStreamIndex<T> of(TestStream<T> stream) {
return new AutoValue_TestStreamEvaluatorFactory_TestStreamIndex<>(stream, 0);
}
abstract TestStream<T> getTestStream();
abstract int getIndex();
TestStreamIndex<T> next() {
return new AutoValue_TestStreamEvaluatorFactory_TestStreamIndex<>(
getTestStream(), getIndex() + 1);
}
}
}