blob: d935b6c69a0216bd164faa7b4d47d373565ff6a9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.direct;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import javax.annotation.Nullable;
import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.direct.StepTransformResult.Builder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.Read.Bounded;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.SettableFuture;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
 * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators}
 * for the {@link Bounded Read.Bounded} primitive {@link PTransform}.
 */
final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
  /**
   * The required minimum size of a source to dynamically split. Produced {@link TransformEvaluator
   * TransformEvaluators} will attempt to dynamically split all sources larger than the minimum
   * dynamic split size.
   */
  private static final long REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE = 0;

  private final EvaluationContext evaluationContext;
  private final PipelineOptions options;

  // TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the DirectRunner.
  @VisibleForTesting
  final ExecutorService executor =
      Executors.newCachedThreadPool(
          new ThreadFactoryBuilder()
              .setThreadFactory(MoreExecutors.platformThreadFactory())
              // Daemon threads: an in-flight dynamic-split request must never block JVM shutdown.
              .setDaemon(true)
              .setNameFormat("direct-dynamic-split-requester")
              .build());

  private final long minimumDynamicSplitSize;

  BoundedReadEvaluatorFactory(EvaluationContext evaluationContext, PipelineOptions options) {
    this(evaluationContext, options, REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE);
  }

  @VisibleForTesting
  BoundedReadEvaluatorFactory(
      EvaluationContext evaluationContext, PipelineOptions options, long minimumDynamicSplitSize) {
    this.evaluationContext = evaluationContext;
    this.options = options;
    this.minimumDynamicSplitSize = minimumDynamicSplitSize;
  }

  @SuppressWarnings({"unchecked", "rawtypes"})
  @Override
  @Nullable
  public <InputT> TransformEvaluator<InputT> forApplication(
      AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws IOException {
    // The raw cast is safe by construction: this factory is only registered for Read.Bounded
    // applications, whose single output is a PCollection.
    return createEvaluator((AppliedPTransform) application);
  }

  private <OutputT> TransformEvaluator<?> createEvaluator(
      final AppliedPTransform<?, PCollection<OutputT>, ?> transform) {
    return new BoundedReadEvaluator<>(
        transform, evaluationContext, options, minimumDynamicSplitSize, executor);
  }

  @Override
  public void cleanup() {
    // Graceful shutdown: lets any in-flight split requests finish rather than interrupting them.
    executor.shutdown();
  }

  /**
   * A {@link BoundedReadEvaluator} produces elements from an underlying {@link BoundedSource},
   * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator
   * creates the {@link BoundedReader} and consumes all available input.
   *
   * <p>A {@link BoundedReadEvaluator} should only be created once per {@link BoundedSource}, and
   * each evaluator should only be called once per evaluation of the pipeline. Otherwise, the source
   * may produce duplicate elements.
   */
  private static class BoundedReadEvaluator<OutputT>
      implements TransformEvaluator<BoundedSourceShard<OutputT>> {
    private final PCollection<OutputT> outputPCollection;
    private final EvaluationContext evaluationContext;
    private final PipelineOptions options;
    private Builder resultBuilder;
    // Sources whose estimated size is at or below this threshold are read without attempting a
    // dynamic split.
    private final long minimumDynamicSplitSize;
    // Executor on which split requests run concurrently with the read loop in processElement.
    private final ExecutorService produceSplitExecutor;

    public BoundedReadEvaluator(
        AppliedPTransform<?, PCollection<OutputT>, ?> transform,
        EvaluationContext evaluationContext,
        PipelineOptions options,
        long minimumDynamicSplitSize,
        ExecutorService executor) {
      this.evaluationContext = evaluationContext;
      this.outputPCollection =
          (PCollection<OutputT>) Iterables.getOnlyElement(transform.getOutputs().values());
      this.resultBuilder = StepTransformResult.withoutHold(transform);
      this.options = options;
      this.minimumDynamicSplitSize = minimumDynamicSplitSize;
      this.produceSplitExecutor = executor;
    }

    /**
     * Reads the entire {@link BoundedSourceShard} in {@code element}, emitting every element of
     * the source into a fresh output bundle. While the read loop runs, a dynamic-split request may
     * execute concurrently on {@link #produceSplitExecutor}; any residual source it produces is
     * reported as an unprocessed element so the runner can schedule it separately.
     *
     * @throws Exception if the reader fails, or rethrows (unwrapped and rewrapped as a
     *     {@link UserCodeException}) any failure from the dynamic-split attempt
     */
    @Override
    public void processElement(WindowedValue<BoundedSourceShard<OutputT>> element)
        throws Exception {
      BoundedSource<OutputT> source = element.getValue().getSource();
      try (final BoundedReader<OutputT> reader = source.createReader(options)) {
        boolean contentsRemaining = reader.start();
        // Kicked off before the read loop so the split proceeds concurrently with reading.
        // NOTE(review): if the read loop below throws, try-with-resources closes the reader while
        // the split task may still be using it — confirm splitAtFraction tolerates that.
        Future<BoundedSource<OutputT>> residualFuture = startDynamicSplitThread(source, reader);
        UncommittedBundle<OutputT> output = evaluationContext.createBundle(outputPCollection);
        while (contentsRemaining) {
          output.add(
              WindowedValue.timestampedValueInGlobalWindow(
                  reader.getCurrent(), reader.getCurrentTimestamp()));
          contentsRemaining = reader.advance();
        }
        resultBuilder.addOutput(output);
        try {
          BoundedSource<OutputT> residual = residualFuture.get();
          // A null residual means no split happened (source too small, or the reader declined).
          if (residual != null) {
            resultBuilder.addUnprocessedElements(
                element.withValue(BoundedSourceShard.of(residual)));
          }
        } catch (ExecutionException exex) {
          // Un-and-rewrap the exception thrown by attempting to split
          throw UserCodeException.wrap(exex.getCause());
        }
      }
    }

    /**
     * Submits a split-at-halfway request for sources larger than {@link #minimumDynamicSplitSize};
     * otherwise returns an already-completed future holding {@code null} (no residual).
     */
    private Future<BoundedSource<OutputT>> startDynamicSplitThread(
        BoundedSource<OutputT> source, BoundedReader<OutputT> reader) throws Exception {
      if (source.getEstimatedSizeBytes(options) > minimumDynamicSplitSize) {
        return produceSplitExecutor.submit(new GenerateSplitAtHalfwayPoint<>(reader));
      } else {
        SettableFuture<BoundedSource<OutputT>> emptyFuture = SettableFuture.create();
        emptyFuture.set(null);
        return emptyFuture;
      }
    }

    @Override
    public TransformResult<BoundedSourceShard<OutputT>> finishBundle() {
      return resultBuilder.build();
    }
  }

  /** An immutable {@link SourceShard} wrapping a single {@link BoundedSource}. */
  @AutoValue
  abstract static class BoundedSourceShard<T> implements SourceShard<T> {
    static <T> BoundedSourceShard<T> of(BoundedSource<T> source) {
      return new AutoValue_BoundedReadEvaluatorFactory_BoundedSourceShard<>(source);
    }

    @Override
    public abstract BoundedSource<T> getSource();
  }

  /**
   * A {@link RootInputProvider} that splits the pipeline's {@link BoundedSource} into initial
   * shards, one committed root bundle per produced sub-source.
   */
  static class InputProvider<T> implements RootInputProvider<T, BoundedSourceShard<T>, PBegin> {
    private final EvaluationContext evaluationContext;
    private final PipelineOptions options;

    InputProvider(EvaluationContext evaluationContext, PipelineOptions options) {
      this.evaluationContext = evaluationContext;
      this.options = options;
    }

    @Override
    public Collection<CommittedBundle<BoundedSourceShard<T>>> getInitialInputs(
        AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform,
        int targetParallelism)
        throws Exception {
      BoundedSource<T> source = ReadTranslation.boundedSourceFromTransform(transform);
      long estimatedBytes = source.getEstimatedSizeBytes(options);
      // Clamp to at least 1 byte per bundle: when the estimate is smaller than targetParallelism
      // (including an estimate of 0), the plain division yields 0, and split(0, ...) is undefined
      // for many sources (division by zero or an unbounded number of bundles).
      long bytesPerBundle = Math.max(1L, estimatedBytes / targetParallelism);
      List<? extends BoundedSource<T>> bundles = source.split(bytesPerBundle, options);
      ImmutableList.Builder<CommittedBundle<BoundedSourceShard<T>>> shards =
          ImmutableList.builder();
      for (BoundedSource<T> bundle : bundles) {
        CommittedBundle<BoundedSourceShard<T>> inputShard =
            evaluationContext
                .<BoundedSourceShard<T>>createRootBundle()
                .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(bundle)))
                // Root bundles hold no watermark: commit at the maximum timestamp.
                .commit(BoundedWindow.TIMESTAMP_MAX_VALUE);
        shards.add(inputShard);
      }
      return shards.build();
    }
  }

  /**
   * A {@link Callable} that asks {@code reader} to split off the second half of its remaining
   * (unconsumed) work, returning the residual source, or {@code null} if the reader reports no
   * consumed fraction or is already finished.
   */
  private static class GenerateSplitAtHalfwayPoint<T> implements Callable<BoundedSource<T>> {
    private final BoundedReader<T> reader;

    private GenerateSplitAtHalfwayPoint(BoundedReader<T> reader) {
      this.reader = reader;
    }

    @Override
    public BoundedSource<T> call() throws Exception {
      // Splits at halfway of the remaining work.
      Double currentlyConsumed = reader.getFractionConsumed();
      if (currentlyConsumed == null || currentlyConsumed == 1.0) {
        return null;
      }
      // Midpoint between the consumed fraction c and completion: (c + 1) / 2.
      double halfwayBetweenCurrentAndCompletion = 0.5 + (currentlyConsumed / 2);
      // splitAtFraction returns null if the reader cannot or will not split there.
      return reader.splitAtFraction(halfwayBetweenCurrentAndCompletion);
    }
  }
}