| /* |
| * Copyright (C) 2015 Google Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| * use this file except in compliance with the License. You may obtain a copy of |
| * the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package com.google.cloud.dataflow.sdk.runners.dataflow; |
| |
| import static com.google.api.client.util.Base64.decodeBase64; |
| import static com.google.api.client.util.Base64.encodeBase64String; |
| import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.cloudSourceToDictionary; |
| import static com.google.cloud.dataflow.sdk.util.SerializableUtils.deserializeFromByteArray; |
| import static com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray; |
| import static com.google.cloud.dataflow.sdk.util.Structs.addString; |
| import static com.google.cloud.dataflow.sdk.util.Structs.addStringList; |
| import static com.google.cloud.dataflow.sdk.util.Structs.getString; |
| import static com.google.cloud.dataflow.sdk.util.Structs.getStrings; |
| |
| import com.google.api.client.util.BackOff; |
| import com.google.api.client.util.Base64; |
| import com.google.api.services.dataflow.model.ApproximateReportedProgress; |
| import com.google.api.services.dataflow.model.ApproximateSplitRequest; |
| import com.google.api.services.dataflow.model.DerivedSource; |
| import com.google.api.services.dataflow.model.DynamicSourceSplit; |
| import com.google.api.services.dataflow.model.SourceMetadata; |
| import com.google.api.services.dataflow.model.SourceOperationRequest; |
| import com.google.api.services.dataflow.model.SourceOperationResponse; |
| import com.google.api.services.dataflow.model.SourceSplitOptions; |
| import com.google.api.services.dataflow.model.SourceSplitRequest; |
| import com.google.api.services.dataflow.model.SourceSplitResponse; |
| import com.google.cloud.dataflow.sdk.coders.Coder; |
| import com.google.cloud.dataflow.sdk.io.BoundedSource; |
| import com.google.cloud.dataflow.sdk.io.Read; |
| import com.google.cloud.dataflow.sdk.io.Source; |
| import com.google.cloud.dataflow.sdk.io.UnboundedSource; |
| import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions; |
| import com.google.cloud.dataflow.sdk.options.PipelineOptions; |
| import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator; |
| import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner; |
| import com.google.cloud.dataflow.sdk.runners.worker.ReaderFactory; |
| import com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils; |
| import com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext; |
| import com.google.cloud.dataflow.sdk.transforms.PTransform; |
| import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff; |
| import com.google.cloud.dataflow.sdk.util.CloudObject; |
| import com.google.cloud.dataflow.sdk.util.ExecutionContext; |
| import com.google.cloud.dataflow.sdk.util.PropertyNames; |
| import com.google.cloud.dataflow.sdk.util.ValueWithRecordId; |
| import com.google.cloud.dataflow.sdk.util.WindowedValue; |
| import com.google.cloud.dataflow.sdk.util.common.CounterSet; |
| import com.google.cloud.dataflow.sdk.util.common.worker.NativeReader; |
| import com.google.cloud.dataflow.sdk.values.PValue; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.protobuf.ByteString; |
| |
| import org.joda.time.Duration; |
| import org.joda.time.Instant; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| |
| import javax.annotation.Nullable; |
| |
| /** |
| * A helper class for supporting sources defined as {@code Source}. |
| * |
| * <p>Provides a bridge between the high-level {@code Source} API and the |
| * low-level {@code CloudSource} class. |
| */ |
| public class CustomSources { |
| /** Spec property under which the Base64-encoded, serialized {@code Source} is stored. */ |
| private static final String SERIALIZED_SOURCE = "serialized_source"; |
| |
| /** Spec property under which the Base64-encoded, serialized unbounded source splits are stored. */ |
| @VisibleForTesting |
| static final String SERIALIZED_SOURCE_SPLITS = "serialized_source_splits"; |
| |
| /** Bundle size (64 MiB) requested from a source when a split request does not specify one. */ |
| private static final long DEFAULT_DESIRED_BUNDLE_SIZE_BYTES = 64L << 20; |
| |
| /** |
|  * Format string of the error raised when splitting produces too many bundles. It takes the |
|  * actual number of bundles followed by the allowed maximum. |
|  */ |
| public static final String TOO_MANY_SOURCE_SPLITS_ERROR = |
|     "Total number of Source objects generated by splitIntoBundles() operation, %d, is" |
|         + " larger than the allowable limit, %d. For more information, please check the corresponding" |
|         + " FAQ entry at:\n" |
|         + "https://cloud.google.com/dataflow/faq"; |
| |
| /** Maximum number of custom source splits currently supported by Dataflow. */ |
| private static final int MAX_NUMBER_OF_SPLITS = 16000; |
| |
| private static final Logger LOG = LoggerFactory.getLogger(CustomSources.class); |
| |
| /** |
|  * A {@code DynamicSplitResult} that is described explicitly by two {@code BoundedSource} |
|  * objects: the primary source and the residual source. |
|  * |
|  * @param <T> element type produced by both sources |
|  */ |
| public static final class BoundedSourceSplit<T> implements NativeReader.DynamicSplitResult { |
|   /** The primary part of the split. */ |
|   public final BoundedSource<T> primary; |
| |
|   /** The residual part of the split. */ |
|   public final BoundedSource<T> residual; |
| |
|   /** Creates a split result from its primary and residual parts. */ |
|   public BoundedSourceSplit(BoundedSource<T> primary, BoundedSource<T> residual) { |
|     this.primary = primary; |
|     this.residual = residual; |
|   } |
| |
|   /** Renders both parts for use in logs and error messages. */ |
|   @Override |
|   public String toString() { |
|     final String description = |
|         String.format("<primary: %s; residual: %s>", primary, residual); |
|     return description; |
|   } |
| } |
| |
| /** |
|  * Converts a {@link BoundedSourceSplit} into the Dataflow API {@code DynamicSourceSplit}, |
|  * serializing the primary and residual sources as independently derived cloud sources. |
|  * |
|  * @param sourceSplitResult the split whose two parts are serialized |
|  * @param options pipeline options handed to the source serialization |
|  * @return the split expressed in Dataflow API model objects |
|  * @throws RuntimeException if either part cannot be serialized |
|  */ |
| public static DynamicSourceSplit toSourceSplit( |
|     BoundedSourceSplit<?> sourceSplitResult, PipelineOptions options) { |
|   final com.google.api.services.dataflow.model.Source cloudPrimary; |
|   final com.google.api.services.dataflow.model.Source cloudResidual; |
|   try { |
|     cloudPrimary = serializeToCloudSource(sourceSplitResult.primary, options); |
|     cloudResidual = serializeToCloudSource(sourceSplitResult.residual, options); |
|   } catch (Exception e) { |
|     throw new RuntimeException("Failed to serialize one of the parts of the source split", e); |
|   } |
| |
|   DerivedSource primaryPart = new DerivedSource(); |
|   primaryPart.setDerivationMode("SOURCE_DERIVATION_MODE_INDEPENDENT"); |
|   primaryPart.setSource(cloudPrimary); |
| |
|   DerivedSource residualPart = new DerivedSource(); |
|   residualPart.setDerivationMode("SOURCE_DERIVATION_MODE_INDEPENDENT"); |
|   residualPart.setSource(cloudResidual); |
| |
|   DynamicSourceSplit result = new DynamicSourceSplit(); |
|   result.setPrimary(primaryPart); |
|   result.setResidual(residualPart); |
|   return result; |
| } |
| |
| /** |
|  * Executes a protocol-level {@code SourceOperationRequest}. Only split requests are handled: |
|  * the source is deserialized to a {@code BoundedSource}, split, and the results are serialized |
|  * back into the response. |
|  * |
|  * @param request the operation to perform; must carry a split request |
|  * @param options pipeline options passed to the split |
|  * @return a response holding the split result |
|  * @throws UnsupportedOperationException if the request does not carry a split request |
|  */ |
| public static SourceOperationResponse performSourceOperation( |
|     SourceOperationRequest request, PipelineOptions options) throws Exception { |
|   final SourceSplitRequest splitRequest = request.getSplit(); |
|   if (splitRequest == null) { |
|     throw new UnsupportedOperationException( |
|         "Unsupported source operation request: " + request); |
|   } |
|   SourceOperationResponse response = new SourceOperationResponse(); |
|   response.setSplit(performSplit(splitRequest, options)); |
|   return response; |
| } |
| |
| /** |
|  * {@link ReaderFactory} that builds a {@link NativeReader} for a custom source from a Dataflow |
|  * API source specification, by delegating to {@link CustomSources#create}. |
|  */ |
| public static class Factory implements ReaderFactory { |
|   @Override |
|   public NativeReader<?> create( |
|       CloudObject spec, |
|       @Nullable Coder<?> coder, |
|       @Nullable PipelineOptions options, |
|       @Nullable ExecutionContext executionContext, |
|       @Nullable CounterSet.AddCounterMutator addCounterMutator, |
|       @Nullable String operationName) |
|       throws Exception { |
|     // "coder" is intentionally ignored. ReaderFactory hands a coder to every reader, but not |
|     // every reader needs one (for some it does not even make sense), and this one does not. |
|     final NativeReader<WindowedValue<?>> reader = |
|         CustomSources.create(spec, options, executionContext); |
|     return reader; |
|   } |
| } |
| |
| /** |
|  * Deserializes the source stored in {@code spec} and wraps it in a {@link NativeReader}. |
|  * |
|  * <p>A {@code BoundedSource} gets a reader whose iterators are backed by a fresh |
|  * {@code BoundedReader}; an {@code UnboundedSource} gets an {@code UnboundedReader}, which |
|  * requires {@code executionContext} to be a {@link StreamingModeExecutionContext}. |
|  * |
|  * @param spec cloud source specification holding the serialized source |
|  * @param options pipeline options used when creating the underlying readers |
|  * @param executionContext context of the current execution |
|  * @return a reader over the deserialized source |
|  * @throws IllegalArgumentException if the source is neither bounded nor unbounded |
|  */ |
| public static NativeReader<WindowedValue<?>> create( |
|     final CloudObject spec, final PipelineOptions options, ExecutionContext executionContext) |
|     throws Exception { |
|   final Source<?> source = deserializeFromCloudSource(spec); |
| |
|   if (source instanceof BoundedSource) { |
|     @SuppressWarnings("unchecked") |
|     final BoundedSource<Object> boundedSource = (BoundedSource<Object>) source; |
|     NativeReader<WindowedValue<Object>> boundedReader = |
|         new NativeReader<WindowedValue<Object>>() { |
|           @Override |
|           public NativeReaderIterator<WindowedValue<Object>> iterator() throws IOException { |
|             return new BoundedReaderIterator<>(boundedSource.createReader(options)); |
|           } |
|         }; |
|     @SuppressWarnings({"unchecked", "rawtypes"}) |
|     NativeReader<WindowedValue<?>> result = (NativeReader) boundedReader; |
|     return result; |
|   } |
| |
|   if (source instanceof UnboundedSource) { |
|     UnboundedReader<Object> unboundedReader = |
|         new UnboundedReader<Object>( |
|             options, spec, (StreamingModeExecutionContext) executionContext); |
|     @SuppressWarnings({"unchecked", "rawtypes"}) |
|     NativeReader<WindowedValue<?>> result = (NativeReader) unboundedReader; |
|     return result; |
|   } |
| |
|   throw new IllegalArgumentException("Unexpected source kind: " + source.getClass()); |
| } |
| |
| /** |
|  * Key of the first unbounded source split. Split keys are parsed elsewhere in this class as |
|  * 16 hexadecimal digits holding the split index plus one, so this key maps to index 0. |
|  */ |
| private static final ByteString FIRST_SPLIT_KEY = ByteString.copyFromUtf8("0000000000000001"); |
| |
| /** |
|  * Tells whether {@code splitKey} identifies the first split of an unbounded source. |
|  * |
|  * @param splitKey the key to test; a {@code null} key is never the first split |
|  * @return {@code true} if the key equals the key of the first split |
|  */ |
| public static boolean isFirstUnboundedSourceSplit(ByteString splitKey) { |
|   // Compare from the constant's side so that a null key yields false instead of an NPE. |
|   return FIRST_SPLIT_KEY.equals(splitKey); |
| } |
| |
| /** |
| * {@link NativeReader} for reading from {@link UnboundedSource UnboundedSources}. |
| */ |
| private static class UnboundedReader<T> |
| extends NativeReader<WindowedValue<ValueWithRecordId<T>>> { |
| private final PipelineOptions options; |
| private final CloudObject spec; |
| private final StreamingModeExecutionContext context; |
| |
| UnboundedReader( |
| PipelineOptions options, CloudObject spec, StreamingModeExecutionContext context) { |
| this.options = options; |
| this.spec = spec; |
| this.context = context; |
| } |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| public NativeReaderIterator<WindowedValue<ValueWithRecordId<T>>> iterator() { |
| UnboundedSource.UnboundedReader<T> reader = |
| (UnboundedSource.UnboundedReader<T>) context.getCachedReader(); |
| final boolean started = reader != null; |
| if (reader == null) { |
| String key = context.getSerializedKey().toStringUtf8(); |
| // Key is expected to be a zero-padded integer representing the split index. |
| int splitIndex = Integer.parseInt(key.substring(0, 16), 16) - 1; |
| |
| UnboundedSource<T, UnboundedSource.CheckpointMark> splitSource = parseSource(splitIndex); |
| |
| UnboundedSource.CheckpointMark checkpoint = null; |
| if (splitSource.getCheckpointMarkCoder() != null) { |
| checkpoint = context.getReaderCheckpoint(splitSource.getCheckpointMarkCoder()); |
| } |
| |
| reader = splitSource.createReader(options, checkpoint); |
| } |
| |
| context.setActiveReader(reader); |
| |
| return new UnboundedReaderIterator<>(reader, started); |
| } |
| |
| @Override |
| public boolean supportsRestart() { |
| return true; |
| } |
| |
| @SuppressWarnings("unchecked") |
| private UnboundedSource<T, UnboundedSource.CheckpointMark> parseSource(int index) { |
| List<String> serializedSplits = null; |
| try { |
| serializedSplits = getStrings(spec, SERIALIZED_SOURCE_SPLITS, null); |
| } catch (Exception e) { |
| throw new RuntimeException("Parsing serialized source splits failed: ", e); |
| } |
| Preconditions.checkArgument( |
| serializedSplits != null, "UnboundedSource object did not contain splits"); |
| Preconditions.checkArgument( |
| index < serializedSplits.size(), |
| "UnboundedSource splits contained too few splits. Requested index was %s, size was %s", |
| index, |
| serializedSplits.size()); |
| Object rawSource = deserializeFromByteArray( |
| decodeBase64(serializedSplits.get(index)), "UnboundedSource split"); |
| if (!(rawSource instanceof UnboundedSource)) { |
| throw new IllegalArgumentException("Expected UnboundedSource, got " + rawSource.getClass()); |
| } |
| return (UnboundedSource<T, UnboundedSource.CheckpointMark>) rawSource; |
| } |
| } |
| |
| private static SourceSplitResponse performSplit( |
| SourceSplitRequest request, PipelineOptions options) |
| throws Exception { |
| Source<?> anySource = deserializeFromCloudSource(request.getSource().getSpec()); |
| if (!(anySource instanceof BoundedSource)) { |
| throw new UnsupportedOperationException("Cannot split a non-Bounded source: " + anySource); |
| } |
| BoundedSource<?> source = (BoundedSource<?>) anySource; |
| LOG.debug("Splitting source: {}", source); |
| |
| // Produce simple independent, unsplittable bundles with no metadata attached. |
| SourceSplitResponse response = new SourceSplitResponse(); |
| response.setBundles(new ArrayList<DerivedSource>()); |
| SourceSplitOptions splitOptions = request.getOptions(); |
| Long desiredBundleSizeBytes = |
| (splitOptions == null) ? null : splitOptions.getDesiredBundleSizeBytes(); |
| if (desiredBundleSizeBytes == null) { |
| desiredBundleSizeBytes = DEFAULT_DESIRED_BUNDLE_SIZE_BYTES; |
| } |
| List<? extends BoundedSource<?>> bundles = |
| source.splitIntoBundles(desiredBundleSizeBytes, options); |
| |
| if (bundles.size() > MAX_NUMBER_OF_SPLITS) { |
| throw new IOException( |
| String.format(TOO_MANY_SOURCE_SPLITS_ERROR, bundles.size(), MAX_NUMBER_OF_SPLITS)); |
| } |
| |
| LOG.debug("Splitting produced {} bundles", bundles.size()); |
| for (BoundedSource<?> split : bundles) { |
| try { |
| split.validate(); |
| } catch (Exception e) { |
| throw new IllegalArgumentException( |
| "Splitting a valid source produced an invalid bundle. " |
| + "\nOriginal source: " |
| + source |
| + "\nInvalid bundle: " |
| + split, |
| e); |
| } |
| DerivedSource bundle = new DerivedSource(); |
| |
| com.google.api.services.dataflow.model.Source cloudSource = |
| serializeToCloudSource(split, options); |
| cloudSource.setDoesNotNeedSplitting(true); |
| |
| bundle.setDerivationMode("SOURCE_DERIVATION_MODE_INDEPENDENT"); |
| bundle.setSource(cloudSource); |
| response.getBundles().add(bundle); |
| } |
| response.setOutcome("SOURCE_SPLIT_OUTCOME_SPLITTING_HAPPENED"); |
| return response; |
| } |
| |
| /** |
|  * Reconstructs the {@code Source} stored in a cloud source specification and validates it. |
|  * |
|  * @param spec specification holding the Base64-encoded, serialized source |
|  * @return the deserialized source, after it passed {@code validate()} |
|  * @throws Exception whatever {@code validate()} throws; the failure is logged before rethrowing |
|  */ |
| public static Source<?> deserializeFromCloudSource(Map<String, Object> spec) throws Exception { |
|   final byte[] serialized = Base64.decodeBase64(getString(spec, SERIALIZED_SOURCE)); |
|   final Source<?> source = (Source<?>) deserializeFromByteArray(serialized, "Source"); |
|   try { |
|     source.validate(); |
|     return source; |
|   } catch (Exception e) { |
|     LOG.error("Invalid source: " + source, e); |
|     throw e; |
|   } |
| } |
| |
| private static int getDesiredNumUnboundedSourceSplits(DataflowPipelineOptions options) { |
| if (options.getMaxNumWorkers() > 0) { |
| return options.getMaxNumWorkers(); |
| } else if (options.getNumWorkers() > 0) { |
| return options.getNumWorkers() * 3; |
| } else { |
| return 20; |
| } |
| } |
| |
| /** |
|  * Serializes a {@code Source} into a Dataflow API cloud source whose spec names this class as |
|  * the source format. |
|  * |
|  * <p>For a {@code BoundedSource} the metadata records whether it produces sorted keys and its |
|  * estimated size; both lookups are best effort and a failure is only logged. For an |
|  * {@code UnboundedSource} the metadata is marked infinite and the initial splits are |
|  * generated, serialized and stored in the spec as well. |
|  * |
|  * @param source the source to serialize |
|  * @param options pipeline options passed to the source when computing metadata and splits |
|  * @return the cloud representation of {@code source} |
|  * @throws IllegalArgumentException if the source is neither bounded nor unbounded, or if an |
|  *     unbounded source generates no initial splits |
|  */ |
| public static com.google.api.services.dataflow.model.Source serializeToCloudSource( |
|     Source<?> source, PipelineOptions options) throws Exception { |
|   com.google.api.services.dataflow.model.Source cloudSource = |
|       new com.google.api.services.dataflow.model.Source(); |
|   // We ourselves act as the SourceFormat. |
|   cloudSource.setSpec(CloudObject.forClass(CustomSources.class)); |
|   final Map<String, Object> spec = cloudSource.getSpec(); |
|   final String encodedSource = encodeBase64String(serializeToByteArray(source)); |
|   addString(spec, SERIALIZED_SOURCE, encodedSource); |
| |
|   SourceMetadata metadata = new SourceMetadata(); |
|   if (source instanceof BoundedSource) { |
|     BoundedSource<?> bounded = (BoundedSource<?>) source; |
| |
|     try { |
|       metadata.setProducesSortedKeys(bounded.producesSortedKeys(options)); |
|     } catch (Exception e) { |
|       LOG.warn("Failed to check if the source produces sorted keys: " + source, e); |
|     } |
| |
|     // Size estimation is best effort so we continue even if it fails here. |
|     try { |
|       metadata.setEstimatedSizeBytes(bounded.getEstimatedSizeBytes(options)); |
|     } catch (Exception e) { |
|       LOG.warn("Size estimation of the source failed: " + source, e); |
|     } |
|   } else if (source instanceof UnboundedSource) { |
|     UnboundedSource<?, ?> unbounded = (UnboundedSource<?, ?>) source; |
|     metadata.setInfinite(true); |
| |
|     int desiredNumSplits = |
|         getDesiredNumUnboundedSourceSplits(options.as(DataflowPipelineOptions.class)); |
|     List<String> splitsBase64 = new ArrayList<>(); |
|     for (UnboundedSource<?, ?> split : |
|         unbounded.generateInitialSplits(desiredNumSplits, options)) { |
|       byte[] splitBytes = serializeToByteArray(split); |
|       splitsBase64.add(encodeBase64String(splitBytes)); |
|     } |
|     Preconditions.checkArgument( |
|         !splitsBase64.isEmpty(), "UnboundedSources must have at least one split"); |
|     addStringList(spec, SERIALIZED_SOURCE_SPLITS, splitsBase64); |
|   } else { |
|     throw new IllegalArgumentException("Unexpected source kind: " + source.getClass()); |
|   } |
| |
|   cloudSource.setMetadata(metadata); |
|   return cloudSource; |
| } |
| |
| /** |
|  * Evaluates a bounded read for the {@link DirectPipelineRunner}: reads every element of the |
|  * transform's source, places each one with its timestamp in the global window, and stores the |
|  * resulting list as the values of the transform's output. |
|  * |
|  * @param transform the bounded read being evaluated |
|  * @param context evaluation context supplying pipeline options and receiving the output |
|  * @throws RuntimeException wrapping any exception raised while reading |
|  */ |
| public static <T> void evaluateReadHelper( |
|     Read.Bounded<T> transform, DirectPipelineRunner.EvaluationContext context) { |
|   final List<DirectPipelineRunner.ValueWithMetadata<T>> elements = new ArrayList<>(); |
|   try { |
|     BoundedSource<T> source = transform.getSource(); |
|     try (BoundedSource.BoundedReader<T> reader = |
|         source.createReader(context.getPipelineOptions())) { |
|       boolean hasElement = reader.start(); |
|       while (hasElement) { |
|         WindowedValue<T> value = |
|             WindowedValue.timestampedValueInGlobalWindow( |
|                 reader.getCurrent(), reader.getCurrentTimestamp()); |
|         elements.add(DirectPipelineRunner.ValueWithMetadata.of(value)); |
|         hasElement = reader.advance(); |
|       } |
|     } |
|     context.setPCollectionValuesWithMetadata(context.getOutput(transform), elements); |
|   } catch (Exception e) { |
|     throw new RuntimeException(e); |
|   } |
| } |
| |
| /** |
|  * Translates a read from {@code source} into a "ParallelRead" step of the Dataflow job: the |
|  * step is given the custom source format, the serialized source as its input, and the |
|  * transform's output as a value-only output. |
|  * |
|  * @param source the source being read |
|  * @param transform the transform the step is created for |
|  * @param context translation context the step is added to |
|  * @throws RuntimeException wrapping any exception raised during translation |
|  */ |
| public static <T> void translateReadHelper(Source<T> source, |
|     PTransform<?, ? extends PValue> transform, |
|     DataflowPipelineTranslator.TranslationContext context) { |
|   try { |
|     context.addStep(transform, "ParallelRead"); |
|     context.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT); |
| |
|     com.google.api.services.dataflow.model.Source cloudSource = |
|         serializeToCloudSource(source, context.getPipelineOptions()); |
|     context.addInput(PropertyNames.SOURCE_STEP_INPUT, cloudSourceToDictionary(cloudSource)); |
| |
|     context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform)); |
|   } catch (Exception e) { |
|     throw new RuntimeException(e); |
|   } |
| } |
| |
| /** |
|  * Adapts a {@code BoundedSource.BoundedReader} to the {@code NativeReaderIterator} interface: |
|  * every element is placed with its timestamp in the global window, and progress reporting and |
|  * dynamic splitting are translated to and from the reader's fraction-based operations. |
|  */ |
| private static class BoundedReaderIterator<T> |
|     extends NativeReader.NativeReaderIterator<WindowedValue<T>> { |
|   private final BoundedSource.BoundedReader<T> reader; |
| |
|   private BoundedReaderIterator(BoundedSource.BoundedReader<T> reader) { |
|     this.reader = reader; |
|   } |
| |
|   /** Starts the underlying reader, wrapping any failure in an {@link IOException}. */ |
|   @Override |
|   public boolean start() throws IOException { |
|     try { |
|       return reader.start(); |
|     } catch (Exception e) { |
|       throw new IOException( |
|           "Failed to start reading from source: " + reader.getCurrentSource(), e); |
|     } |
|   } |
| |
|   /** Advances the underlying reader, wrapping any failure in an {@link IOException}. */ |
|   @Override |
|   public boolean advance() throws IOException { |
|     try { |
|       return reader.advance(); |
|     } catch (Exception e) { |
|       throw new IOException( |
|           "Failed to advance reader of source: " + reader.getCurrentSource(), e); |
|     } |
|   } |
| |
|   /** Returns the current element, timestamped, in the global window. */ |
|   @Override |
|   public WindowedValue<T> getCurrent() throws NoSuchElementException { |
|     return WindowedValue.timestampedValueInGlobalWindow( |
|         reader.getCurrent(), reader.getCurrentTimestamp()); |
|   } |
| |
|   @Override |
|   public void close() throws IOException { |
|     reader.close(); |
|   } |
| |
|   /** |
|    * Reports the fraction of the source consumed so far. |
|    * |
|    * @return the reader progress, with the fraction left unset when the reader does not know |
|    *     it, or {@code null} if there is no reader |
|    */ |
|   @Override |
|   public NativeReader.Progress getProgress() { |
|     // The former "reader instanceof BoundedReader" test could only fail for a null reader, |
|     // so it is spelled as the null check it really was. |
|     if (reader == null) { |
|       return null; |
|     } |
|     ApproximateReportedProgress progress = new ApproximateReportedProgress(); |
|     Double fractionConsumed = reader.getFractionConsumed(); |
|     if (fractionConsumed != null) { |
|       progress.setFractionConsumed(fractionConsumed); |
|     } |
|     return SourceTranslationUtils.cloudProgressToReaderProgress(progress); |
|   } |
| |
|   /** |
|    * Tries to split the source being read at the fraction named in {@code request}. |
|    * |
|    * @param request the split request; only requests naming a fraction are supported |
|    * @return the primary and residual sources, or {@code null} if the request names no |
|    *     fraction or the reader declines to split |
|    * @throws IllegalStateException if the reader reports a successful split but the resulting |
|    *     sources are the original object or fail validation |
|    */ |
|   @Override |
|   public NativeReader.DynamicSplitResult requestDynamicSplit( |
|       NativeReader.DynamicSplitRequest request) { |
|     ApproximateSplitRequest stopPosition = |
|         SourceTranslationUtils.splitRequestToApproximateSplitRequest(request); |
|     Double fractionConsumed = stopPosition.getFractionConsumed(); |
|     if (fractionConsumed == null) { |
|       // Only truncating at a fraction is currently supported. |
|       return null; |
|     } |
|     BoundedSource<T> original = reader.getCurrentSource(); |
|     BoundedSource<T> residual = reader.splitAtFraction(fractionConsumed); |
|     if (residual == null) { |
|       return null; |
|     } |
|     // Try to catch some potential subclass implementation errors early. |
|     BoundedSource<T> primary = reader.getCurrentSource(); |
|     if (original == primary) { |
|       throw new IllegalStateException( |
|           "Successful split did not change the current source: primary is identical to original" |
|               + " (Source objects MUST be immutable): " + primary); |
|     } |
|     if (original == residual) { |
|       throw new IllegalStateException( |
|           "Successful split did not change the current source: residual is identical to original" |
|               + " (Source objects MUST be immutable): " + residual); |
|     } |
|     try { |
|       primary.validate(); |
|     } catch (Exception e) { |
|       // Keep the validation failure as the cause so the actual problem is not lost. |
|       throw new IllegalStateException( |
|           "Successful split produced an illegal primary source. " |
|               + "\nOriginal: " + original + "\nPrimary: " + primary + "\nResidual: " + residual, |
|           e); |
|     } |
|     try { |
|       residual.validate(); |
|     } catch (Exception e) { |
|       // Keep the validation failure as the cause so the actual problem is not lost. |
|       throw new IllegalStateException( |
|           "Successful split produced an illegal residual source. " |
|               + "\nOriginal: " + original + "\nPrimary: " + primary + "\nResidual: " + residual, |
|           e); |
|     } |
|     return new BoundedSourceSplit<T>(primary, residual); |
|   } |
| |
|   /** Remaining parallelism is not estimated; always reports {@code NaN}. */ |
|   @Override |
|   public double getRemainingParallelism() { |
|     return Double.NaN; |
|   } |
| } |
| |
| // An unbounded bundle is committed at least once every 10 seconds or 10k records. This keeps |
| // the watermark advancing smoothly and limits how much work has to be reprocessed after a |
| // crash. |
| |
| /** Maximum number of elements read from an unbounded source within a single bundle. */ |
| @VisibleForTesting static final int MAX_UNBOUNDED_BUNDLE_SIZE = 10_000; |
| |
| /** Maximum time spent reading from an unbounded source within a single bundle. */ |
| @VisibleForTesting |
| static final Duration MAX_UNBOUNDED_BUNDLE_READ_TIME = Duration.standardSeconds(10); |
| |
| /** |
|  * Adapts an {@code UnboundedSource.UnboundedReader} to the {@code NativeReaderIterator} |
|  * interface for a single bundle. |
|  * |
|  * <p>The bundle ends once {@link #MAX_UNBOUNDED_BUNDLE_SIZE} elements have been read, once |
|  * {@link #MAX_UNBOUNDED_BUNDLE_READ_TIME} has elapsed since the iterator was created, or once |
|  * the reader keeps reporting no data through a short back-off. |
|  */ |
| private static class UnboundedReaderIterator<T> |
|     extends NativeReader.NativeReaderIterator<WindowedValue<ValueWithRecordId<T>>> { |
|   private final UnboundedSource.UnboundedReader<T> reader; |
|   /** Whether {@code reader} had already been started before this iterator was created. */ |
|   private final boolean started; |
|   /** Point in time after which no further elements are read for this bundle. */ |
|   private final Instant endTime; |
|   /** Number of elements returned so far by this iterator. */ |
|   private int elemsRead; |
| |
|   private UnboundedReaderIterator(UnboundedSource.UnboundedReader<T> reader, boolean started) { |
|     this.reader = reader; |
|     this.endTime = Instant.now().plus(MAX_UNBOUNDED_BUNDLE_READ_TIME); |
|     this.elemsRead = 0; |
|     this.started = started; |
|   } |
| |
|   @Override |
|   public boolean start() throws IOException { |
|     if (started) { |
|       // This is a reader that has been restored from the unbounded reader cache. |
|       // It has already been started, so this call to start() should delegate |
|       // to advance() instead. |
|       return advance(); |
|     } |
|     try { |
|       if (!reader.start()) { |
|         return false; |
|       } |
|     } catch (Exception e) { |
|       throw new IOException( |
|           "Failed to start reading from source: " + reader.getCurrentSource(), e); |
|     } |
|     elemsRead++; |
|     return true; |
|   } |
| |
|   /** |
|    * Moves to the next element, retrying with a short back-off while the reader has no data. |
|    * |
|    * @return {@code true} if an element is available; {@code false} if the bundle limits were |
|    *     reached, the back-off ran out, or the thread was interrupted while waiting |
|    * @throws IOException if the underlying reader fails to advance |
|    */ |
|   @Override |
|   public boolean advance() throws IOException { |
|     if (elemsRead >= MAX_UNBOUNDED_BUNDLE_SIZE || Instant.now().isAfter(endTime)) { |
|       return false; |
|     } |
| |
|     // Backoff starting at 100ms, for approximately 1s total. 100+150+225+337.5~=1000. |
|     BackOff backoff = new AttemptBoundedExponentialBackOff(5, 100); |
|     while (true) { |
|       try { |
|         if (reader.advance()) { |
|           elemsRead++; |
|           return true; |
|         } |
|       } catch (Exception e) { |
|         throw new IOException("Failed to advance source: " + reader.getCurrentSource(), e); |
|       } |
|       long nextBackoff = backoff.nextBackOffMillis(); |
|       if (nextBackoff == BackOff.STOP) { |
|         return false; |
|       } |
|       try { |
|         Thread.sleep(nextBackoff); |
|       } catch (InterruptedException e) { |
|         // Do not swallow the interruption: restore the flag for the caller and stop waiting. |
|         // Reporting "no more elements" simply ends this bundle early. |
|         Thread.currentThread().interrupt(); |
|         return false; |
|       } |
|     } |
|   } |
| |
|   /** Returns the current element with its record id, timestamped, in the global window. */ |
|   @Override |
|   public WindowedValue<ValueWithRecordId<T>> getCurrent() throws NoSuchElementException { |
|     WindowedValue<T> result = |
|         WindowedValue.timestampedValueInGlobalWindow( |
|             reader.getCurrent(), reader.getCurrentTimestamp()); |
|     return result.withValue( |
|         new ValueWithRecordId<>(result.getValue(), reader.getCurrentRecordId())); |
|   } |
| |
|   /** |
|    * Does nothing; the underlying reader is left open. |
|    * |
|    * <p>NOTE(review): presumably the reader stays open because it is registered with the |
|    * execution context and reused by later bundles - confirm. |
|    */ |
|   @Override |
|   public void close() {} |
| |
|   /** Progress is not reported for unbounded sources; always returns {@code null}. */ |
|   @Override |
|   public NativeReader.Progress getProgress() { |
|     return null; |
|   } |
| |
|   /** Dynamic splitting is not supported for unbounded sources; always returns {@code null}. */ |
|   @Override |
|   public NativeReader.DynamicSplitResult requestDynamicSplit( |
|       NativeReader.DynamicSplitRequest request) { |
|     return null; |
|   } |
| |
|   /** Remaining parallelism is not estimated; always reports {@code NaN}. */ |
|   @Override |
|   public double getRemainingParallelism() { |
|     return Double.NaN; |
|   } |
| } |
| } |