/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package com.google.cloud.dataflow.sdk.runners.dataflow;

import static com.google.api.client.util.Base64.decodeBase64;
import static com.google.api.client.util.Base64.encodeBase64String;
import static com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils.cloudSourceToDictionary;
import static com.google.cloud.dataflow.sdk.util.SerializableUtils.deserializeFromByteArray;
import static com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray;
import static com.google.cloud.dataflow.sdk.util.Structs.addString;
import static com.google.cloud.dataflow.sdk.util.Structs.addStringList;
import static com.google.cloud.dataflow.sdk.util.Structs.getString;
import static com.google.cloud.dataflow.sdk.util.Structs.getStrings;

import com.google.api.client.util.BackOff;
import com.google.api.client.util.Base64;
import com.google.api.services.dataflow.model.ApproximateReportedProgress;
import com.google.api.services.dataflow.model.ApproximateSplitRequest;
import com.google.api.services.dataflow.model.DerivedSource;
import com.google.api.services.dataflow.model.DynamicSourceSplit;
import com.google.api.services.dataflow.model.SourceMetadata;
import com.google.api.services.dataflow.model.SourceOperationRequest;
import com.google.api.services.dataflow.model.SourceOperationResponse;
import com.google.api.services.dataflow.model.SourceSplitOptions;
import com.google.api.services.dataflow.model.SourceSplitRequest;
import com.google.api.services.dataflow.model.SourceSplitResponse;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.io.BoundedSource;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.io.Source;
import com.google.cloud.dataflow.sdk.io.UnboundedSource;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineTranslator;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.runners.worker.ReaderFactory;
import com.google.cloud.dataflow.sdk.runners.worker.SourceTranslationUtils;
import com.google.cloud.dataflow.sdk.runners.worker.StreamingModeExecutionContext;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.util.AttemptBoundedExponentialBackOff;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.PropertyNames;
import com.google.cloud.dataflow.sdk.util.ValueWithRecordId;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.CounterSet;
import com.google.cloud.dataflow.sdk.util.common.worker.NativeReader;
import com.google.cloud.dataflow.sdk.values.PValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import javax.annotation.Nullable;

/**
 * A helper class for supporting custom sources defined as subclasses of {@link Source}.
 *
 * <p>Provides a bridge between the high-level {@link Source} API and the low-level Dataflow
 * API {@code Source} representation.
 */
public class CustomSources {
private static final String SERIALIZED_SOURCE = "serialized_source";
@VisibleForTesting static final String SERIALIZED_SOURCE_SPLITS = "serialized_source_splits";
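  // Default desired bundle size used when a split request does not specify one: 64 MB.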
private static final long DEFAULT_DESIRED_BUNDLE_SIZE_BYTES = 64 * (1 << 20);
  public static final String TOO_MANY_SOURCE_SPLITS_ERROR =
      "The total number of Source objects generated by the splitIntoBundles() operation, %d,"
          + " exceeds the allowable limit, %d. For more information, see the corresponding FAQ"
          + " entry at:\n"
          + "https://cloud.google.com/dataflow/faq";
// Maximum number of custom source splits currently supported by Dataflow.
private static final int MAX_NUMBER_OF_SPLITS = 16000;
private static final Logger LOG = LoggerFactory.getLogger(CustomSources.class);
/**
* A {@code DynamicSplitResult} specified explicitly by a pair of {@code BoundedSource}
* objects describing the primary and residual sources.
*/
public static final class BoundedSourceSplit<T> implements NativeReader.DynamicSplitResult {
public final BoundedSource<T> primary;
public final BoundedSource<T> residual;
public BoundedSourceSplit(BoundedSource<T> primary, BoundedSource<T> residual) {
this.primary = primary;
this.residual = residual;
}
@Override
public String toString() {
return String.format("<primary: %s; residual: %s>", primary, residual);
}
}
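
  /**
   * Converts a {@link BoundedSourceSplit} into the Dataflow API {@link DynamicSourceSplit}
   * representation by serializing its primary and residual sources.
   */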
public static DynamicSourceSplit toSourceSplit(
BoundedSourceSplit<?> sourceSplitResult, PipelineOptions options) {
DynamicSourceSplit sourceSplit = new DynamicSourceSplit();
com.google.api.services.dataflow.model.Source primarySource;
com.google.api.services.dataflow.model.Source residualSource;
try {
primarySource = serializeToCloudSource(sourceSplitResult.primary, options);
residualSource = serializeToCloudSource(sourceSplitResult.residual, options);
} catch (Exception e) {
throw new RuntimeException("Failed to serialize one of the parts of the source split", e);
}
sourceSplit.setPrimary(
new DerivedSource()
.setDerivationMode("SOURCE_DERIVATION_MODE_INDEPENDENT")
.setSource(primarySource));
sourceSplit.setResidual(
new DerivedSource()
.setDerivationMode("SOURCE_DERIVATION_MODE_INDEPENDENT")
.setSource(residualSource));
return sourceSplit;
}
/**
* Executes a protocol-level split {@code SourceOperationRequest} for bounded sources
* by deserializing its source to a {@code BoundedSource}, splitting it, and
* serializing results back.
*/
public static SourceOperationResponse performSourceOperation(
SourceOperationRequest request, PipelineOptions options) throws Exception {
SourceOperationResponse response = new SourceOperationResponse();
if (request.getSplit() != null) {
response.setSplit(performSplit(request.getSplit(), options));
} else {
throw new UnsupportedOperationException(
"Unsupported source operation request: " + request);
}
return response;
}
  /**
   * A {@link ReaderFactory} that creates a {@link NativeReader} for a custom source from a
   * Dataflow API source specification.
   */
public static class Factory implements ReaderFactory {
@Override
public NativeReader<?> create(
CloudObject spec,
@Nullable Coder<?> coder,
@Nullable PipelineOptions options,
@Nullable ExecutionContext executionContext,
@Nullable CounterSet.AddCounterMutator addCounterMutator,
@Nullable String operationName)
throws Exception {
      // The "coder" parameter is deliberately unused. It is an artifact of the ReaderFactory
      // interface: some readers need a coder and some do not (for some it does not even make
      // sense), but ReaderFactory passes one to all readers anyway.
return CustomSources.create(spec, options, executionContext);
}
}
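
  /**
   * Creates a {@link NativeReader} for the custom source described by the given spec: a
   * one-shot bounded reader for a {@link BoundedSource}, or an {@link UnboundedReader} backed
   * by the streaming execution context for an {@link UnboundedSource}.
   */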
public static NativeReader<WindowedValue<?>> create(
final CloudObject spec, final PipelineOptions options, ExecutionContext executionContext)
throws Exception {
@SuppressWarnings("unchecked")
final Source<Object> source = (Source<Object>) deserializeFromCloudSource(spec);
if (source instanceof BoundedSource) {
@SuppressWarnings({"unchecked", "rawtypes"})
NativeReader<WindowedValue<?>> reader =
(NativeReader)
new NativeReader<WindowedValue<Object>>() {
@Override
public NativeReaderIterator<WindowedValue<Object>> iterator() throws IOException {
return new BoundedReaderIterator<>(
((BoundedSource<Object>) source).createReader(options));
}
};
return reader;
} else if (source instanceof UnboundedSource) {
@SuppressWarnings({"unchecked", "rawtypes"})
NativeReader<WindowedValue<?>> reader =
(NativeReader)
new UnboundedReader<Object>(
options, spec, (StreamingModeExecutionContext) executionContext);
return reader;
} else {
throw new IllegalArgumentException("Unexpected source kind: " + source.getClass());
}
}
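
  // Unbounded source splits are keyed by 16-character, zero-padded hexadecimal strings that
  // encode the 1-based split index, so "0000000000000001" identifies the first split.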
  private static final ByteString FIRST_SPLIT_KEY = ByteString.copyFromUtf8("0000000000000001");

  public static boolean isFirstUnboundedSourceSplit(ByteString splitKey) {
    return splitKey.equals(FIRST_SPLIT_KEY);
  }
/**
* {@link NativeReader} for reading from {@link UnboundedSource UnboundedSources}.
*/
private static class UnboundedReader<T>
extends NativeReader<WindowedValue<ValueWithRecordId<T>>> {
private final PipelineOptions options;
private final CloudObject spec;
private final StreamingModeExecutionContext context;
UnboundedReader(
PipelineOptions options, CloudObject spec, StreamingModeExecutionContext context) {
this.options = options;
this.spec = spec;
this.context = context;
}
@Override
@SuppressWarnings("unchecked")
public NativeReaderIterator<WindowedValue<ValueWithRecordId<T>>> iterator() {
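      // Reuse a reader cached from a previous bundle for this key if one is available;
      // otherwise reconstruct a reader from the serialized split and its latest checkpoint.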
UnboundedSource.UnboundedReader<T> reader =
(UnboundedSource.UnboundedReader<T>) context.getCachedReader();
final boolean started = reader != null;
if (reader == null) {
String key = context.getSerializedKey().toStringUtf8();
        // The key is a zero-padded, 16-character hexadecimal string encoding the 1-based split
        // index, so parse it with radix 16 and subtract one to get the zero-based index.
        int splitIndex = Integer.parseInt(key.substring(0, 16), 16) - 1;
UnboundedSource<T, UnboundedSource.CheckpointMark> splitSource = parseSource(splitIndex);
UnboundedSource.CheckpointMark checkpoint = null;
if (splitSource.getCheckpointMarkCoder() != null) {
checkpoint = context.getReaderCheckpoint(splitSource.getCheckpointMarkCoder());
}
reader = splitSource.createReader(options, checkpoint);
}
context.setActiveReader(reader);
return new UnboundedReaderIterator<>(reader, started);
}
@Override
public boolean supportsRestart() {
return true;
}
@SuppressWarnings("unchecked")
private UnboundedSource<T, UnboundedSource.CheckpointMark> parseSource(int index) {
List<String> serializedSplits = null;
try {
serializedSplits = getStrings(spec, SERIALIZED_SOURCE_SPLITS, null);
} catch (Exception e) {
throw new RuntimeException("Parsing serialized source splits failed: ", e);
}
Preconditions.checkArgument(
serializedSplits != null, "UnboundedSource object did not contain splits");
Preconditions.checkArgument(
index < serializedSplits.size(),
"UnboundedSource splits contained too few splits. Requested index was %s, size was %s",
index,
serializedSplits.size());
Object rawSource = deserializeFromByteArray(
decodeBase64(serializedSplits.get(index)), "UnboundedSource split");
if (!(rawSource instanceof UnboundedSource)) {
throw new IllegalArgumentException("Expected UnboundedSource, got " + rawSource.getClass());
}
return (UnboundedSource<T, UnboundedSource.CheckpointMark>) rawSource;
}
}
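
  /**
   * Splits a bounded custom source into bundles of approximately the desired size and wraps
   * each bundle as an independent, unsplittable {@link DerivedSource}.
   */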
private static SourceSplitResponse performSplit(
SourceSplitRequest request, PipelineOptions options)
throws Exception {
Source<?> anySource = deserializeFromCloudSource(request.getSource().getSpec());
if (!(anySource instanceof BoundedSource)) {
throw new UnsupportedOperationException("Cannot split a non-Bounded source: " + anySource);
}
BoundedSource<?> source = (BoundedSource<?>) anySource;
LOG.debug("Splitting source: {}", source);
// Produce simple independent, unsplittable bundles with no metadata attached.
SourceSplitResponse response = new SourceSplitResponse();
response.setBundles(new ArrayList<DerivedSource>());
SourceSplitOptions splitOptions = request.getOptions();
Long desiredBundleSizeBytes =
(splitOptions == null) ? null : splitOptions.getDesiredBundleSizeBytes();
if (desiredBundleSizeBytes == null) {
desiredBundleSizeBytes = DEFAULT_DESIRED_BUNDLE_SIZE_BYTES;
}
List<? extends BoundedSource<?>> bundles =
source.splitIntoBundles(desiredBundleSizeBytes, options);
if (bundles.size() > MAX_NUMBER_OF_SPLITS) {
throw new IOException(
String.format(TOO_MANY_SOURCE_SPLITS_ERROR, bundles.size(), MAX_NUMBER_OF_SPLITS));
}
LOG.debug("Splitting produced {} bundles", bundles.size());
for (BoundedSource<?> split : bundles) {
try {
split.validate();
} catch (Exception e) {
throw new IllegalArgumentException(
"Splitting a valid source produced an invalid bundle. "
+ "\nOriginal source: "
+ source
+ "\nInvalid bundle: "
+ split,
e);
}
DerivedSource bundle = new DerivedSource();
com.google.api.services.dataflow.model.Source cloudSource =
serializeToCloudSource(split, options);
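      // Mark each bundle as a leaf: the service should not attempt to split it further.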
cloudSource.setDoesNotNeedSplitting(true);
bundle.setDerivationMode("SOURCE_DERIVATION_MODE_INDEPENDENT");
bundle.setSource(cloudSource);
response.getBundles().add(bundle);
}
response.setOutcome("SOURCE_SPLIT_OUTCOME_SPLITTING_HAPPENED");
return response;
}
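
  /** Deserializes a user-defined {@link Source} from a cloud source spec and validates it. */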
public static Source<?> deserializeFromCloudSource(Map<String, Object> spec) throws Exception {
Source<?> source = (Source<?>) deserializeFromByteArray(
Base64.decodeBase64(getString(spec, SERIALIZED_SOURCE)), "Source");
try {
source.validate();
} catch (Exception e) {
LOG.error("Invalid source: " + source, e);
throw e;
}
return source;
}
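
  /**
   * Chooses how many initial splits to request from an unbounded source: the maximum worker
   * count if configured, otherwise three per configured worker, otherwise a default of 20.
   */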
private static int getDesiredNumUnboundedSourceSplits(DataflowPipelineOptions options) {
if (options.getMaxNumWorkers() > 0) {
return options.getMaxNumWorkers();
} else if (options.getNumWorkers() > 0) {
return options.getNumWorkers() * 3;
} else {
return 20;
}
}
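
  /**
   * Serializes a user-defined {@link Source} into the Dataflow API representation, embedding
   * the base64-encoded Java serialization of the source (and, for unbounded sources, of its
   * initial splits) in the source spec.
   */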
public static com.google.api.services.dataflow.model.Source serializeToCloudSource(
Source<?> source, PipelineOptions options) throws Exception {
com.google.api.services.dataflow.model.Source cloudSource =
new com.google.api.services.dataflow.model.Source();
    // This class itself acts as the SourceFormat: the spec names CustomSources as its class.
cloudSource.setSpec(CloudObject.forClass(CustomSources.class));
addString(
cloudSource.getSpec(), SERIALIZED_SOURCE, encodeBase64String(serializeToByteArray(source)));
SourceMetadata metadata = new SourceMetadata();
if (source instanceof BoundedSource) {
BoundedSource<?> boundedSource = (BoundedSource<?>) source;
try {
metadata.setProducesSortedKeys(boundedSource.producesSortedKeys(options));
} catch (Exception e) {
LOG.warn("Failed to check if the source produces sorted keys: " + source, e);
}
// Size estimation is best effort so we continue even if it fails here.
try {
metadata.setEstimatedSizeBytes(boundedSource.getEstimatedSizeBytes(options));
} catch (Exception e) {
LOG.warn("Size estimation of the source failed: " + source, e);
}
} else if (source instanceof UnboundedSource) {
UnboundedSource<?, ?> unboundedSource = (UnboundedSource<?, ?>) source;
metadata.setInfinite(true);
List<String> encodedSplits = new ArrayList<>();
int desiredNumSplits =
getDesiredNumUnboundedSourceSplits(options.as(DataflowPipelineOptions.class));
for (UnboundedSource<?, ?> split :
unboundedSource.generateInitialSplits(desiredNumSplits, options)) {
encodedSplits.add(encodeBase64String(serializeToByteArray(split)));
}
Preconditions.checkArgument(
!encodedSplits.isEmpty(), "UnboundedSources must have at least one split");
addStringList(cloudSource.getSpec(), SERIALIZED_SOURCE_SPLITS, encodedSplits);
} else {
throw new IllegalArgumentException("Unexpected source kind: " + source.getClass());
}
cloudSource.setMetadata(metadata);
return cloudSource;
}
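
  /**
   * Directly evaluates a {@link Read.Bounded} transform for the {@link DirectPipelineRunner}
   * by reading the source to exhaustion into an in-memory list.
   */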
public static <T> void evaluateReadHelper(
Read.Bounded<T> transform, DirectPipelineRunner.EvaluationContext context) {
try {
List<DirectPipelineRunner.ValueWithMetadata<T>> output = new ArrayList<>();
BoundedSource<T> source = transform.getSource();
try (BoundedSource.BoundedReader<T> reader =
source.createReader(context.getPipelineOptions())) {
for (boolean available = reader.start(); available; available = reader.advance()) {
output.add(
DirectPipelineRunner.ValueWithMetadata.of(
WindowedValue.timestampedValueInGlobalWindow(
reader.getCurrent(), reader.getCurrentTimestamp())));
}
}
context.setPCollectionValuesWithMetadata(context.getOutput(transform), output);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
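
  /**
   * Translates a read from a custom {@link Source} into a "ParallelRead" step of the Dataflow
   * job specification.
   */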
public static <T> void translateReadHelper(Source<T> source,
PTransform<?, ? extends PValue> transform,
DataflowPipelineTranslator.TranslationContext context) {
try {
context.addStep(transform, "ParallelRead");
context.addInput(PropertyNames.FORMAT, PropertyNames.CUSTOM_SOURCE_FORMAT);
context.addInput(
PropertyNames.SOURCE_STEP_INPUT,
cloudSourceToDictionary(serializeToCloudSource(source, context.getPipelineOptions())));
context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
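
  /**
   * A {@link NativeReader.NativeReaderIterator} over a {@link BoundedSource.BoundedReader},
   * emitting each element as a timestamped value in the global window and supporting progress
   * reporting and dynamic splitting at a fraction.
   */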
private static class BoundedReaderIterator<T>
extends NativeReader.NativeReaderIterator<WindowedValue<T>> {
private BoundedSource.BoundedReader<T> reader;
private BoundedReaderIterator(BoundedSource.BoundedReader<T> reader) {
this.reader = reader;
}
@Override
public boolean start() throws IOException {
try {
return reader.start();
} catch (Exception e) {
throw new IOException(
"Failed to start reading from source: " + reader.getCurrentSource(), e);
}
}
@Override
public boolean advance() throws IOException {
try {
return reader.advance();
} catch (Exception e) {
throw new IOException(
"Failed to advance reader of source: " + reader.getCurrentSource(), e);
}
}
@Override
public WindowedValue<T> getCurrent() throws NoSuchElementException {
return WindowedValue.timestampedValueInGlobalWindow(
reader.getCurrent(), reader.getCurrentTimestamp());
}
@Override
public void close() throws IOException {
reader.close();
}
    @Override
    public NativeReader.Progress getProgress() {
      // The reader is always a BoundedReader here, so progress can always be reported as a
      // fraction consumed (though the fraction itself may be unavailable and therefore null).
      ApproximateReportedProgress progress = new ApproximateReportedProgress();
      Double fractionConsumed = reader.getFractionConsumed();
      if (fractionConsumed != null) {
        progress.setFractionConsumed(fractionConsumed);
      }
      return SourceTranslationUtils.cloudProgressToReaderProgress(progress);
    }
@Override
public NativeReader.DynamicSplitResult requestDynamicSplit(
NativeReader.DynamicSplitRequest request) {
ApproximateSplitRequest stopPosition =
SourceTranslationUtils.splitRequestToApproximateSplitRequest(request);
Double fractionConsumed = stopPosition.getFractionConsumed();
if (fractionConsumed == null) {
// Only truncating at a fraction is currently supported.
return null;
}
BoundedSource<T> original = reader.getCurrentSource();
BoundedSource<T> residual = reader.splitAtFraction(fractionConsumed);
if (residual == null) {
return null;
}
// Try to catch some potential subclass implementation errors early.
BoundedSource<T> primary = reader.getCurrentSource();
if (original == primary) {
throw new IllegalStateException(
"Successful split did not change the current source: primary is identical to original"
+ " (Source objects MUST be immutable): " + primary);
}
if (original == residual) {
throw new IllegalStateException(
"Successful split did not change the current source: residual is identical to original"
+ " (Source objects MUST be immutable): " + residual);
}
      try {
        primary.validate();
      } catch (Exception e) {
        throw new IllegalStateException(
            "Successful split produced an illegal primary source. "
                + "\nOriginal: " + original + "\nPrimary: " + primary + "\nResidual: " + residual,
            e);
      }
      try {
        residual.validate();
      } catch (Exception e) {
        throw new IllegalStateException(
            "Successful split produced an illegal residual source. "
                + "\nOriginal: " + original + "\nPrimary: " + primary + "\nResidual: " + residual,
            e);
      }
return new BoundedSourceSplit<T>(primary, residual);
}
@Override
public double getRemainingParallelism() {
return Double.NaN;
}
}
// Commit at least once every 10 seconds or 10k records. This keeps the watermark advancing
// smoothly, and ensures that not too much work will have to be reprocessed in the event of
// a crash.
@VisibleForTesting
static final int MAX_UNBOUNDED_BUNDLE_SIZE = 10000;
@VisibleForTesting
static final Duration MAX_UNBOUNDED_BUNDLE_READ_TIME = Duration.standardSeconds(10);
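
  /**
   * A {@link NativeReader.NativeReaderIterator} over an {@link UnboundedSource.UnboundedReader}
   * that ends each bundle after {@code MAX_UNBOUNDED_BUNDLE_SIZE} records or
   * {@code MAX_UNBOUNDED_BUNDLE_READ_TIME}, whichever comes first.
   */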
private static class UnboundedReaderIterator<T>
extends NativeReader.NativeReaderIterator<WindowedValue<ValueWithRecordId<T>>> {
private final UnboundedSource.UnboundedReader<T> reader;
private final boolean started;
private final Instant endTime;
private int elemsRead;
private UnboundedReaderIterator(UnboundedSource.UnboundedReader<T> reader, boolean started) {
this.reader = reader;
this.endTime = Instant.now().plus(MAX_UNBOUNDED_BUNDLE_READ_TIME);
this.elemsRead = 0;
this.started = started;
}
@Override
public boolean start() throws IOException {
if (started) {
// This is a reader that has been restored from the unbounded reader cache.
// It has already been started, so this call to start() should delegate
// to advance() instead.
return advance();
}
try {
if (!reader.start()) {
return false;
}
} catch (Exception e) {
throw new IOException(
"Failed to start reading from source: " + reader.getCurrentSource(), e);
}
elemsRead++;
return true;
}
@Override
public boolean advance() throws IOException {
if (elemsRead >= MAX_UNBOUNDED_BUNDLE_SIZE
|| Instant.now().isAfter(endTime)) {
return false;
}
      // Back off starting at 100ms with a 1.5x multiplier, bounded at 5 attempts: roughly
      // 100 + 150 + 225 + 337.5 ~= 800ms of waiting in total before giving up.
      BackOff backoff = new AttemptBoundedExponentialBackOff(5, 100);
while (true) {
try {
if (reader.advance()) {
elemsRead++;
return true;
}
} catch (Exception e) {
throw new IOException("Failed to advance source: " + reader.getCurrentSource(), e);
}
long nextBackoff = backoff.nextBackOffMillis();
if (nextBackoff == BackOff.STOP) {
return false;
}
        try {
          Thread.sleep(nextBackoff);
        } catch (InterruptedException interrupted) {
          // Restore the interrupt status instead of swallowing it; the bounded backoff still
          // guarantees that this retry loop terminates promptly.
          Thread.currentThread().interrupt();
        }
}
}
@Override
public WindowedValue<ValueWithRecordId<T>> getCurrent() throws NoSuchElementException {
WindowedValue<T> result =
WindowedValue.timestampedValueInGlobalWindow(
reader.getCurrent(), reader.getCurrentTimestamp());
return result.withValue(
new ValueWithRecordId<>(result.getValue(), reader.getCurrentRecordId()));
}
@Override
public void close() {}
@Override
public NativeReader.Progress getProgress() {
return null;
}
@Override
public NativeReader.DynamicSplitResult requestDynamicSplit(
NativeReader.DynamicSplitRequest request) {
return null;
}
@Override
public double getRemainingParallelism() {
return Double.NaN;
}
}
}