blob: 7cc9dc08deb24a49a8d509beec61618268253ebd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.direct;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables.getOnlyElement;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.direct.UnboundedReadDeduplicator.NeverDeduplicator;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.Read.Unbounded;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.joda.time.Instant;
/**
* A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
* for the {@link Unbounded Read.Unbounded} primitive {@link PTransform}.
*/
class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
// Occasionally close an existing reader and resume from checkpoint, to exercise close-and-resume
private static final double DEFAULT_READER_REUSE_CHANCE = 0.95;
private final EvaluationContext evaluationContext;
private final PipelineOptions options;
private final double readerReuseChance;
UnboundedReadEvaluatorFactory(EvaluationContext evaluationContext, PipelineOptions options) {
this(evaluationContext, options, DEFAULT_READER_REUSE_CHANCE);
}
@VisibleForTesting
UnboundedReadEvaluatorFactory(
EvaluationContext evaluationContext, PipelineOptions options, double readerReuseChance) {
this.evaluationContext = evaluationContext;
this.options = options;
this.readerReuseChance = readerReuseChance;
}
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
@Nullable
public <InputT> TransformEvaluator<InputT> forApplication(
AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) {
return createEvaluator((AppliedPTransform) application);
}
private <OutputT> TransformEvaluator<?> createEvaluator(
AppliedPTransform<PBegin, PCollection<OutputT>, Read.Unbounded<OutputT>> application) {
return new UnboundedReadEvaluator<>(application, evaluationContext, options, readerReuseChance);
}
@Override
public void cleanup() {}
/**
* A {@link UnboundedReadEvaluator} produces elements from an underlying {@link UnboundedSource},
* discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
* creates the {@link UnboundedReader} and consumes some currently available input.
*
* <p>Calls to {@link UnboundedReadEvaluator} are not internally thread-safe, and should only be
* used by a single thread at a time. Each {@link UnboundedReadEvaluator} maintains its own
* checkpoint, and constructs its reader from the current checkpoint in each call to {@link
* #finishBundle()}.
*/
private static class UnboundedReadEvaluator<OutputT, CheckpointMarkT extends CheckpointMark>
implements TransformEvaluator<UnboundedSourceShard<OutputT, CheckpointMarkT>> {
private static final int ARBITRARY_MAX_ELEMENTS = 10;
private final AppliedPTransform<?, PCollection<OutputT>, ?> transform;
private final EvaluationContext evaluationContext;
private final PipelineOptions options;
private final double readerReuseChance;
private final StepTransformResult.Builder resultBuilder;
public UnboundedReadEvaluator(
AppliedPTransform<?, PCollection<OutputT>, ?> transform,
EvaluationContext evaluationContext,
PipelineOptions options,
double readerReuseChance) {
this.transform = transform;
this.evaluationContext = evaluationContext;
this.options = options;
this.readerReuseChance = readerReuseChance;
resultBuilder = StepTransformResult.withoutHold(transform);
}
@Override
@SuppressWarnings("Finally") // Cannot use try-with-resources in order to ensure we don't
// double-close the reader.
public void processElement(
WindowedValue<UnboundedSourceShard<OutputT, CheckpointMarkT>> element) throws IOException {
UncommittedBundle<OutputT> output =
evaluationContext.createBundle(
(PCollection<OutputT>) getOnlyElement(transform.getOutputs().values()));
UnboundedSourceShard<OutputT, CheckpointMarkT> shard = element.getValue();
UnboundedReader<OutputT> reader = null;
try {
reader = getReader(shard);
boolean elementAvailable = startReader(reader, shard);
if (elementAvailable) {
UnboundedReadDeduplicator deduplicator = shard.getDeduplicator();
int numElements = 0;
do {
if (deduplicator.shouldOutput(reader.getCurrentRecordId())) {
output.add(
WindowedValue.timestampedValueInGlobalWindow(
reader.getCurrent(), reader.getCurrentTimestamp()));
}
numElements++;
} while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());
Instant watermark = reader.getWatermark();
CheckpointMarkT finishedCheckpoint = finishRead(reader, watermark, shard);
// Sometimes resume from a checkpoint even if it's not required
if (ThreadLocalRandom.current().nextDouble(1.0) >= readerReuseChance) {
UnboundedReader<OutputT> toClose = reader;
// Prevent double-close. UnboundedReader is AutoCloseable, which does not require
// idempotency of close. Nulling out the reader here prevents trying to re-close it
// if the call to close throws an IOException.
reader = null;
toClose.close();
}
UnboundedSourceShard<OutputT, CheckpointMarkT> residual =
UnboundedSourceShard.of(
shard.getSource(), shard.getDeduplicator(), reader, finishedCheckpoint);
resultBuilder
.addOutput(output)
.addUnprocessedElements(
Collections.singleton(
WindowedValue.timestampedValueInGlobalWindow(residual, watermark)));
} else {
Instant watermark = reader.getWatermark();
if (watermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
// If the reader had no elements available, but the shard is not done, reuse it later
// Might be better to finalize old checkpoint.
resultBuilder.addUnprocessedElements(
Collections.<WindowedValue<?>>singleton(
WindowedValue.timestampedValueInGlobalWindow(
UnboundedSourceShard.of(
shard.getSource(),
shard.getDeduplicator(),
reader,
shard.getCheckpoint()),
watermark)));
} else {
// End of input. Close the reader after finalizing old checkpoint.
// note: can be null for empty datasets so ensure to null check the checkpoint
final CheckpointMarkT checkpoint = shard.getCheckpoint();
IOException ioe = null;
try {
if (checkpoint != null) {
checkpoint.finalizeCheckpoint();
}
} catch (final IOException finalizeCheckpointException) {
ioe = finalizeCheckpointException;
} finally {
try {
UnboundedReader<?> toClose = reader;
reader = null; // Avoid double close below in case of an exception.
toClose.close();
} catch (final IOException closeEx) {
if (ioe != null) {
ioe.addSuppressed(closeEx);
} else {
throw closeEx;
}
}
}
if (ioe != null) {
throw ioe;
}
}
}
} catch (IOException e) {
if (reader != null) {
reader.close();
}
throw e;
}
}
private UnboundedReader<OutputT> getReader(UnboundedSourceShard<OutputT, CheckpointMarkT> shard)
throws IOException {
UnboundedReader<OutputT> existing = shard.getExistingReader();
if (existing == null) {
CheckpointMarkT checkpoint = shard.getCheckpoint();
if (checkpoint != null) {
checkpoint = CoderUtils.clone(shard.getSource().getCheckpointMarkCoder(), checkpoint);
}
return shard.getSource().createReader(options, checkpoint);
} else {
return existing;
}
}
private boolean startReader(
UnboundedReader<OutputT> reader, UnboundedSourceShard<OutputT, CheckpointMarkT> shard)
throws IOException {
if (shard.getExistingReader() == null) {
return reader.start();
} else {
return shard.getExistingReader().advance();
}
}
/**
* Checkpoint the current reader, finalize the previous checkpoint, and return the current
* checkpoint.
*/
private CheckpointMarkT finishRead(
UnboundedReader<OutputT> reader,
Instant watermark,
UnboundedSourceShard<OutputT, CheckpointMarkT> shard)
throws IOException {
final CheckpointMark oldMark = shard.getCheckpoint();
@SuppressWarnings("unchecked")
final CheckpointMarkT mark = (CheckpointMarkT) reader.getCheckpointMark();
if (oldMark != null) {
oldMark.finalizeCheckpoint();
}
// If the watermark is the max value, this source may not be invoked again. Finalize after
// committing the output.
if (!watermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
PCollection<OutputT> outputPc =
(PCollection<OutputT>) getOnlyElement(transform.getOutputs().values());
evaluationContext.scheduleAfterOutputWouldBeProduced(
outputPc,
GlobalWindow.INSTANCE,
outputPc.getWindowingStrategy(),
() -> {
try {
mark.finalizeCheckpoint();
} catch (IOException e) {
throw new RuntimeException(
"Couldn't finalize checkpoint after the end of the Global Window", e);
}
});
}
return mark;
}
@Override
public TransformResult<UnboundedSourceShard<OutputT, CheckpointMarkT>> finishBundle()
throws IOException {
return resultBuilder.build();
}
}
@AutoValue
abstract static class UnboundedSourceShard<T, CheckpointT extends CheckpointMark>
implements SourceShard<T> {
static <T, CheckpointT extends CheckpointMark> UnboundedSourceShard<T, CheckpointT> unstarted(
UnboundedSource<T, CheckpointT> source, UnboundedReadDeduplicator deduplicator) {
return of(source, deduplicator, null, null);
}
static <T, CheckpointT extends CheckpointMark> UnboundedSourceShard<T, CheckpointT> of(
UnboundedSource<T, CheckpointT> source,
UnboundedReadDeduplicator deduplicator,
@Nullable UnboundedReader<T> reader,
@Nullable CheckpointT checkpoint) {
return new AutoValue_UnboundedReadEvaluatorFactory_UnboundedSourceShard<>(
source, deduplicator, reader, checkpoint);
}
@Override
public abstract UnboundedSource<T, CheckpointT> getSource();
abstract UnboundedReadDeduplicator getDeduplicator();
@Nullable
abstract UnboundedReader<T> getExistingReader();
@Nullable
abstract CheckpointT getCheckpoint();
}
static class InputProvider<T>
implements RootInputProvider<T, UnboundedSourceShard<T, ?>, PBegin> {
private final EvaluationContext evaluationContext;
private final PipelineOptions options;
InputProvider(EvaluationContext evaluationContext, PipelineOptions options) {
this.evaluationContext = evaluationContext;
this.options = options;
}
@Override
public Collection<CommittedBundle<UnboundedSourceShard<T, ?>>> getInitialInputs(
AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform,
int targetParallelism)
throws Exception {
UnboundedSource<T, ?> source = ReadTranslation.unboundedSourceFromTransform(transform);
List<? extends UnboundedSource<T, ?>> splits = source.split(targetParallelism, options);
UnboundedReadDeduplicator deduplicator =
source.requiresDeduping()
? UnboundedReadDeduplicator.CachedIdDeduplicator.create()
: NeverDeduplicator.create();
ImmutableList.Builder<CommittedBundle<UnboundedSourceShard<T, ?>>> initialShards =
ImmutableList.builder();
for (UnboundedSource<T, ?> split : splits) {
UnboundedSourceShard<T, ?> shard = UnboundedSourceShard.unstarted(split, deduplicator);
initialShards.add(
evaluationContext
.<UnboundedSourceShard<T, ?>>createRootBundle()
.add(WindowedValue.valueInGlobalWindow(shard))
.commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
}
return initialShards.build();
}
}
}