/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.FileBasedSink.DynamicDestinations;
import org.apache.beam.sdk.io.FileBasedSink.FileResult;
import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
import org.apache.beam.sdk.io.FileBasedSink.Writer;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reify;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PCollectionViews;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Objects;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.hash.Hashing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A {@link PTransform} that writes to a {@link FileBasedSink}. A write begins with a sequential
* global initialization of a sink, followed by a parallel write, and ends with a sequential
* finalization of the write. The output of a write is a {@link WriteFilesResult}.
*
* <p>By default, every bundle in the input {@link PCollection} will be processed by a {@link
* WriteOperation}, so the number of outputs will vary based on runner behavior, though at least 1
* output will always be produced. The exact parallelism of the write stage can be controlled using
* {@link WriteFiles#withNumShards}, typically used to control how many files are produced or to
* globally limit the number of workers connecting to an external service. However, this option can
* often hurt performance: it adds an additional {@link GroupByKey} to the pipeline.
*
* <p>Example usage with runner-determined sharding:
*
* <pre>{@code p.apply(WriteFiles.to(new MySink(...)));}</pre>
*
* <p>Example usage with a fixed number of shards:
*
* <pre>{@code p.apply(WriteFiles.to(new MySink(...)).withNumShards(3));}</pre>
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
@AutoValue
public abstract class WriteFiles<UserT, DestinationT, OutputT>
extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> {
private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class);
/** For internal use by runners. */
@Internal
public static final Class<? extends WriteFiles> CONCRETE_CLASS = AutoValue_WriteFiles.class;
// The maximum number of file writers to keep open in a single bundle at a time, since file
// writers default to 64MB buffers. This comes into play when writing per-window files.
// The first 20 files from a single WriteFiles transform will write files inline in the
// transform. Anything beyond that might be shuffled.
// Keep in mind that specific runners may decide to run multiple bundles in parallel, based on
// their own policy.
private static final int DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE = 20;
// When we spill records, shard the output keys to prevent hotspots.
// We could consider making this a parameter.
private static final int SPILLED_RECORD_SHARDING_FACTOR = 10;
static final int UNKNOWN_SHARDNUM = -1;
private @Nullable WriteOperation<DestinationT, OutputT> writeOperation;
/**
* Creates a {@link WriteFiles} transform that writes to the given {@link FileBasedSink}, letting
* the runner control how many different shards are produced.
*/
public static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, OutputT> to(
FileBasedSink<UserT, DestinationT, OutputT> sink) {
checkArgument(sink != null, "sink can not be null");
return new AutoValue_WriteFiles.Builder<UserT, DestinationT, OutputT>()
.setSink(sink)
.setComputeNumShards(null)
.setNumShardsProvider(null)
.setWindowedWrites(false)
.setMaxNumWritersPerBundle(DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE)
.setSideInputs(sink.getDynamicDestinations().getSideInputs())
.build();
}
public abstract FileBasedSink<UserT, DestinationT, OutputT> getSink();
@Nullable
public abstract PTransform<PCollection<UserT>, PCollectionView<Integer>> getComputeNumShards();
// We don't use a side input for static sharding, as we want this value to be updatable
// when a pipeline is updated.
@Nullable
public abstract ValueProvider<Integer> getNumShardsProvider();
public abstract boolean getWindowedWrites();
abstract int getMaxNumWritersPerBundle();
abstract List<PCollectionView<?>> getSideInputs();
@Nullable
public abstract ShardingFunction<UserT, DestinationT> getShardingFunction();
abstract Builder<UserT, DestinationT, OutputT> toBuilder();
@AutoValue.Builder
abstract static class Builder<UserT, DestinationT, OutputT> {
abstract Builder<UserT, DestinationT, OutputT> setSink(
FileBasedSink<UserT, DestinationT, OutputT> sink);
abstract Builder<UserT, DestinationT, OutputT> setComputeNumShards(
@Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards);
abstract Builder<UserT, DestinationT, OutputT> setNumShardsProvider(
@Nullable ValueProvider<Integer> numShardsProvider);
abstract Builder<UserT, DestinationT, OutputT> setWindowedWrites(boolean windowedWrites);
abstract Builder<UserT, DestinationT, OutputT> setMaxNumWritersPerBundle(
int maxNumWritersPerBundle);
abstract Builder<UserT, DestinationT, OutputT> setSideInputs(
List<PCollectionView<?>> sideInputs);
abstract Builder<UserT, DestinationT, OutputT> setShardingFunction(
@Nullable ShardingFunction<UserT, DestinationT> shardingFunction);
abstract WriteFiles<UserT, DestinationT, OutputT> build();
}
@Override
public Map<TupleTag<?>, PValue> getAdditionalInputs() {
return PCollectionViews.toAdditionalInputs(getSideInputs());
}
/**
* Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} using the
* specified number of shards.
*
* <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for
* more information.
*
* <p>A value less than or equal to 0 will be equivalent to the default behavior of
* runner-determined sharding.
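*
* <p>For example, a minimal sketch assuming a hypothetical {@code MySink}; passing a value less
* than or equal to 0 here would instead fall back to runner-determined sharding:
*
* <pre>{@code p.apply(WriteFiles.to(new MySink(...)).withNumShards(10));}</pre>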
*/
public WriteFiles<UserT, DestinationT, OutputT> withNumShards(int numShards) {
if (numShards > 0) {
return withNumShards(StaticValueProvider.of(numShards));
}
return withRunnerDeterminedSharding();
}
/**
* Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} using the
* number of shards specified by the given {@link ValueProvider}.
*
* <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for
* more information.
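*
* <p>For example, a minimal sketch using a {@link StaticValueProvider}; the {@code MySink} name
* is an illustrative assumption:
*
* <pre>{@code
* p.apply(WriteFiles.to(new MySink(...)).withNumShards(StaticValueProvider.of(5)));
* }</pre>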
*/
public WriteFiles<UserT, DestinationT, OutputT> withNumShards(
ValueProvider<Integer> numShardsProvider) {
return toBuilder().setNumShardsProvider(numShardsProvider).build();
}
/** Set the maximum number of writers created in a bundle before spilling to shuffle. */
public WriteFiles<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle(
int maxNumWritersPerBundle) {
return toBuilder().setMaxNumWritersPerBundle(maxNumWritersPerBundle).build();
}
public WriteFiles<UserT, DestinationT, OutputT> withSideInputs(
List<PCollectionView<?>> sideInputs) {
return toBuilder().setSideInputs(sideInputs).build();
}
/**
* Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} using the
* specified {@link PTransform} to compute the number of shards.
*
* <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for
* more information.
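*
* <p>For example, a minimal sketch that derives the shard count from the input size; the
* {@code MySink} name and the one-shard-per-million-elements heuristic are illustrative
* assumptions, not part of this API:
*
* <pre>{@code
* p.apply(WriteFiles.to(new MySink(...))
*     .withSharding(
*         new PTransform<PCollection<String>, PCollectionView<Integer>>() {
*           @Override
*           public PCollectionView<Integer> expand(PCollection<String> input) {
*             return input
*                 .apply(Count.globally())
*                 .apply(MapElements.into(TypeDescriptors.integers())
*                     .via((Long count) -> (int) Math.max(1, count / 1_000_000)))
*                 .apply(View.asSingleton());
*           }
*         }));
* }</pre>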
*/
public WriteFiles<UserT, DestinationT, OutputT> withSharding(
PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding) {
checkArgument(
sharding != null, "sharding can not be null. Use withRunnerDeterminedSharding() instead.");
return toBuilder().setComputeNumShards(sharding).build();
}
/**
* Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} with
* runner-determined sharding.
*/
public WriteFiles<UserT, DestinationT, OutputT> withRunnerDeterminedSharding() {
return toBuilder().setComputeNumShards(null).setNumShardsProvider(null).build();
}
/**
* Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} using the
* specified sharding function to assign shards to input elements.
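*
* <p>For example, a minimal sketch of a sharding function; the {@code MySink} name and the
* element-hash heuristic are illustrative assumptions, and the returned shard number must lie
* in {@code [0, shardCount)}:
*
* <pre>{@code
* p.apply(WriteFiles.to(new MySink(...))
*     .withNumShards(10)
*     .withShardingFunction(
*         (destination, element, shardCount) ->
*             ShardedKey.of(0, Math.floorMod(element.hashCode(), shardCount))));
* }</pre>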
*/
public WriteFiles<UserT, DestinationT, OutputT> withShardingFunction(
ShardingFunction<UserT, DestinationT> shardingFunction) {
return toBuilder().setShardingFunction(shardingFunction).build();
}
/**
* Returns a new {@link WriteFiles} that preserves windowing on its input.
*
* <p>If this option is not specified, windowing and triggering are replaced by {@link
* GlobalWindows} and {@link DefaultTrigger}.
*
* <p>If there is no data for a window, no output shards will be generated for that window. If a
* window triggers multiple times, then more than one output shard might be generated for that
* window; it's up to the sink implementation to keep these output shards unique.
*
* <p>This option can only be used if {@link #withNumShards(int)} is also set to a positive value.
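*
* <p>For example, a minimal sketch writing a fixed number of shards per fixed window; the
* {@code MySink} name and the window size are illustrative assumptions:
*
* <pre>{@code
* input
*     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(10))))
*     .apply(WriteFiles.to(new MySink(...)).withNumShards(3).withWindowedWrites());
* }</pre>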
*/
public WriteFiles<UserT, DestinationT, OutputT> withWindowedWrites() {
return toBuilder().setWindowedWrites(true).build();
}
/**
* Returns a new {@link WriteFiles} that writes all data without spilling, simplifying the
* pipeline. This option should not be used with {@link #withMaxNumWritersPerBundle(int)}: it
* removes that limit, possibly causing many writers to be opened. Use with caution.
*
* <p>This option only applies to writes that use {@link #withRunnerDeterminedSharding()}.
*/
public WriteFiles<UserT, DestinationT, OutputT> withNoSpilling() {
return toBuilder().setMaxNumWritersPerBundle(-1).build();
}
@Override
public void validate(PipelineOptions options) {
getSink().validate(options);
}
@Override
public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) {
if (input.isBounded() == IsBounded.UNBOUNDED) {
checkArgument(
getWindowedWrites(),
"Must use windowed writes when applying %s to an unbounded PCollection",
WriteFiles.class.getSimpleName());
// The reason for this is https://issues.apache.org/jira/browse/BEAM-1438
// and similar behavior in other runners.
checkArgument(
getComputeNumShards() != null || getNumShardsProvider() != null,
"When applying %s to an unbounded PCollection, "
+ "must specify number of output shards explicitly",
WriteFiles.class.getSimpleName());
}
this.writeOperation = getSink().createWriteOperation();
this.writeOperation.setWindowedWrites(getWindowedWrites());
if (!getWindowedWrites()) {
// Re-window the data into the global window and remove any existing triggers.
input =
input.apply(
"RewindowIntoGlobal",
Window.<UserT>into(new GlobalWindows())
.triggering(DefaultTrigger.of())
.discardingFiredPanes());
}
Coder<DestinationT> destinationCoder;
try {
destinationCoder =
getDynamicDestinations()
.getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry());
destinationCoder.verifyDeterministic();
} catch (CannotProvideCoderException | NonDeterministicException e) {
throw new RuntimeException(e);
}
@SuppressWarnings("unchecked")
Coder<BoundedWindow> windowCoder =
(Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder();
FileResultCoder<DestinationT> fileResultCoder =
FileResultCoder.of(windowCoder, destinationCoder);
PCollectionView<Integer> numShardsView =
(getComputeNumShards() == null) ? null : input.apply(getComputeNumShards());
PCollection<FileResult<DestinationT>> tempFileResults =
(getComputeNumShards() == null && getNumShardsProvider() == null)
? input.apply(
"WriteUnshardedBundlesToTempFiles",
new WriteUnshardedBundlesToTempFiles(destinationCoder, fileResultCoder))
: input.apply(
"WriteShardedBundlesToTempFiles",
new WriteShardedBundlesToTempFiles(
destinationCoder, fileResultCoder, numShardsView));
return tempFileResults
.apply("GatherTempFileResults", new GatherResults<>(fileResultCoder))
.apply(
"FinalizeTempFileBundles",
new FinalizeTempFileBundles(numShardsView, destinationCoder));
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.add(DisplayData.item("sink", getSink().getClass()).withLabel("WriteFiles Sink"))
.include("sink", getSink());
if (getComputeNumShards() != null) {
builder.include("sharding", getComputeNumShards());
} else {
builder.addIfNotNull(
DisplayData.item("numShards", getNumShardsProvider())
.withLabel("Fixed Number of Shards"));
}
}
private DynamicDestinations<UserT, DestinationT, OutputT> getDynamicDestinations() {
return (DynamicDestinations<UserT, DestinationT, OutputT>)
writeOperation.getSink().getDynamicDestinations();
}
private class GatherResults<ResultT>
extends PTransform<PCollection<ResultT>, PCollection<List<ResultT>>> {
private final Coder<ResultT> resultCoder;
private GatherResults(Coder<ResultT> resultCoder) {
this.resultCoder = resultCoder;
}
@Override
public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
if (getWindowedWrites()) {
// Reshuffle the results to make them stable against retries.
// Use a single void key to maximize size of bundles for finalization.
return input
.apply("Add void key", WithKeys.of((Void) null))
.apply("Reshuffle", Reshuffle.of())
.apply("Drop key", Values.create())
.apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<>()))
.setCoder(ListCoder.of(resultCoder))
// Reshuffle one more time to stabilize the contents of the bundle lists to finalize.
.apply(Reshuffle.viaRandomKey());
} else {
// Pass results via a side input rather than reshuffle, because we need to get an empty
// iterable to finalize if there are no results.
return input
.getPipeline()
.apply(Reify.viewInGlobalWindow(input.apply(View.asList()), ListCoder.of(resultCoder)));
}
}
}
private class WriteUnshardedBundlesToTempFiles
extends PTransform<PCollection<UserT>, PCollection<FileResult<DestinationT>>> {
private final Coder<DestinationT> destinationCoder;
private final Coder<FileResult<DestinationT>> fileResultCoder;
private WriteUnshardedBundlesToTempFiles(
Coder<DestinationT> destinationCoder, Coder<FileResult<DestinationT>> fileResultCoder) {
this.destinationCoder = destinationCoder;
this.fileResultCoder = fileResultCoder;
}
@Override
public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
if (getMaxNumWritersPerBundle() < 0) {
return input
.apply(
"WritedUnshardedBundles",
ParDo.of(new WriteUnshardedTempFilesFn(null, destinationCoder))
.withSideInputs(getSideInputs()))
.setCoder(fileResultCoder);
}
TupleTag<FileResult<DestinationT>> writtenRecordsTag = new TupleTag<>("writtenRecords");
TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag =
new TupleTag<>("unwrittenRecords");
PCollectionTuple writeTuple =
input.apply(
"WriteUnshardedBundles",
ParDo.of(new WriteUnshardedTempFilesFn(unwrittenRecordsTag, destinationCoder))
.withSideInputs(getSideInputs())
.withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittenRecordsTag)));
PCollection<FileResult<DestinationT>> writtenBundleFiles =
writeTuple.get(writtenRecordsTag).setCoder(fileResultCoder);
// Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in
// finalize to stay consistent with what WriteWindowedBundles does.
PCollection<FileResult<DestinationT>> writtenSpilledFiles =
writeTuple
.get(unwrittenRecordsTag)
.setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
// Here we group by a synthetic shard number in the range [0, spill factor),
// just for the sake of getting some parallelism within each destination when
// writing the spilled records, whereas the non-spilled records don't have a shard
// number assigned at all. Drop the shard number on the spilled records so that
// shard numbers are assigned together to both the spilled and non-spilled files in
// finalize.
.apply("GroupUnwritten", GroupByKey.create())
.apply(
"WriteUnwritten",
ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
.setCoder(fileResultCoder)
.apply(
"DropShardNum",
ParDo.of(
new DoFn<FileResult<DestinationT>, FileResult<DestinationT>>() {
@ProcessElement
public void process(ProcessContext c) {
c.output(c.element().withShard(UNKNOWN_SHARDNUM));
}
}));
return PCollectionList.of(writtenBundleFiles)
.and(writtenSpilledFiles)
.apply(Flatten.pCollections())
.setCoder(fileResultCoder);
}
}
/**
* Writes all the elements in a bundle using a {@link Writer} produced by the {@link
* WriteOperation} associated with the {@link FileBasedSink}.
*/
private class WriteUnshardedTempFilesFn extends DoFn<UserT, FileResult<DestinationT>> {
@Nullable private final TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag;
private final Coder<DestinationT> destinationCoder;
// Initialized in startBundle()
private @Nullable Map<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> writers;
private int spilledShardNum = UNKNOWN_SHARDNUM;
WriteUnshardedTempFilesFn(
@Nullable TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag,
Coder<DestinationT> destinationCoder) {
this.unwrittenRecordsTag = unwrittenRecordsTag;
this.destinationCoder = destinationCoder;
}
@StartBundle
public void startBundle(StartBundleContext c) {
// Reset state in case of reuse. We need to make sure that each bundle gets unique writers.
writers = Maps.newHashMap();
}
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
PaneInfo paneInfo = c.pane();
// If we are doing windowed writes, we need to ensure that we have separate files for
// data in different windows/panes. Similarly, for dynamic writes, we make sure that different
// destinations go to different writers.
// In the case of unwindowed writes, the window and the pane will always be the same, and
// the map will only have a single element.
DestinationT destination = getDynamicDestinations().getDestination(c.element());
WriterKey<DestinationT> key = new WriterKey<>(window, c.pane(), destination);
Writer<DestinationT, OutputT> writer = writers.get(key);
if (writer == null) {
if (getMaxNumWritersPerBundle() < 0 || writers.size() <= getMaxNumWritersPerBundle()) {
String uuid = UUID.randomUUID().toString();
LOG.info(
"Opening writer {} for window {} pane {} destination {}",
uuid,
window,
paneInfo,
destination);
writer = writeOperation.createWriter();
writer.setDestination(destination);
writer.open(uuid);
writers.put(key, writer);
LOG.debug("Done opening writer");
} else {
if (spilledShardNum == UNKNOWN_SHARDNUM) {
// Cache the random value so we only call ThreadLocalRandom once per DoFn instance.
spilledShardNum = ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR);
} else {
spilledShardNum = (spilledShardNum + 1) % SPILLED_RECORD_SHARDING_FACTOR;
}
c.output(
unwrittenRecordsTag,
KV.of(
ShardedKey.of(hashDestination(destination, destinationCoder), spilledShardNum),
c.element()));
return;
}
}
writeOrClose(writer, getDynamicDestinations().formatRecord(c.element()));
}
@FinishBundle
public void finishBundle(FinishBundleContext c) throws Exception {
for (Map.Entry<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> entry :
writers.entrySet()) {
WriterKey<DestinationT> key = entry.getKey();
Writer<DestinationT, OutputT> writer = entry.getValue();
try {
writer.close();
} catch (Exception e) {
// If anything goes wrong, make sure to delete the temporary file.
writer.cleanup();
throw e;
}
BoundedWindow window = key.window;
c.output(
new FileResult<>(
writer.getOutputFile(), UNKNOWN_SHARDNUM, window, key.paneInfo, key.destination),
window.maxTimestamp(),
window);
}
}
}
private static <DestinationT, OutputT> void writeOrClose(
Writer<DestinationT, OutputT> writer, OutputT t) throws Exception {
try {
writer.write(t);
} catch (Exception e) {
try {
writer.close();
// If anything goes wrong, make sure to delete the temporary file.
writer.cleanup();
} catch (Exception closeException) {
if (closeException instanceof InterruptedException) {
// Do not silently ignore interrupted state.
Thread.currentThread().interrupt();
}
// Do not mask the exception that caused the write to fail.
e.addSuppressed(closeException);
}
throw e;
}
}
private static class WriterKey<DestinationT> {
private final BoundedWindow window;
private final PaneInfo paneInfo;
private final DestinationT destination;
WriterKey(BoundedWindow window, PaneInfo paneInfo, DestinationT destination) {
this.window = window;
this.paneInfo = paneInfo;
this.destination = destination;
}
@Override
public boolean equals(Object o) {
if (!(o instanceof WriterKey)) {
return false;
}
WriterKey other = (WriterKey) o;
return Objects.equal(window, other.window)
&& Objects.equal(paneInfo, other.paneInfo)
&& Objects.equal(destination, other.destination);
}
@Override
public int hashCode() {
return Objects.hashCode(window, paneInfo, destination);
}
}
// Hash the destination in a manner that we can then use as a key in a GBK. Since Java's
// hashCode isn't guaranteed to be stable across machines, we instead serialize the destination
// and use murmur3_32 to hash it. We enforce that destinationCoder must be deterministic, so
// this can be used as a key.
private static <DestinationT> int hashDestination(
DestinationT destination, Coder<DestinationT> destinationCoder) throws IOException {
return Hashing.murmur3_32()
.hashBytes(CoderUtils.encodeToByteArray(destinationCoder, destination))
.asInt();
}
private class WriteShardedBundlesToTempFiles
extends PTransform<PCollection<UserT>, PCollection<FileResult<DestinationT>>> {
private final Coder<DestinationT> destinationCoder;
private final Coder<FileResult<DestinationT>> fileResultCoder;
private final @Nullable PCollectionView<Integer> numShardsView;
private WriteShardedBundlesToTempFiles(
Coder<DestinationT> destinationCoder,
Coder<FileResult<DestinationT>> fileResultCoder,
@Nullable PCollectionView<Integer> numShardsView) {
this.destinationCoder = destinationCoder;
this.fileResultCoder = fileResultCoder;
this.numShardsView = numShardsView;
}
@Override
public PCollection<FileResult<DestinationT>> expand(PCollection<UserT> input) {
List<PCollectionView<?>> shardingSideInputs = Lists.newArrayList(getSideInputs());
if (numShardsView != null) {
shardingSideInputs.add(numShardsView);
}
ShardingFunction<UserT, DestinationT> shardingFunction =
getShardingFunction() == null
? new RandomShardingFunction(destinationCoder)
: getShardingFunction();
return input
.apply(
"ApplyShardingKey",
ParDo.of(new ApplyShardingFunctionFn(shardingFunction, numShardsView))
.withSideInputs(shardingSideInputs))
.setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder()))
.apply("GroupIntoShards", GroupByKey.create())
.apply(
"WriteShardsIntoTempFiles",
ParDo.of(new WriteShardsIntoTempFilesFn()).withSideInputs(getSideInputs()))
.setCoder(fileResultCoder);
}
}
private class RandomShardingFunction implements ShardingFunction<UserT, DestinationT> {
private final Coder<DestinationT> destinationCoder;
private int shardNumber;
RandomShardingFunction(Coder<DestinationT> destinationCoder) {
this.destinationCoder = destinationCoder;
this.shardNumber = UNKNOWN_SHARDNUM;
}
@Override
public ShardedKey<Integer> assignShardKey(
DestinationT destination, UserT element, int shardCount) throws Exception {
if (shardNumber == UNKNOWN_SHARDNUM) {
// We want to desynchronize the first record sharding key for each instance of
// ApplyShardingKey, so records in a small PCollection will be statistically balanced.
shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
} else {
shardNumber = (shardNumber + 1) % shardCount;
}
// We avoid using destination itself as a sharding key, because destination is often large.
// e.g. when using {@link DefaultFilenamePolicy}, the destination contains the entire path
// to the file. Often most of the path is constant across all destinations, just the path
// suffix is appended by the destination function. Instead we key by a 32-bit hash (carefully
// chosen to be guaranteed stable), and call getDestination again in the next ParDo to resolve
// the destinations. This does mean that multiple destinations might end up on the same shard,
// however the number of collisions should be small, so there's no need to worry about memory
// issues.
return ShardedKey.of(hashDestination(destination, destinationCoder), shardNumber);
}
}
private class ApplyShardingFunctionFn extends DoFn<UserT, KV<ShardedKey<Integer>, UserT>> {
private final ShardingFunction<UserT, DestinationT> shardingFn;
private final @Nullable PCollectionView<Integer> numShardsView;
ApplyShardingFunctionFn(
ShardingFunction<UserT, DestinationT> shardingFn,
@Nullable PCollectionView<Integer> numShardsView) {
this.numShardsView = numShardsView;
this.shardingFn = shardingFn;
}
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
getDynamicDestinations().setSideInputAccessorFromProcessContext(context);
final int shardCount;
if (numShardsView != null) {
shardCount = context.sideInput(numShardsView);
} else {
checkNotNull(getNumShardsProvider());
shardCount = getNumShardsProvider().get();
}
checkArgument(
shardCount > 0,
"Must have a positive number of shards specified for non-runner-determined sharding."
+ " Got %s",
shardCount);
DestinationT destination = getDynamicDestinations().getDestination(context.element());
ShardedKey<Integer> shardKey =
shardingFn.assignShardKey(destination, context.element(), shardCount);
context.output(KV.of(shardKey, context.element()));
}
}
private class WriteShardsIntoTempFilesFn
extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> {
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
// Since we key by a 32-bit hash of the destination, there might be multiple destinations
// in this iterable. The number of destinations is generally very small (1000s or less), so
// there will rarely be hash collisions.
Map<DestinationT, Writer<DestinationT, OutputT>> writers = Maps.newHashMap();
for (UserT input : c.element().getValue()) {
DestinationT destination = getDynamicDestinations().getDestination(input);
Writer<DestinationT, OutputT> writer = writers.get(destination);
if (writer == null) {
String uuid = UUID.randomUUID().toString();
LOG.info(
"Opening writer {} for window {} pane {} destination {}",
uuid,
window,
c.pane(),
destination);
writer = writeOperation.createWriter();
writer.setDestination(destination);
writer.open(uuid);
writers.put(destination, writer);
}
writeOrClose(writer, getDynamicDestinations().formatRecord(input));
}
// Close all writers.
for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : writers.entrySet()) {
Writer<DestinationT, OutputT> writer = entry.getValue();
try {
// Close the writer; if this throws let the error propagate.
writer.close();
} catch (Exception e) {
// If anything goes wrong, make sure to delete the temporary file.
writer.cleanup();
throw e;
}
int shard = c.element().getKey().getShardNumber();
checkArgument(
shard != UNKNOWN_SHARDNUM,
"Shard should have been set, but is unset for element %s",
c.element());
c.output(new FileResult<>(writer.getOutputFile(), shard, window, c.pane(), entry.getKey()));
}
}
}
private class FinalizeTempFileBundles
extends PTransform<
PCollection<List<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>> {
@Nullable private final PCollectionView<Integer> numShardsView;
private final Coder<DestinationT> destinationCoder;
private FinalizeTempFileBundles(
@Nullable PCollectionView<Integer> numShardsView, Coder<DestinationT> destinationCoder) {
this.numShardsView = numShardsView;
this.destinationCoder = destinationCoder;
}
@Override
public WriteFilesResult<DestinationT> expand(
PCollection<List<FileResult<DestinationT>>> input) {
List<PCollectionView<?>> finalizeSideInputs = Lists.newArrayList(getSideInputs());
if (numShardsView != null) {
finalizeSideInputs.add(numShardsView);
}
PCollection<KV<DestinationT, String>> outputFilenames =
input
.apply("Finalize", ParDo.of(new FinalizeFn()).withSideInputs(finalizeSideInputs))
.setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of()))
// Reshuffle the filenames to make sure they are observable downstream
// only after each one is done finalizing.
.apply(Reshuffle.viaRandomKey());
TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag =
new TupleTag<>("perDestinationOutputFilenames");
return WriteFilesResult.in(
input.getPipeline(), perDestinationOutputFilenamesTag, outputFilenames);
}
private class FinalizeFn
extends DoFn<List<FileResult<DestinationT>>, KV<DestinationT, String>> {
@ProcessElement
public void process(ProcessContext c) throws Exception {
getDynamicDestinations().setSideInputAccessorFromProcessContext(c);
@Nullable Integer fixedNumShards;
if (numShardsView != null) {
fixedNumShards = c.sideInput(numShardsView);
} else if (getNumShardsProvider() != null) {
fixedNumShards = getNumShardsProvider().get();
} else {
fixedNumShards = null;
}
List<FileResult<DestinationT>> fileResults = Lists.newArrayList(c.element());
LOG.info("Finalizing {} file results", fileResults.size());
DestinationT defaultDest = getDynamicDestinations().getDefaultDestination();
List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames =
fileResults.isEmpty()
? writeOperation.finalizeDestination(
defaultDest, GlobalWindow.INSTANCE, fixedNumShards, fileResults)
: finalizeAllDestinations(fileResults, fixedNumShards);
writeOperation.moveToOutputFiles(resultsToFinalFilenames);
for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) {
FileResult<DestinationT> res = entry.getKey();
c.output(KV.of(res.getDestination(), entry.getValue().toString()));
}
}
}
}
private List<KV<FileResult<DestinationT>, ResourceId>> finalizeAllDestinations(
List<FileResult<DestinationT>> fileResults, @Nullable Integer fixedNumShards)
throws Exception {
Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> res =
ArrayListMultimap.create();
for (FileResult<DestinationT> result : fileResults) {
res.put(KV.of(result.getDestination(), result.getWindow()), result);
}
List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList();
for (Map.Entry<KV<DestinationT, BoundedWindow>, Collection<FileResult<DestinationT>>>
destEntry : res.asMap().entrySet()) {
KV<DestinationT, BoundedWindow> destWindow = destEntry.getKey();
resultsToFinalFilenames.addAll(
writeOperation.finalizeDestination(
destWindow.getKey(), destWindow.getValue(), fixedNumShards, destEntry.getValue()));
}
return resultsToFinalFilenames;
}
private static class GatherBundlesPerWindowFn<T> extends DoFn<T, List<T>> {
@Nullable private transient Multimap<BoundedWindow, T> bundles = null;
@StartBundle
public void startBundle() {
bundles = ArrayListMultimap.create();
}
@ProcessElement
public void process(ProcessContext c, BoundedWindow w) {
bundles.put(w, c.element());
}
@FinishBundle
public void finishBundle(FinishBundleContext c) throws Exception {
for (BoundedWindow w : bundles.keySet()) {
c.output(Lists.newArrayList(bundles.get(w)), w.maxTimestamp(), w);
}
}
}
}