| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.direct; |
| |
| import com.google.auto.value.AutoValue; |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import javax.annotation.Nullable; |
| import org.apache.beam.runners.core.construction.ReadTranslation; |
| import org.apache.beam.runners.direct.StepTransformResult.Builder; |
| import org.apache.beam.sdk.io.BoundedSource; |
| import org.apache.beam.sdk.io.BoundedSource.BoundedReader; |
| import org.apache.beam.sdk.io.Read.Bounded; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.runners.AppliedPTransform; |
| import org.apache.beam.sdk.transforms.PTransform; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.util.UserCodeException; |
| import org.apache.beam.sdk.util.WindowedValue; |
| import org.apache.beam.sdk.values.PBegin; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.SettableFuture; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| /** |
| * A {@link TransformEvaluatorFactory} that produces {@link TransformEvaluator TransformEvaluators} |
| * for the {@link Bounded Read.Bounded} primitive {@link PTransform}. |
| */ |
| final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { |
| /** |
| * The required minimum size of a source to dynamically split. Produced {@link TransformEvaluator |
| * TransformEvaluators} will attempt to dynamically split all sources larger than the minimum |
| * dynamic split size. |
| */ |
| private static final long REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE = 0; |
| |
| private final EvaluationContext evaluationContext; |
| private final PipelineOptions options; |
| |
| // TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the DirectRunner. |
| @VisibleForTesting |
| final ExecutorService executor = |
| Executors.newCachedThreadPool( |
| new ThreadFactoryBuilder() |
| .setThreadFactory(MoreExecutors.platformThreadFactory()) |
| .setDaemon(true) |
| .setNameFormat("direct-dynamic-split-requester") |
| .build()); |
| |
| private final long minimumDynamicSplitSize; |
| |
/**
 * Creates a factory that uses {@link #REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE} as the minimum
 * estimated source size for dynamic splitting.
 *
 * @param evaluationContext context handed to every evaluator this factory creates
 * @param options pipeline options handed to every evaluator this factory creates
 */
BoundedReadEvaluatorFactory(EvaluationContext evaluationContext, PipelineOptions options) {
  this(evaluationContext, options, REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE);
}

/**
 * Creates a factory with an explicit dynamic split threshold.
 *
 * @param evaluationContext context handed to every evaluator this factory creates
 * @param options pipeline options handed to every evaluator this factory creates
 * @param minimumDynamicSplitSize estimated size, in bytes, a source must exceed before an
 *     evaluator attempts to dynamically split it
 */
@VisibleForTesting
BoundedReadEvaluatorFactory(
    EvaluationContext evaluationContext, PipelineOptions options, long minimumDynamicSplitSize) {
  this.minimumDynamicSplitSize = minimumDynamicSplitSize;
  this.options = options;
  this.evaluationContext = evaluationContext;
}
| |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| @Override |
| @Nullable |
| public <InputT> TransformEvaluator<InputT> forApplication( |
| AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) throws IOException { |
| return createEvaluator((AppliedPTransform) application); |
| } |
| |
| private <OutputT> TransformEvaluator<?> createEvaluator( |
| final AppliedPTransform<?, PCollection<OutputT>, ?> transform) { |
| return new BoundedReadEvaluator<>( |
| transform, evaluationContext, options, minimumDynamicSplitSize, executor); |
| } |
| |
| @Override |
| public void cleanup() { |
| executor.shutdown(); |
| } |
| |
| /** |
| * A {@link BoundedReadEvaluator} produces elements from an underlying {@link BoundedSource}, |
| * discarding all input elements. Within the call to {@link #finishBundle()}, the evaluator |
| * creates the {@link BoundedReader} and consumes all available input. |
| * |
| * <p>A {@link BoundedReadEvaluator} should only be created once per {@link BoundedSource}, and |
| * each evaluator should only be called once per evaluation of the pipeline. Otherwise, the source |
| * may produce duplicate elements. |
| */ |
| private static class BoundedReadEvaluator<OutputT> |
| implements TransformEvaluator<BoundedSourceShard<OutputT>> { |
| private final PCollection<OutputT> outputPCollection; |
| private final EvaluationContext evaluationContext; |
| private final PipelineOptions options; |
| private Builder resultBuilder; |
| |
| private final long minimumDynamicSplitSize; |
| private final ExecutorService produceSplitExecutor; |
| |
| public BoundedReadEvaluator( |
| AppliedPTransform<?, PCollection<OutputT>, ?> transform, |
| EvaluationContext evaluationContext, |
| PipelineOptions options, |
| long minimumDynamicSplitSize, |
| ExecutorService executor) { |
| this.evaluationContext = evaluationContext; |
| this.outputPCollection = |
| (PCollection<OutputT>) Iterables.getOnlyElement(transform.getOutputs().values()); |
| this.resultBuilder = StepTransformResult.withoutHold(transform); |
| this.options = options; |
| this.minimumDynamicSplitSize = minimumDynamicSplitSize; |
| this.produceSplitExecutor = executor; |
| } |
| |
| @Override |
| public void processElement(WindowedValue<BoundedSourceShard<OutputT>> element) |
| throws Exception { |
| BoundedSource<OutputT> source = element.getValue().getSource(); |
| try (final BoundedReader<OutputT> reader = source.createReader(options)) { |
| boolean contentsRemaining = reader.start(); |
| Future<BoundedSource<OutputT>> residualFuture = startDynamicSplitThread(source, reader); |
| UncommittedBundle<OutputT> output = evaluationContext.createBundle(outputPCollection); |
| while (contentsRemaining) { |
| output.add( |
| WindowedValue.timestampedValueInGlobalWindow( |
| reader.getCurrent(), reader.getCurrentTimestamp())); |
| contentsRemaining = reader.advance(); |
| } |
| resultBuilder.addOutput(output); |
| try { |
| BoundedSource<OutputT> residual = residualFuture.get(); |
| if (residual != null) { |
| resultBuilder.addUnprocessedElements( |
| element.withValue(BoundedSourceShard.of(residual))); |
| } |
| } catch (ExecutionException exex) { |
| // Un-and-rewrap the exception thrown by attempting to split |
| throw UserCodeException.wrap(exex.getCause()); |
| } |
| } |
| } |
| |
| private Future<BoundedSource<OutputT>> startDynamicSplitThread( |
| BoundedSource<OutputT> source, BoundedReader<OutputT> reader) throws Exception { |
| if (source.getEstimatedSizeBytes(options) > minimumDynamicSplitSize) { |
| return produceSplitExecutor.submit(new GenerateSplitAtHalfwayPoint<>(reader)); |
| } else { |
| SettableFuture<BoundedSource<OutputT>> emptyFuture = SettableFuture.create(); |
| emptyFuture.set(null); |
| return emptyFuture; |
| } |
| } |
| |
| @Override |
| public TransformResult<BoundedSourceShard<OutputT>> finishBundle() { |
| return resultBuilder.build(); |
| } |
| } |
| |
| @AutoValue |
| abstract static class BoundedSourceShard<T> implements SourceShard<T> { |
| static <T> BoundedSourceShard<T> of(BoundedSource<T> source) { |
| return new AutoValue_BoundedReadEvaluatorFactory_BoundedSourceShard<>(source); |
| } |
| |
| @Override |
| public abstract BoundedSource<T> getSource(); |
| } |
| |
| static class InputProvider<T> implements RootInputProvider<T, BoundedSourceShard<T>, PBegin> { |
| private final EvaluationContext evaluationContext; |
| private final PipelineOptions options; |
| |
| InputProvider(EvaluationContext evaluationContext, PipelineOptions options) { |
| this.evaluationContext = evaluationContext; |
| this.options = options; |
| } |
| |
| @Override |
| public Collection<CommittedBundle<BoundedSourceShard<T>>> getInitialInputs( |
| AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform, |
| int targetParallelism) |
| throws Exception { |
| BoundedSource<T> source = ReadTranslation.boundedSourceFromTransform(transform); |
| long estimatedBytes = source.getEstimatedSizeBytes(options); |
| long bytesPerBundle = estimatedBytes / targetParallelism; |
| List<? extends BoundedSource<T>> bundles = source.split(bytesPerBundle, options); |
| ImmutableList.Builder<CommittedBundle<BoundedSourceShard<T>>> shards = |
| ImmutableList.builder(); |
| for (BoundedSource<T> bundle : bundles) { |
| CommittedBundle<BoundedSourceShard<T>> inputShard = |
| evaluationContext |
| .<BoundedSourceShard<T>>createRootBundle() |
| .add(WindowedValue.valueInGlobalWindow(BoundedSourceShard.of(bundle))) |
| .commit(BoundedWindow.TIMESTAMP_MAX_VALUE); |
| shards.add(inputShard); |
| } |
| return shards.build(); |
| } |
| } |
| |
| private static class GenerateSplitAtHalfwayPoint<T> implements Callable<BoundedSource<T>> { |
| private final BoundedReader<T> reader; |
| |
| private GenerateSplitAtHalfwayPoint(BoundedReader<T> reader) { |
| this.reader = reader; |
| } |
| |
| @Override |
| public BoundedSource<T> call() throws Exception { |
| // Splits at halfway of the remaining work. |
| Double currentlyConsumed = reader.getFractionConsumed(); |
| if (currentlyConsumed == null || currentlyConsumed == 1.0) { |
| return null; |
| } |
| double halfwayBetweenCurrentAndCompletion = 0.5 + (currentlyConsumed / 2); |
| return reader.splitAtFraction(halfwayBetweenCurrentAndCompletion); |
| } |
| } |
| } |