| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.runners.dataflow.worker; |
| |
| import static com.google.api.client.util.Base64.decodeBase64; |
| import static com.google.api.client.util.Base64.encodeBase64String; |
| import static org.apache.beam.runners.dataflow.util.Structs.addString; |
| import static org.apache.beam.runners.dataflow.util.Structs.getString; |
| import static org.apache.beam.runners.dataflow.util.Structs.getStrings; |
| import static org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray; |
| import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray; |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; |
| |
| import com.google.api.client.util.Base64; |
| import com.google.api.services.dataflow.model.ApproximateReportedProgress; |
| import com.google.api.services.dataflow.model.ApproximateSplitRequest; |
| import com.google.api.services.dataflow.model.DerivedSource; |
| import com.google.api.services.dataflow.model.DynamicSourceSplit; |
| import com.google.api.services.dataflow.model.ReportedParallelism; |
| import com.google.api.services.dataflow.model.SourceMetadata; |
| import com.google.api.services.dataflow.model.SourceOperationResponse; |
| import com.google.api.services.dataflow.model.SourceSplitOptions; |
| import com.google.api.services.dataflow.model.SourceSplitRequest; |
| import com.google.api.services.dataflow.model.SourceSplitResponse; |
| import com.google.auto.service.AutoService; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.TimeUnit; |
| import javax.annotation.Nullable; |
| import org.apache.beam.runners.dataflow.internal.CustomSources; |
| import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; |
| import org.apache.beam.runners.dataflow.util.CloudObject; |
| import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.io.BoundedSource; |
| import org.apache.beam.sdk.io.Source; |
| import org.apache.beam.sdk.io.UnboundedSource; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; |
| import org.apache.beam.sdk.util.BackOff; |
| import org.apache.beam.sdk.util.FluentBackoff; |
| import org.apache.beam.sdk.util.WindowedValue; |
| import org.apache.beam.sdk.values.ValueWithRecordId; |
| import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.Uninterruptibles; |
| import org.joda.time.Duration; |
| import org.joda.time.Instant; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A helper class for supporting sources defined as {@code Source}. |
| * |
| * <p>Provides a bridge between the high-level {@code Source} API and the low-level {@code |
| * CloudSource} class. |
| */ |
| public class WorkerCustomSources { |
| // Key in a cloud source spec under which the base64-encoded, serialized source is stored. |
| private static final String SERIALIZED_SOURCE = "serialized_source"; |
| // Key in a cloud source spec holding the list of base64-encoded, serialized unbounded splits. |
| @VisibleForTesting static final String SERIALIZED_SOURCE_SPLITS = "serialized_source_splits"; |
| // Desired bundle size (64 MiB) used when a split request does not specify one. |
| private static final long DEFAULT_DESIRED_BUNDLE_SIZE_BYTES = 64 * (1 << 20); |
| |
| // The maximum number of bundles we are willing to return to the service in one response. |
| // Splits that produce more bundles than this are coalesced before the response is built. |
| static final int DEFAULT_NUM_BUNDLES_LIMIT = 100; |
| |
| /** |
| * The current limit on the size of a ReportWorkItemStatus RPC to Google Cloud Dataflow, which |
| * includes the initial splits, is 20 MB. |
| */ |
| public static final long DATAFLOW_SPLIT_RESPONSE_API_SIZE_LIMIT = 20 * (1 << 20); |
| |
| // Logger shared by all helpers and nested classes in this file. |
| private static final Logger LOG = LoggerFactory.getLogger(WorkerCustomSources.class); |
| |
| /** |
| * A {@code DynamicSplitResult} specified explicitly by a pair of {@code BoundedSource} objects |
| * describing the primary and residual sources. |
| */ |
| public static final class BoundedSourceSplit<T> implements NativeReader.DynamicSplitResult { |
| public final BoundedSource<T> primary; |
| public final BoundedSource<T> residual; |
| |
| public BoundedSourceSplit(BoundedSource<T> primary, BoundedSource<T> residual) { |
| this.primary = primary; |
| this.residual = residual; |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("<primary: %s; residual: %s>", primary, residual); |
| } |
| } |
| |
| /** |
| * Translates a {@link BoundedSourceSplit} into a Dataflow API {@link DynamicSourceSplit} whose |
| * primary and residual parts are independently derived, serialized cloud sources. |
| * |
| * @throws RuntimeException if either half of the split cannot be serialized |
| */ |
| public static DynamicSourceSplit toSourceSplit(BoundedSourceSplit<?> sourceSplitResult) { |
| final String independentMode = "SOURCE_DERIVATION_MODE_INDEPENDENT"; |
| DerivedSource primaryPart = new DerivedSource().setDerivationMode(independentMode); |
| DerivedSource residualPart = new DerivedSource().setDerivationMode(independentMode); |
| try { |
| primaryPart.setSource(serializeSplitToCloudSource(sourceSplitResult.primary)); |
| residualPart.setSource(serializeSplitToCloudSource(sourceSplitResult.residual)); |
| } catch (Exception e) { |
| throw new RuntimeException("Failed to serialize one of the parts of the source split", e); |
| } |
| |
| DynamicSourceSplit translated = new DynamicSourceSplit(); |
| translated.setPrimary(primaryPart); |
| translated.setResidual(residualPart); |
| return translated; |
| } |
| |
| /** |
| * Version of {@link CustomSources#serializeToCloudSource(Source, PipelineOptions)} intended for |
| * use on splits of {@link BoundedSource}. |
| * |
| * <p>The split is serialized, base64-encoded and stored in the spec of the returned cloud source |
| * under the {@code serialized_source} key. The estimated size is attached as metadata on a |
| * best-effort basis: if estimation throws, the failure is logged and the metadata is left |
| * without a size. |
| * |
| * @param source the split to convert |
| * @return a Dataflow API source describing {@code source} |
| * @throws Exception if the cloud source cannot be built; size-estimation failures are logged |
| * and not propagated |
| */ |
| private static com.google.api.services.dataflow.model.Source serializeSplitToCloudSource( |
| BoundedSource<?> source) throws Exception { |
| com.google.api.services.dataflow.model.Source cloudSource = |
| new com.google.api.services.dataflow.model.Source(); |
| cloudSource.setSpec(CloudObject.forClass(CustomSources.class)); |
| addString( |
| cloudSource.getSpec(), SERIALIZED_SOURCE, encodeBase64String(serializeToByteArray(source))); |
| SourceMetadata metadata = new SourceMetadata(); |
| // Size estimation is best effort so we continue even if it fails here. |
| try { |
| metadata.setEstimatedSizeBytes(source.getEstimatedSizeBytes(PipelineOptionsFactory.create())); |
| } catch (Exception e) { |
| // Parameterized logging, consistent with the other log statements in this file; the |
| // trailing exception argument is logged with its stack trace. |
| LOG.warn("Size estimation of the source failed: {}", source, e); |
| } |
| cloudSource.setMetadata(metadata); |
| return cloudSource; |
| } |
| |
| /** |
| * Handles a protocol-level split {@code SourceOperationRequest} for a bounded source: the |
| * source is deserialized to a {@code BoundedSource}, split, and the resulting bundles are |
| * serialized back into the response. |
| * |
| * <p>If the produced splits are too large to be serialized to the Dataflow API, splitting is |
| * retried once with a larger desired bundle size, as a workaround for API limits on split size. |
| * |
| * <p>Uses the default bundle-count and response-size limits of this class. |
| */ |
| public static SourceOperationResponse performSplit( |
| SourceSplitRequest request, PipelineOptions options) throws Exception { |
| final int bundleCountCap = DEFAULT_NUM_BUNDLES_LIMIT; |
| final long responseByteCap = DATAFLOW_SPLIT_RESPONSE_API_SIZE_LIMIT; |
| return performSplitWithApiLimit(request, options, bundleCountCap, responseByteCap); |
| } |
| |
| /** |
| * Same as {@link #performSplit(SourceSplitRequest, PipelineOptions)}, except that the bundle |
| * count limit and the API size limit are supplied by the caller so tests can override them. |
| */ |
| static SourceOperationResponse performSplitWithApiLimit( |
| SourceSplitRequest request, PipelineOptions options, int numBundlesLimit, long apiByteLimit) |
| throws Exception { |
| // Use the bundle size requested by the service, falling back to the default when absent. |
| SourceSplitOptions requestedOptions = request.getOptions(); |
| Long requestedSize = |
| requestedOptions == null ? null : requestedOptions.getDesiredBundleSizeBytes(); |
| long bundleSizeBytes = |
| requestedSize == null ? DEFAULT_DESIRED_BUNDLE_SIZE_BYTES : requestedSize; |
| |
| Source<?> candidate = deserializeFromCloudSource(request.getSource().getSpec()); |
| checkArgument( |
| candidate instanceof BoundedSource, "Cannot split a non-Bounded source: %s", candidate); |
| BoundedSource<?> bounded = (BoundedSource<?>) candidate; |
| return performSplitTyped(options, bounded, bundleSizeBytes, numBundlesLimit, apiByteLimit); |
| } |
| |
| /** |
| * Splits {@code source} and packages the resulting bundles into a split response. |
| * |
| * <p>The source is split and validated once. If the serialized split response exceeds {@code |
| * apiByteLimit}, splitting is retried a single time with a proportionally larger desired bundle |
| * size. If more than {@code numBundlesLimit} bundles remain, they are coalesced via {@code |
| * limitNumberOfBundles}. |
| * |
| * @param options pipeline options passed through to the source's split |
| * @param source the bounded source to split |
| * @param desiredBundleSizeBytes the desired bundle size for the first split attempt |
| * @param numBundlesLimit the maximum number of bundles to return without rebundling |
| * @param apiByteLimit the maximum allowed serialized size of the response |
| * @return the response carrying the final bundles |
| * @throws IllegalArgumentException if the final response is still larger than {@code |
| * apiByteLimit} |
| */ |
| private static <T> SourceOperationResponse performSplitTyped( |
| PipelineOptions options, |
| BoundedSource<T> source, |
| long desiredBundleSizeBytes, |
| int numBundlesLimit, |
| long apiByteLimit) |
| throws Exception { |
| // Try to split normally |
| List<BoundedSource<T>> bundles = splitAndValidate(source, desiredBundleSizeBytes, options); |
| |
| // Measure the serialized size of the split response; if it is too big we retry below with a |
| // proportionally larger desiredBundleSize to reduce the oversplitting. |
| long serializedSize = |
| DataflowApiUtils.computeSerializedSizeBytes(wrapIntoSourceSplitResponse(bundles)); |
| |
| // If the split response is too large, scale the desired bundle size so that the retried |
| // response is expected to be about apiByteLimit / 2. |
| if (serializedSize > apiByteLimit) { |
| double expansion = 2 * (double) serializedSize / apiByteLimit; |
| long expandedBundleSizeBytes = (long) (desiredBundleSizeBytes * expansion); |
| LOG.warn( |
| "Splitting source {} into bundles of estimated size {} bytes produced {} bundles, which" |
| + " have total serialized size {} bytes. As this is too large for the Google Cloud" |
| + " Dataflow API, retrying splitting once with increased desiredBundleSizeBytes {}" |
| + " to reduce the number of splits.", |
| source, |
| desiredBundleSizeBytes, |
| bundles.size(), |
| serializedSize, |
| expandedBundleSizeBytes); |
| desiredBundleSizeBytes = expandedBundleSizeBytes; |
| bundles = splitAndValidate(source, desiredBundleSizeBytes, options); |
| serializedSize = |
| DataflowApiUtils.computeSerializedSizeBytes(wrapIntoSourceSplitResponse(bundles)); |
| LOG.info( |
| "Splitting with desiredBundleSizeBytes {} produced {} bundles " |
| + "with total serialized size {} bytes", |
| desiredBundleSizeBytes, |
| bundles.size(), |
| serializedSize); |
| } |
| |
| // Remember the bundle count before rebundling; it is reported in the error message below. |
| int numBundlesBeforeRebundling = bundles.size(); |
| // To further reduce size of the response and service-side memory usage, coalesce |
| // the sources into numBundlesLimit compressed serialized bundles. |
| if (bundles.size() > numBundlesLimit) { |
| LOG.warn( |
| "Splitting source {} into bundles of estimated size {} bytes produced {} bundles. " |
| + "Rebundling into {} bundles.", |
| source, |
| desiredBundleSizeBytes, |
| bundles.size(), |
| numBundlesLimit); |
| bundles = limitNumberOfBundles(bundles, numBundlesLimit); |
| } |
| |
| // Build the final response and verify that it fits within the API limit. |
| SourceOperationResponse response = |
| new SourceOperationResponse().setSplit(wrapIntoSourceSplitResponse(bundles)); |
| long finalResponseSize = DataflowApiUtils.computeSerializedSizeBytes(response); |
| LOG.info( |
| "Splitting source {} produced {} bundles with total serialized response size {}", |
| source, |
| bundles.size(), |
| finalResponseSize); |
| if (finalResponseSize > apiByteLimit) { |
| String message = |
| String.format( |
| "Total size of the BoundedSource objects generated by split() operation is larger " |
| + "than the allowable limit. When splitting %s into bundles of %d bytes " |
| + "it generated %d BoundedSource objects with total serialized size of %d bytes " |
| + "which is larger than the limit %d. " |
| + "For more information, please check the corresponding FAQ entry at " |
| + "https://cloud.google.com/dataflow/pipelines/troubleshooting-your-pipeline", |
| source, |
| desiredBundleSizeBytes, |
| numBundlesBeforeRebundling, |
| finalResponseSize, |
| apiByteLimit); |
| throw new IllegalArgumentException(message); |
| } |
| return response; |
| } |
| |
| /** |
| * Splits {@code source} into bundles of roughly {@code desiredBundleSizeBytes} and validates |
| * every bundle that comes back. |
| * |
| * @throws IllegalArgumentException if any produced bundle fails validation |
| */ |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| private static <T> List<BoundedSource<T>> splitAndValidate( |
| BoundedSource<T> source, long desiredBundleSizeBytes, PipelineOptions options) |
| throws Exception { |
| List<BoundedSource<T>> produced = (List) source.split(desiredBundleSizeBytes, options); |
| for (BoundedSource<T> candidate : produced) { |
| validateSplit(source, candidate); |
| } |
| return produced; |
| } |
| |
| /** Validates one bundle, reporting both the original source and the bundle on failure. */ |
| private static void validateSplit(BoundedSource<?> original, BoundedSource<?> candidate) { |
| try { |
| candidate.validate(); |
| } catch (Exception e) { |
| String description = |
| String.format( |
| "Splitting a valid source produced an invalid source." |
| + "%nOriginal source: %s%nInvalid source: %s", |
| original, candidate); |
| throw new IllegalArgumentException(description, e); |
| } |
| } |
| |
| /** |
| * Serializes each bundle into an independently derived cloud source and collects them all into |
| * a {@link SourceSplitResponse} marked as having split. |
| */ |
| private static SourceSplitResponse wrapIntoSourceSplitResponse( |
| List<? extends BoundedSource<?>> bundles) throws Exception { |
| List<DerivedSource> derived = new ArrayList<>(bundles.size()); |
| for (BoundedSource<?> bundle : bundles) { |
| // We purposely set this to false when using SplittableOnlyBoundedSource |
| // to tell the service that we need further splits. |
| boolean needsNoFurtherSplitting = !(bundle instanceof SplittableOnlyBoundedSource); |
| com.google.api.services.dataflow.model.Source cloudSource = |
| serializeSplitToCloudSource(bundle).setDoesNotNeedSplitting(needsNoFurtherSplitting); |
| DerivedSource entry = |
| new DerivedSource() |
| .setDerivationMode("SOURCE_DERIVATION_MODE_INDEPENDENT") |
| .setSource(cloudSource); |
| derived.add(entry); |
| } |
| |
| // Every bundle is returned in the response. |
| SourceSplitResponse wrapped = new SourceSplitResponse().setBundles(derived); |
| return wrapped.setOutcome("SOURCE_SPLIT_OUTCOME_SPLITTING_HAPPENED"); |
| } |
| |
| /** Registers the custom-source {@link ReaderFactory} under the spec class names it serves. */ |
| @AutoService(ReaderFactory.Registrar.class) |
| public static class Registrar implements ReaderFactory.Registrar { |
| |
| /** Returns a map in which both supported class names share one {@link Factory} instance. */ |
| @Override |
| public Map<String, ReaderFactory> factories() { |
| ReaderFactory shared = new Factory(); |
| return ImmutableMap.<String, ReaderFactory>builder() |
| .put("org.apache.beam.runners.dataflow.internal.CustomSources", shared) |
| .put( |
| "org.apache.beam.runners.dataflow.worker.runners.dataflow.WorkerCustomSources", |
| shared) |
| .build(); |
| } |
| } |
| |
| /** Factory to create a {@link WorkerCustomSources} from a Dataflow API source specification. */ |
| public static class Factory implements ReaderFactory { |
| @Override |
| public NativeReader<?> create( |
| CloudObject spec, |
| @Nullable Coder<?> coder, |
| @Nullable PipelineOptions options, |
| @Nullable DataflowExecutionContext executionContext, |
| DataflowOperationContext operationContext) |
| throws Exception { |
| // The parameter "coder" is deliberately never used. It is an artifact of ReaderFactory: |
| // some readers need a coder, some don't (i.e. for some it doesn't even make sense), |
| // but ReaderFactory passes it to all readers anyway. |
| return WorkerCustomSources.create(spec, options, executionContext); |
| } |
| } |
| |
| /** |
| * Deserializes the source stored in {@code spec} and wraps it in a {@link NativeReader}. |
| * |
| * <p>A {@link BoundedSource} gets a reader whose iterator is backed by a freshly created bounded |
| * reader; an {@link UnboundedSource} gets an {@code UnboundedReader} bound to the given |
| * execution context, which is cast to {@code StreamingModeExecutionContext}. |
| * |
| * @throws IllegalArgumentException if the source is neither bounded nor unbounded |
| */ |
| public static NativeReader<WindowedValue<?>> create( |
| final CloudObject spec, |
| final PipelineOptions options, |
| DataflowExecutionContext executionContext) |
| throws Exception { |
| |
| @SuppressWarnings("unchecked") |
| final Source<Object> source = (Source<Object>) deserializeFromCloudSource(spec); |
| |
| if (!(source instanceof BoundedSource) && !(source instanceof UnboundedSource)) { |
| throw new IllegalArgumentException("Unexpected source kind: " + source.getClass()); |
| } |
| |
| if (source instanceof BoundedSource) { |
| final BoundedSource<Object> bounded = (BoundedSource<Object>) source; |
| NativeReader<WindowedValue<Object>> batchReader = |
| new NativeReader<WindowedValue<Object>>() { |
| @Override |
| public NativeReaderIterator<WindowedValue<Object>> iterator() throws IOException { |
| return new BoundedReaderIterator<>(bounded.createReader(options)); |
| } |
| }; |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| NativeReader<WindowedValue<?>> erasedBatchReader = (NativeReader) batchReader; |
| return erasedBatchReader; |
| } |
| |
| // Only the unbounded case remains at this point. |
| UnboundedReader<Object> streamingReader = |
| new UnboundedReader<Object>( |
| options, spec, (StreamingModeExecutionContext) executionContext); |
| @SuppressWarnings({"unchecked", "rawtypes"}) |
| NativeReader<WindowedValue<?>> erasedStreamingReader = (NativeReader) streamingReader; |
| return erasedStreamingReader; |
| } |
| |
| /** |
| * Key of the first unbounded source split: the value 1 rendered as a 16-character zero-padded |
| * string, which corresponds to split index 0 once the key is decoded. |
| */ |
| private static final ByteString FIRST_SPLIT_KEY = ByteString.copyFromUtf8("0000000000000001"); |
| |
| /** |
| * Returns whether {@code splitKey} is the key of the first unbounded source split. |
| * |
| * @param splitKey the key to test; a {@code null} key is never the first split |
| * @return {@code true} if and only if {@code splitKey} equals the first split key |
| */ |
| public static boolean isFirstUnboundedSourceSplit(ByteString splitKey) { |
| // Comparing from the constant side keeps this null-safe. |
| return FIRST_SPLIT_KEY.equals(splitKey); |
| } |
| |
| /** {@link NativeReader} for reading from {@link UnboundedSource UnboundedSources}. */ |
| private static class UnboundedReader<T> |
| extends NativeReader<WindowedValue<ValueWithRecordId<T>>> { |
| private final PipelineOptions options; |
| private final CloudObject spec; |
| private final StreamingModeExecutionContext context; |
| |
| UnboundedReader( |
| PipelineOptions options, CloudObject spec, StreamingModeExecutionContext context) { |
| this.options = options; |
| this.spec = spec; |
| this.context = context; |
| } |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| public NativeReaderIterator<WindowedValue<ValueWithRecordId<T>>> iterator() throws IOException { |
| UnboundedSource.UnboundedReader<T> reader = |
| (UnboundedSource.UnboundedReader<T>) context.getCachedReader(); |
| final boolean started = reader != null; |
| if (reader == null) { |
| String key = context.getSerializedKey().toStringUtf8(); |
| // Key is expected to be a zero-padded integer representing the split index. |
| int splitIndex = Integer.parseInt(key.substring(0, 16), 16) - 1; |
| |
| UnboundedSource<T, UnboundedSource.CheckpointMark> splitSource = parseSource(splitIndex); |
| |
| UnboundedSource.CheckpointMark checkpoint = null; |
| if (splitSource.getCheckpointMarkCoder() != null) { |
| checkpoint = context.getReaderCheckpoint(splitSource.getCheckpointMarkCoder()); |
| } |
| |
| reader = splitSource.createReader(options, checkpoint); |
| } |
| |
| context.setActiveReader(reader); |
| |
| return new UnboundedReaderIterator<>(reader, context, started); |
| } |
| |
| @Override |
| public boolean supportsRestart() { |
| return true; |
| } |
| |
| @SuppressWarnings("unchecked") |
| private UnboundedSource<T, UnboundedSource.CheckpointMark> parseSource(int index) { |
| List<String> serializedSplits = null; |
| try { |
| serializedSplits = getStrings(spec, SERIALIZED_SOURCE_SPLITS, null); |
| } catch (Exception e) { |
| throw new RuntimeException("Parsing serialized source splits failed: ", e); |
| } |
| checkArgument(serializedSplits != null, "UnboundedSource object did not contain splits"); |
| checkArgument( |
| index < serializedSplits.size(), |
| "UnboundedSource splits contained too few splits. Requested index was %s, size was %s", |
| index, |
| serializedSplits.size()); |
| Object rawSource = |
| deserializeFromByteArray( |
| decodeBase64(serializedSplits.get(index)), "UnboundedSource split"); |
| if (!(rawSource instanceof UnboundedSource)) { |
| throw new IllegalArgumentException("Expected UnboundedSource, got " + rawSource.getClass()); |
| } |
| return (UnboundedSource<T, UnboundedSource.CheckpointMark>) rawSource; |
| } |
| } |
| |
| /** |
| * A bounded source that cannot read data and exists only to be split. It groups several splits |
| * into one object as a workaround for Dataflow API limits: because the object holds several |
| * splits, it is compressed as a whole and compression works across those splits. Empirically |
| * this lets us scale out the amount of initial splits the user returned by a few orders of |
| * magnitude. |
| * |
| * <p>TODO: Replace with a concat custom source once one is available or deprecate in favor of <a |
| * href="https://issues.apache.org/jira/browse/BEAM-65">splittable DoFns</a>. |
| */ |
| @VisibleForTesting |
| static final class SplittableOnlyBoundedSource<T> extends BoundedSource<T> { |
| private final List<? extends BoundedSource<T>> boundedSources; |
| |
| private SplittableOnlyBoundedSource(List<? extends BoundedSource<T>> boundedSources) { |
| this.boundedSources = ImmutableList.copyOf(boundedSources); |
| } |
| |
| /** Returns the wrapped sources unchanged; both arguments are ignored. */ |
| @Override |
| public List<? extends BoundedSource<T>> split( |
| long desiredBundleSizeBytes, PipelineOptions options) throws Exception { |
| return boundedSources; |
| } |
| |
| /** |
| * Sums the estimated sizes of the wrapped sources, computing the individual estimates on the |
| * executor service taken from the Dataflow pipeline options. |
| */ |
| @Override |
| public long getEstimatedSizeBytes(final PipelineOptions options) throws Exception { |
| List<Callable<Long>> sizeTasks = new ArrayList<>(boundedSources.size()); |
| for (final BoundedSource<T> member : boundedSources) { |
| sizeTasks.add(() -> member.getEstimatedSizeBytes(options)); |
| } |
| |
| List<Future<Long>> pendingSizes = |
| options.as(DataflowPipelineOptions.class).getExecutorService().invokeAll(sizeTasks); |
| long total = 0L; |
| for (Future<Long> pendingSize : pendingSizes) { |
| total += pendingSize.get(); |
| } |
| return total; |
| } |
| |
| /** Always throws: this source can only be split, never read. */ |
| @Override |
| public BoundedSource.BoundedReader<T> createReader(PipelineOptions options) throws IOException { |
| throw new UnsupportedOperationException( |
| "SplittableOnlyBoundedSource only supports splitting."); |
| } |
| |
| /** Validates every wrapped source in order. */ |
| @Override |
| public void validate() { |
| boundedSources.forEach(member -> member.validate()); |
| } |
| |
| /** Returns the default output coder of the first wrapped source. */ |
| @Override |
| public Coder<T> getDefaultOutputCoder() { |
| BoundedSource<T> first = boundedSources.get(0); |
| return first.getDefaultOutputCoder(); |
| } |
| } |
| |
| /** |
| * Regroups {@code bundles} into nested {@link SplittableOnlyBoundedSource} groups so that fewer |
| * top-level bundles are returned to the service. Group sizes are powers of {@code maxBundles}. |
| * |
| * <p>The only caller in this file invokes this method when {@code bundles.size() > maxBundles}. |
| * |
| * @param bundles the bundles to regroup |
| * @param maxBundles the fan-out used when grouping; must be greater than 1 |
| * @return the regrouped bundles |
| * @throws IllegalArgumentException if {@code maxBundles} is not greater than 1 |
| */ |
| private static <T> List<BoundedSource<T>> limitNumberOfBundles( |
| List<BoundedSource<T>> bundles, int maxBundles) { |
| // With maxBundles <= 1 the power-finding loop below never makes progress and would spin |
| // forever, so reject such limits up front. |
| checkArgument(maxBundles > 1, "maxBundles must be greater than 1, but was %s", maxBundles); |
| List<BoundedSource<T>> splittableBoundedSources = new ArrayList<>(); |
| // Greedily create SplittableOnlyBoundedSources with powers of "maxBundles" to |
| // minimize the number of splits required to be done. |
| |
| // Find the largest power of "maxBundles" that is strictly less than the number |
| // of elements. The product is widened to long so that the comparison cannot overflow |
| // the int range. |
| int numElementsToPutIntoBundle = maxBundles; |
| while (bundles.size() > (long) numElementsToPutIntoBundle * maxBundles) { |
| numElementsToPutIntoBundle *= maxBundles; |
| } |
| |
| // Insert as many full groups of bundles of the largest size that we can |
| int startIndex = 0; |
| // Compute the remaining capacity if we were to fit all the rest of the bundles with |
| // smaller splits. |
| int remainingCapacity = |
| numElementsToPutIntoBundle |
| / maxBundles |
| * (maxBundles - splittableBoundedSources.size() - 1); |
| for (; |
| startIndex < bundles.size() - numElementsToPutIntoBundle - remainingCapacity; |
| startIndex += numElementsToPutIntoBundle) { |
| splittableBoundedSources.add( |
| new SplittableOnlyBoundedSource<>( |
| bundles.subList(startIndex, startIndex + numElementsToPutIntoBundle))); |
| // One more slot is now taken, so less capacity is left for the smaller groups. |
| remainingCapacity = |
| numElementsToPutIntoBundle |
| / maxBundles |
| * (maxBundles - splittableBoundedSources.size() - 1); |
| } |
| |
| // We compute how many elements we should place into the next bundle based upon how many |
| // we can fit of the smaller size in the remaining spots. |
| splittableBoundedSources.add( |
| new SplittableOnlyBoundedSource<>( |
| bundles.subList(startIndex, bundles.size() - remainingCapacity))); |
| startIndex = bundles.size() - remainingCapacity; |
| |
| // Use the smaller bundle size to fill in the remaining bundles. |
| numElementsToPutIntoBundle /= maxBundles; |
| for (; startIndex < bundles.size(); startIndex += numElementsToPutIntoBundle) { |
| if (numElementsToPutIntoBundle == 1) { |
| // A group of one needs no wrapper; return the bundle itself. |
| splittableBoundedSources.add(bundles.get(startIndex)); |
| } else { |
| splittableBoundedSources.add( |
| new SplittableOnlyBoundedSource<>( |
| bundles.subList(startIndex, startIndex + numElementsToPutIntoBundle))); |
| } |
| } |
| |
| return splittableBoundedSources; |
| } |
| |
| /** |
| * Reads the base64-encoded serialized source from {@code spec}, deserializes it and validates |
| * it. A validation failure is logged and then rethrown unchanged. |
| */ |
| @VisibleForTesting |
| static Source<?> deserializeFromCloudSource(Map<String, Object> spec) throws Exception { |
| String encodedSource = getString(spec, SERIALIZED_SOURCE); |
| byte[] serializedSource = decodeBase64(encodedSource); |
| Source<?> source = (Source<?>) deserializeFromByteArray(serializedSource, "Source"); |
| try { |
| source.validate(); |
| return source; |
| } catch (Exception e) { |
| LOG.error("Invalid source: {}", source, e); |
| throw e; |
| } |
| } |
| |
| /** |
| * Adapts a {@link BoundedSource.BoundedReader} to the {@link NativeReader.NativeReaderIterator} |
| * interface, adding progress reporting and fraction-based dynamic splitting. |
| */ |
| @VisibleForTesting |
| static class BoundedReaderIterator<T> |
| extends NativeReader.NativeReaderIterator<WindowedValue<T>> { |
| private final BoundedSource.BoundedReader<T> reader; |
| |
| private BoundedReaderIterator(BoundedSource.BoundedReader<T> reader) { |
| this.reader = reader; |
| } |
| |
| /** Starts the underlying reader, wrapping any failure in an {@link IOException}. */ |
| @Override |
| public boolean start() throws IOException { |
| try { |
| return reader.start(); |
| } catch (Exception e) { |
| throw new IOException( |
| "Failed to start reading from source: " + reader.getCurrentSource(), e); |
| } |
| } |
| |
| /** Advances the underlying reader, wrapping any failure in an {@link IOException}. */ |
| @Override |
| public boolean advance() throws IOException { |
| try { |
| return reader.advance(); |
| } catch (Exception e) { |
| throw new IOException( |
| "Failed to advance reader of source: " + reader.getCurrentSource(), e); |
| } |
| } |
| |
| /** Returns the current element in the global window, stamped with the reader's timestamp. */ |
| @Override |
| public WindowedValue<T> getCurrent() throws NoSuchElementException { |
| return WindowedValue.timestampedValueInGlobalWindow( |
| reader.getCurrent(), reader.getCurrentTimestamp()); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| reader.close(); |
| } |
| |
| /** |
| * Converts a split point count to a {@link ReportedParallelism}. |
| * |
| * @return the parallelism for a non-negative {@code value}, or {@code null} for a negative one |
| */ |
| @Nullable |
| @VisibleForTesting |
| static ReportedParallelism longToParallelism(long value) { |
| if (value >= 0) { |
| return new ReportedParallelism().setValue(Double.valueOf(value)); |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Testable helper for {@link #getProgress}. Sets fraction consumed, parallelism consumed, and |
| * parallelism remaining while also being fault tolerant: a failure in any one estimate is |
| * logged and that field is simply left unset. |
| */ |
| @VisibleForTesting |
| static ApproximateReportedProgress getReaderProgress(BoundedSource.BoundedReader<?> reader) { |
| ApproximateReportedProgress progress = new ApproximateReportedProgress(); |
| // Fraction consumed |
| try { |
| Double fractionConsumed = reader.getFractionConsumed(); |
| if (fractionConsumed != null) { |
| progress.setFractionConsumed(fractionConsumed); |
| } |
| } catch (Throwable t) { |
| LOG.warn("Error estimating fraction consumed from reader {}", reader, t); |
| } |
| // Parallelism consumed |
| try { |
| ReportedParallelism parallelism = longToParallelism(reader.getSplitPointsConsumed()); |
| if (parallelism != null) { |
| progress.setConsumedParallelism(parallelism); |
| } |
| } catch (Throwable t) { |
| LOG.warn("Error estimating consumed parallelism from reader {}", reader, t); |
| } |
| // Parallelism remaining |
| try { |
| ReportedParallelism parallelism = longToParallelism(reader.getSplitPointsRemaining()); |
| if (parallelism != null) { |
| progress.setRemainingParallelism(parallelism); |
| } |
| } catch (Throwable t) { |
| LOG.warn("Error estimating remaining parallelism from reader {}", reader, t); |
| } |
| return progress; |
| } |
| |
| @Override |
| public NativeReader.Progress getProgress() { |
| // Delegate to testable helper. |
| return SourceTranslationUtils.cloudProgressToReaderProgress(getReaderProgress(reader)); |
| } |
| |
| /** |
| * Attempts to split the reader at the fraction named by {@code request}. |
| * |
| * @return the primary and residual sources on success, or {@code null} if the request names |
| * no fraction or the reader declines to split |
| * @throws IllegalStateException if the reader reports a successful split whose primary or |
| * residual is the original source object, or whose primary or residual fails validation |
| */ |
| @Override |
| public NativeReader.DynamicSplitResult requestDynamicSplit( |
| NativeReader.DynamicSplitRequest request) { |
| ApproximateSplitRequest stopPosition = |
| SourceTranslationUtils.splitRequestToApproximateSplitRequest(request); |
| Double fractionConsumed = stopPosition.getFractionConsumed(); |
| if (fractionConsumed == null) { |
| // Only truncating at a fraction is currently supported. |
| LOG.info( |
| "Rejecting split request because custom sources only support splits at fraction: {}", |
| stopPosition); |
| return null; |
| } |
| BoundedSource<T> original = reader.getCurrentSource(); |
| BoundedSource<T> residual = reader.splitAtFraction(fractionConsumed); |
| if (residual == null) { |
| LOG.info("Rejecting split request because custom reader returned null residual source."); |
| return null; |
| } |
| // Try to catch some potential subclass implementation errors early. |
| BoundedSource<T> primary = reader.getCurrentSource(); |
| if (original == primary) { |
| throw new IllegalStateException( |
| "Successful split did not change the current source: primary is identical to original" |
| + " (Source objects MUST be immutable): " |
| + primary); |
| } |
| if (original == residual) { |
| throw new IllegalStateException( |
| "Successful split did not change the current source: residual is identical to original" |
| + " (Source objects MUST be immutable): " |
| + residual); |
| } |
| try { |
| primary.validate(); |
| } catch (Exception e) { |
| throw illegalSplitPart("primary", original, primary, residual, e); |
| } |
| try { |
| residual.validate(); |
| } catch (Exception e) { |
| throw illegalSplitPart("residual", original, primary, residual, e); |
| } |
| return new BoundedSourceSplit<T>(primary, residual); |
| } |
| |
| /** |
| * Builds the exception reported when one part of a split fails validation. The validation |
| * failure is attached as the cause so that the underlying reason is not lost. |
| */ |
| private static IllegalStateException illegalSplitPart( |
| String part, |
| BoundedSource<?> original, |
| BoundedSource<?> primary, |
| BoundedSource<?> residual, |
| Exception cause) { |
| return new IllegalStateException( |
| "Successful split produced an illegal " |
| + part |
| + " source. " |
| + "\nOriginal: " |
| + original |
| + "\nPrimary: " |
| + primary |
| + "\nResidual: " |
| + residual, |
| cause); |
| } |
| |
| @Override |
| public double getRemainingParallelism() { |
| return Double.NaN; |
| } |
| } |
| |
| // Commit at least once every 10 seconds or 10k records. This keeps the watermark advancing |
| // smoothly, and ensures that not too much work will have to be reprocessed in the event of |
| // a crash. |
| // Maximum number of elements read in one unbounded bundle; non-final so tests can change it. |
| @VisibleForTesting static int maxUnboundedBundleSize = 10000; |
| |
| // Maximum wall-clock time spent reading one unbounded bundle. |
| @VisibleForTesting |
| static final Duration MAX_UNBOUNDED_BUNDLE_READ_TIME = Duration.standardSeconds(10); |
| // Backoff starting at 100ms with at most 4 retries, for roughly 1s in total. Assuming the |
| // default growth factor is 1.5 (TODO confirm against FluentBackoff.DEFAULT), the nominal |
| // delays are 100 + 150 + 225 + 337.5 = 812.5ms before any randomization. |
| private static final FluentBackoff BACKOFF_FACTORY = |
| FluentBackoff.DEFAULT.withMaxRetries(4).withInitialBackoff(Duration.millis(100)); |
| |
| /** |
| * Iterator over a single bundle read from an {@link UnboundedSource.UnboundedReader}. The |
| * bundle ends once the element limit or the read-time limit is reached, once the context |
| * reports that the sink is full, or once the reader yields nothing for a full backoff cycle. |
| */ |
| private static class UnboundedReaderIterator<T> |
| extends NativeReader.NativeReaderIterator<WindowedValue<ValueWithRecordId<T>>> { |
| private final UnboundedSource.UnboundedReader<T> reader; |
| private final StreamingModeExecutionContext context; |
| private final boolean started; |
| private final Instant endTime; |
| private int elemsRead; |
| |
| private UnboundedReaderIterator( |
| UnboundedSource.UnboundedReader<T> reader, |
| StreamingModeExecutionContext context, |
| boolean started) { |
| this.reader = reader; |
| this.context = context; |
| this.started = started; |
| this.elemsRead = 0; |
| // The bundle's read-time budget starts when the iterator is created. |
| this.endTime = Instant.now().plus(MAX_UNBOUNDED_BUNDLE_READ_TIME); |
| } |
| |
| @Override |
| public boolean start() throws IOException { |
| if (started) { |
| // The reader came from the unbounded reader cache and was started earlier, |
| // so starting this iterator means advancing the reader instead. |
| return advance(); |
| } |
| boolean hasFirstElement; |
| try { |
| hasFirstElement = reader.start(); |
| } catch (Exception e) { |
| throw new IOException( |
| "Failed to start reading from source: " + reader.getCurrentSource(), e); |
| } |
| if (hasFirstElement) { |
| elemsRead++; |
| } |
| return hasFirstElement; |
| } |
| |
| @Override |
| public boolean advance() throws IOException { |
| boolean bundleFull = elemsRead >= maxUnboundedBundleSize; |
| if (bundleFull || Instant.now().isAfter(endTime) || context.isSinkFullHintSet()) { |
| return false; |
| } |
| |
| // Poll the reader, sleeping between attempts until the backoff is exhausted. |
| BackOff backoff = BACKOFF_FACTORY.backoff(); |
| for (; ; ) { |
| boolean advanced; |
| try { |
| advanced = reader.advance(); |
| } catch (Exception e) { |
| throw new IOException("Failed to advance source: " + reader.getCurrentSource(), e); |
| } |
| if (advanced) { |
| elemsRead++; |
| return true; |
| } |
| long delayMillis = backoff.nextBackOffMillis(); |
| if (delayMillis == BackOff.STOP) { |
| return false; |
| } |
| Uninterruptibles.sleepUninterruptibly(delayMillis, TimeUnit.MILLISECONDS); |
| } |
| } |
| |
| /** Returns the current element paired with its record id, in the global window. */ |
| @Override |
| public WindowedValue<ValueWithRecordId<T>> getCurrent() throws NoSuchElementException { |
| WindowedValue<T> element = |
| WindowedValue.timestampedValueInGlobalWindow( |
| reader.getCurrent(), reader.getCurrentTimestamp()); |
| ValueWithRecordId<T> valueWithId = |
| new ValueWithRecordId<>(element.getValue(), reader.getCurrentRecordId()); |
| return element.withValue(valueWithId); |
| } |
| |
| /** Does nothing; the underlying reader is not closed here. */ |
| @Override |
| public void close() {} |
| |
| /** Progress is not reported for unbounded reads. */ |
| @Override |
| public NativeReader.Progress getProgress() { |
| return null; |
| } |
| |
| /** Dynamic splitting is not supported for unbounded reads; always returns {@code null}. */ |
| @Override |
| public NativeReader.DynamicSplitResult requestDynamicSplit( |
| NativeReader.DynamicSplitRequest request) { |
| return null; |
| } |
| |
| @Override |
| public double getRemainingParallelism() { |
| return Double.NaN; |
| } |
| } |
| } |