| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.beam.sdk.io; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.google.common.base.Preconditions.checkState; |
| |
| import com.google.common.base.Objects; |
| import com.google.common.collect.ArrayListMultimap; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.ImmutableSet; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Multimap; |
| import com.google.common.collect.Sets; |
| import com.google.common.hash.Hashing; |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.ThreadLocalRandom; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.Pipeline; |
| import org.apache.beam.sdk.annotations.Experimental; |
| import org.apache.beam.sdk.coders.CannotProvideCoderException; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.coders.Coder.NonDeterministicException; |
| import org.apache.beam.sdk.coders.KvCoder; |
| import org.apache.beam.sdk.coders.ShardedKeyCoder; |
| import org.apache.beam.sdk.coders.StringUtf8Coder; |
| import org.apache.beam.sdk.coders.VarIntCoder; |
| import org.apache.beam.sdk.coders.VoidCoder; |
| import org.apache.beam.sdk.io.FileBasedSink.FileResult; |
| import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder; |
| import org.apache.beam.sdk.io.FileBasedSink.WriteOperation; |
| import org.apache.beam.sdk.io.FileBasedSink.Writer; |
| import org.apache.beam.sdk.io.fs.ResourceId; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.options.ValueProvider; |
| import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; |
| import org.apache.beam.sdk.transforms.Create; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.Flatten; |
| import org.apache.beam.sdk.transforms.GroupByKey; |
| import org.apache.beam.sdk.transforms.PTransform; |
| import org.apache.beam.sdk.transforms.ParDo; |
| import org.apache.beam.sdk.transforms.Reshuffle; |
| import org.apache.beam.sdk.transforms.Values; |
| import org.apache.beam.sdk.transforms.View; |
| import org.apache.beam.sdk.transforms.WithKeys; |
| import org.apache.beam.sdk.transforms.display.DisplayData; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; |
| import org.apache.beam.sdk.transforms.windowing.GlobalWindows; |
| import org.apache.beam.sdk.transforms.windowing.PaneInfo; |
| import org.apache.beam.sdk.transforms.windowing.Window; |
| import org.apache.beam.sdk.util.CoderUtils; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.PCollection.IsBounded; |
| import org.apache.beam.sdk.values.PCollectionList; |
| import org.apache.beam.sdk.values.PCollectionTuple; |
| import org.apache.beam.sdk.values.PCollectionView; |
| import org.apache.beam.sdk.values.PCollectionViews; |
| import org.apache.beam.sdk.values.PValue; |
| import org.apache.beam.sdk.values.ShardedKey; |
| import org.apache.beam.sdk.values.TupleTag; |
| import org.apache.beam.sdk.values.TupleTagList; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A {@link PTransform} that writes to a {@link FileBasedSink}. A write begins with a sequential |
| * global initialization of a sink, followed by a parallel write, and ends with a sequential |
| * finalization of the write. The output of a write is a {@link WriteFilesResult}. |
| * |
| * <p>By default, every bundle in the input {@link PCollection} will be processed by a {@link |
| * WriteOperation}, so the number of output files will vary based on runner behavior, though at least |
| * one output file will always be produced. The exact parallelism of the write stage can be controlled using |
| * {@link WriteFiles#withNumShards}, typically used to control how many files are produced or to |
| * globally limit the number of workers connecting to an external service. However, this option can |
| * often hurt performance: it adds an additional {@link GroupByKey} to the pipeline. |
| * |
| * <p>Example usage with runner-determined sharding: |
| * |
| * <pre>{@code p.apply(WriteFiles.to(new MySink(...)));}</pre> |
| * |
| * <p>Example usage with a fixed number of shards: |
| * |
| * <pre>{@code p.apply(WriteFiles.to(new MySink(...)).withNumShards(3));}</pre> |
| */ |
| @Experimental(Experimental.Kind.SOURCE_SINK) |
| public class WriteFiles<UserT, DestinationT, OutputT> |
| extends PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> { |
| private static final Logger LOG = LoggerFactory.getLogger(WriteFiles.class); |
| |
| // The maximum number of file writers to keep open in a single bundle at a time, since file |
| * writers default to 64MB buffers. This comes into play when writing per-window files. |
| // The first 20 files from a single WriteFiles transform will write files inline in the |
| // transform. Anything beyond that might be shuffled. |
| // Keep in mind that specific runners may decide to run multiple bundles in parallel, based on |
| // their own policy. |
| private static final int DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE = 20; |
| |
| // When we spill records, shard the output keys to prevent hotspots. |
| // We could consider making this a parameter. |
| private static final int SPILLED_RECORD_SHARDING_FACTOR = 10; |
| |
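| // Sentinel shard number used before an actual shard has been assigned. |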
| static final int UNKNOWN_SHARDNUM = -1; |
| private FileBasedSink<UserT, DestinationT, OutputT> sink; |
| private WriteOperation<DestinationT, OutputT> writeOperation; |
| // This allows the number of shards to be dynamically computed based on the input |
| // PCollection. |
| @Nullable private final PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards; |
| // We don't use a side input for static sharding, as we want this value to be updatable |
| // when a pipeline is updated. |
| @Nullable |
| private final ValueProvider<Integer> numShardsProvider; |
| private final boolean windowedWrites; |
| private int maxNumWritersPerBundle; |
| // This is the set of side inputs used by this transform. This is usually populated by the user's |
| // DynamicDestinations object. |
| private final List<PCollectionView<?>> sideInputs; |
| |
| /** |
| * Creates a {@link WriteFiles} transform that writes to the given {@link FileBasedSink}, letting |
| * the runner control how many different shards are produced. |
| */ |
| public static <UserT, DestinationT, OutputT> WriteFiles<UserT, DestinationT, OutputT> to( |
| FileBasedSink<UserT, DestinationT, OutputT> sink) { |
| checkArgument(sink != null, "sink cannot be null"); |
| return new WriteFiles<>( |
| sink, |
| null /* runner-determined sharding */, |
| null, |
| false, |
| DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE, |
| sink.getDynamicDestinations().getSideInputs()); |
| } |
| |
| private WriteFiles( |
| FileBasedSink<UserT, DestinationT, OutputT> sink, |
| @Nullable PTransform<PCollection<UserT>, PCollectionView<Integer>> computeNumShards, |
| @Nullable ValueProvider<Integer> numShardsProvider, |
| boolean windowedWrites, |
| int maxNumWritersPerBundle, |
| List<PCollectionView<?>> sideInputs) { |
| this.sink = sink; |
| this.computeNumShards = computeNumShards; |
| this.numShardsProvider = numShardsProvider; |
| this.windowedWrites = windowedWrites; |
| this.maxNumWritersPerBundle = maxNumWritersPerBundle; |
| this.sideInputs = sideInputs; |
| } |
| |
| @Override |
| public Map<TupleTag<?>, PValue> getAdditionalInputs() { |
| return PCollectionViews.toAdditionalInputs(sideInputs); |
| } |
| |
| @Override |
| public WriteFilesResult<DestinationT> expand(PCollection<UserT> input) { |
| if (input.isBounded() == IsBounded.UNBOUNDED) { |
| checkArgument(windowedWrites, |
| "Must use windowed writes when applying %s to an unbounded PCollection", |
| WriteFiles.class.getSimpleName()); |
| } |
| if (windowedWrites) { |
| // The reason for this is https://issues.apache.org/jira/browse/BEAM-1438 |
| // and similar behavior in other runners. |
| checkArgument( |
| computeNumShards != null || numShardsProvider != null, |
| "When applying %s with windowed writes, an explicit number of output shards must be specified", |
| WriteFiles.class.getSimpleName()); |
| } |
| this.writeOperation = sink.createWriteOperation(); |
| this.writeOperation.setWindowedWrites(windowedWrites); |
| return createWrite(input); |
| } |
| |
| @Override |
| public void validate(PipelineOptions options) { |
| sink.validate(options); |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| super.populateDisplayData(builder); |
| builder |
| .add(DisplayData.item("sink", sink.getClass()).withLabel("WriteFiles Sink")) |
| .include("sink", sink); |
| if (getSharding() != null) { |
| builder.include("sharding", getSharding()); |
| } else { |
| builder.addIfNotNull(DisplayData.item("numShards", getNumShards()) |
| .withLabel("Fixed Number of Shards")); |
| } |
| } |
| |
| /** Returns the {@link FileBasedSink} associated with this PTransform. */ |
| public FileBasedSink<UserT, DestinationT, OutputT> getSink() { |
| return sink; |
| } |
| |
| /** |
| * Returns whether or not to perform windowed writes. |
| */ |
| public boolean isWindowedWrites() { |
| return windowedWrites; |
| } |
| |
| /** |
| * Gets the {@link PTransform} that will be used to determine sharding. This can be either a |
| * static number of shards (set via {@link #withNumShards(int)}), dynamic (set via {@link |
| * #withSharding(PTransform)}), or runner-determined (set via {@link |
| * #withRunnerDeterminedSharding()}). |
| */ |
| @Nullable |
| public PTransform<PCollection<UserT>, PCollectionView<Integer>> getSharding() { |
| return computeNumShards; |
| } |
| |
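| /** |
| * Returns the {@link ValueProvider} for the fixed number of shards specified via {@link |
| * #withNumShards}, or null if no fixed number of shards was requested. |
| */ |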
| public ValueProvider<Integer> getNumShards() { |
| return numShardsProvider; |
| } |
| |
| /** |
| * Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} using the |
| * specified number of shards. |
| * |
| * <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for |
| * more information. |
| * |
| * <p>A value less than or equal to 0 will be equivalent to the default behavior of |
| * runner-determined sharding. |
| */ |
| public WriteFiles<UserT, DestinationT, OutputT> withNumShards(int numShards) { |
| if (numShards > 0) { |
| return withNumShards(StaticValueProvider.of(numShards)); |
| } |
| return withRunnerDeterminedSharding(); |
| } |
| |
| /** |
| * Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} using the |
| * number of shards specified by the given {@link ValueProvider}. |
| * |
| * <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for |
| * more information. |
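| * |
| * <p>A minimal sketch, reusing the hypothetical {@code MySink} from the class-level example and a |
| * {@link StaticValueProvider}: |
| * |
| * <pre>{@code |
| * p.apply(WriteFiles.to(new MySink(...)).withNumShards(StaticValueProvider.of(5))); |
| * }</pre> |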
| */ |
| public WriteFiles<UserT, DestinationT, OutputT> withNumShards( |
| ValueProvider<Integer> numShardsProvider) { |
| return new WriteFiles<>( |
| sink, |
| computeNumShards, |
| numShardsProvider, |
| windowedWrites, |
| maxNumWritersPerBundle, |
| sideInputs); |
| } |
| |
| /** |
| * Returns a new {@link WriteFiles} that will open at most the given number of writers per bundle |
| * before spilling subsequent records to shuffle. |
| */ |
| public WriteFiles<UserT, DestinationT, OutputT> withMaxNumWritersPerBundle( |
| int maxNumWritersPerBundle) { |
| return new WriteFiles<>( |
| sink, |
| computeNumShards, |
| numShardsProvider, |
| windowedWrites, |
| maxNumWritersPerBundle, |
| sideInputs); |
| } |
| |
| public WriteFiles<UserT, DestinationT, OutputT> withSideInputs( |
| List<PCollectionView<?>> sideInputs) { |
| return new WriteFiles<>( |
| sink, |
| computeNumShards, |
| numShardsProvider, |
| windowedWrites, |
| maxNumWritersPerBundle, |
| sideInputs); |
| } |
| |
| /** |
| * Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} using the |
| * specified {@link PTransform} to compute the number of shards. |
| * |
| * <p>This option should be used sparingly as it can hurt performance. See {@link WriteFiles} for |
| * more information. |
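| * |
| * <p>A minimal sketch (again with the hypothetical {@code MySink}, and {@code MyElement} standing |
| * in for the element type); the sharding transform below just materializes a constant shard |
| * count, but a real transform could derive the count from the data: |
| * |
| * <pre>{@code |
| * p.apply(WriteFiles.to(new MySink(...)) |
| *     .withSharding( |
| *         new PTransform<PCollection<MyElement>, PCollectionView<Integer>>() { |
| *           public PCollectionView<Integer> expand(PCollection<MyElement> input) { |
| *             return input |
| *                 .getPipeline() |
| *                 .apply("NumShards", Create.of(10)) |
| *                 .apply(View.<Integer>asSingleton()); |
| *           } |
| *         })); |
| * }</pre> |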
| */ |
| public WriteFiles<UserT, DestinationT, OutputT> withSharding( |
| PTransform<PCollection<UserT>, PCollectionView<Integer>> sharding) { |
| checkArgument( |
| sharding != null, "sharding cannot be null. Use withRunnerDeterminedSharding() instead."); |
| return new WriteFiles<>( |
| sink, sharding, null, windowedWrites, maxNumWritersPerBundle, sideInputs); |
| } |
| |
| /** |
| * Returns a new {@link WriteFiles} that will write to the current {@link FileBasedSink} with |
| * runner-determined sharding. |
| */ |
| public WriteFiles<UserT, DestinationT, OutputT> withRunnerDeterminedSharding() { |
| return new WriteFiles<>(sink, null, null, windowedWrites, maxNumWritersPerBundle, sideInputs); |
| } |
| |
| /** |
| * Returns a new {@link WriteFiles} that preserves windowing on its input. |
| * |
| * <p>If this option is not specified, windowing and triggering are replaced by {@link |
| * GlobalWindows} and {@link DefaultTrigger}. |
| * |
| * <p>If there is no data for a window, no output shards will be generated for that window. If a |
| * window triggers multiple times, output shards for that window may be generated more than once; |
| * it's up to the sink implementation to keep these output shards unique. |
| * |
| * <p>This option can only be used if an explicit number of shards is also specified, either via |
| * {@link #withNumShards(int)} (with a positive value) or via {@link #withSharding(PTransform)}. |
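| * |
| * <p>A minimal sketch of a windowed write (reusing the hypothetical {@code MySink} from the |
| * class-level example, with {@code MyElement} standing in for the element type): |
| * |
| * <pre>{@code |
| * p.apply(Window.<MyElement>into(FixedWindows.of(Duration.standardMinutes(10)))) |
| *  .apply(WriteFiles.to(new MySink(...)).withNumShards(3).withWindowedWrites()); |
| * }</pre> |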
| */ |
| public WriteFiles<UserT, DestinationT, OutputT> withWindowedWrites() { |
| return new WriteFiles<>( |
| sink, computeNumShards, numShardsProvider, true, maxNumWritersPerBundle, sideInputs); |
| } |
| |
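| /** Key under which a {@link Writer} is cached within a bundle: window, pane, and destination. */ |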
| private static class WriterKey<DestinationT> { |
| private final BoundedWindow window; |
| private final PaneInfo paneInfo; |
| private final DestinationT destination; |
| |
| WriterKey(BoundedWindow window, PaneInfo paneInfo, DestinationT destination) { |
| this.window = window; |
| this.paneInfo = paneInfo; |
| this.destination = destination; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (!(o instanceof WriterKey)) { |
| return false; |
| } |
| WriterKey other = (WriterKey) o; |
| return Objects.equal(window, other.window) |
| && Objects.equal(paneInfo, other.paneInfo) |
| && Objects.equal(destination, other.destination); |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hashCode(window, paneInfo, destination); |
| } |
| } |
| |
| // Hash the destination in a manner that we can then use as a key in a GBK. Since Java's |
| // hashCode isn't guaranteed to be stable across machines, we instead serialize the destination |
| // and use murmur3_32 to hash it. We enforce that destinationCoder must be deterministic, so |
| // this can be used as a key. |
| private static <DestinationT> int hashDestination( |
| DestinationT destination, Coder<DestinationT> destinationCoder) throws IOException { |
| return Hashing.murmur3_32() |
| .hashBytes(CoderUtils.encodeToByteArray(destinationCoder, destination)) |
| .asInt(); |
| } |
| |
| /** |
| * Writes all the elements in a bundle using a {@link Writer} produced by the {@link |
| * WriteOperation} associated with the {@link FileBasedSink}. |
| */ |
| private class WriteBundles extends DoFn<UserT, FileResult<DestinationT>> { |
| private final TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag; |
| private final Coder<DestinationT> destinationCoder; |
| private final boolean windowedWrites; |
| |
| private Map<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> writers; |
| private int spilledShardNum = UNKNOWN_SHARDNUM; |
| |
| WriteBundles( |
| boolean windowedWrites, |
| TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag, |
| Coder<DestinationT> destinationCoder) { |
| this.windowedWrites = windowedWrites; |
| this.unwrittenRecordsTag = unwrittenRecordsTag; |
| this.destinationCoder = destinationCoder; |
| } |
| |
| @StartBundle |
| public void startBundle(StartBundleContext c) { |
| // Reset state in case of reuse. We need to make sure that each bundle gets unique writers. |
| writers = Maps.newHashMap(); |
| } |
| |
| @ProcessElement |
| public void processElement(ProcessContext c, BoundedWindow window) throws Exception { |
| sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c); |
| PaneInfo paneInfo = c.pane(); |
| // If we are doing windowed writes, we need to ensure that we have separate files for |
| // data in different windows/panes. Similarly, for dynamic destinations, make sure that different |
| // destinations go to different writers. |
| // In the case of unwindowed writes, the window and the pane will always be the same, and |
| // the map will only have a single element. |
| DestinationT destination = sink.getDynamicDestinations().getDestination(c.element()); |
| WriterKey<DestinationT> key = new WriterKey<>(window, c.pane(), destination); |
| Writer<DestinationT, OutputT> writer = writers.get(key); |
| if (writer == null) { |
| if (writers.size() <= maxNumWritersPerBundle) { |
| String uuid = UUID.randomUUID().toString(); |
| LOG.info( |
| "Opening writer {} for write operation {}, window {} pane {} destination {}", |
| uuid, |
| writeOperation, |
| window, |
| paneInfo, |
| destination); |
| writer = writeOperation.createWriter(); |
| if (windowedWrites) { |
| writer.openWindowed(uuid, window, paneInfo, UNKNOWN_SHARDNUM, destination); |
| } else { |
| writer.openUnwindowed(uuid, UNKNOWN_SHARDNUM, destination); |
| } |
| writers.put(key, writer); |
| LOG.debug("Done opening writer"); |
| } else { |
| if (spilledShardNum == UNKNOWN_SHARDNUM) { |
| // Cache the random value so we only call ThreadLocalRandom once per DoFn instance. |
| spilledShardNum = ThreadLocalRandom.current().nextInt(SPILLED_RECORD_SHARDING_FACTOR); |
| } else { |
| spilledShardNum = (spilledShardNum + 1) % SPILLED_RECORD_SHARDING_FACTOR; |
| } |
| c.output( |
| unwrittenRecordsTag, |
| KV.of( |
| ShardedKey.of(hashDestination(destination, destinationCoder), spilledShardNum), |
| c.element())); |
| return; |
| } |
| } |
| writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(c.element())); |
| } |
| |
| @FinishBundle |
| public void finishBundle(FinishBundleContext c) throws Exception { |
| for (Map.Entry<WriterKey<DestinationT>, Writer<DestinationT, OutputT>> entry : |
| writers.entrySet()) { |
| Writer<DestinationT, OutputT> writer = entry.getValue(); |
| FileResult<DestinationT> result; |
| try { |
| result = writer.close(); |
| } catch (Exception e) { |
| // If anything goes wrong, make sure to delete the temporary file. |
| writer.cleanup(); |
| throw e; |
| } |
| BoundedWindow window = entry.getKey().window; |
| c.output(result, window.maxTimestamp(), window); |
| } |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| builder.delegate(WriteFiles.this); |
| } |
| } |
| |
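| /** Whether shard numbers are assigned while writing records or deferred to the finalize step. */ |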
| enum ShardAssignment { ASSIGN_IN_FINALIZE, ASSIGN_WHEN_WRITING } |
| |
| /** |
| * Like {@link WriteBundles}, but where the elements for each shard have been collected into a |
| * single iterable. |
| */ |
| private class WriteShardedBundles |
| extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>> { |
| ShardAssignment shardNumberAssignment; |
| WriteShardedBundles(ShardAssignment shardNumberAssignment) { |
| this.shardNumberAssignment = shardNumberAssignment; |
| } |
| |
| @ProcessElement |
| public void processElement(ProcessContext c, BoundedWindow window) throws Exception { |
| sink.getDynamicDestinations().setSideInputAccessorFromProcessContext(c); |
| // Since we key by a 32-bit hash of the destination, there might be multiple destinations |
| // in this iterable. The number of destinations is generally very small (1000s or less), so |
| // there will rarely be hash collisions. |
| Map<DestinationT, Writer<DestinationT, OutputT>> writers = Maps.newHashMap(); |
| for (UserT input : c.element().getValue()) { |
| DestinationT destination = sink.getDynamicDestinations().getDestination(input); |
| Writer<DestinationT, OutputT> writer = writers.get(destination); |
| if (writer == null) { |
| LOG.debug("Opening writer for write operation {}", writeOperation); |
| writer = writeOperation.createWriter(); |
| int shardNumber = |
| shardNumberAssignment == ShardAssignment.ASSIGN_WHEN_WRITING |
| ? c.element().getKey().getShardNumber() |
| : UNKNOWN_SHARDNUM; |
| if (windowedWrites) { |
| writer.openWindowed( |
| UUID.randomUUID().toString(), window, c.pane(), shardNumber, destination); |
| } else { |
| writer.openUnwindowed( |
| UUID.randomUUID().toString(), shardNumber, destination); |
| } |
| LOG.debug("Done opening writer"); |
| writers.put(destination, writer); |
| } |
| writeOrClose(writer, getSink().getDynamicDestinations().formatRecord(input)); |
| } |
| |
| // Close all writers. |
| for (Map.Entry<DestinationT, Writer<DestinationT, OutputT>> entry : writers.entrySet()) { |
| Writer<DestinationT, OutputT> writer = entry.getValue(); |
| FileResult<DestinationT> result; |
| try { |
| // Close the writer; if this throws let the error propagate. |
| result = writer.close(); |
| c.output(result); |
| } catch (Exception e) { |
| // If anything goes wrong, make sure to delete the temporary file. |
| writer.cleanup(); |
| throw e; |
| } |
| } |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| builder.delegate(WriteFiles.this); |
| } |
| } |
| |
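| /** |
| * Writes a single element; if the write fails, closes the writer, deletes its temporary file, and |
| * rethrows the original exception. |
| */ |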
| private static <DestinationT, OutputT> void writeOrClose( |
| Writer<DestinationT, OutputT> writer, OutputT t) throws Exception { |
| try { |
| writer.write(t); |
| } catch (Exception e) { |
| try { |
| writer.close(); |
| // If anything goes wrong, make sure to delete the temporary file. |
| writer.cleanup(); |
| } catch (Exception closeException) { |
| if (closeException instanceof InterruptedException) { |
| // Do not silently ignore interrupted state. |
| Thread.currentThread().interrupt(); |
| } |
| // Do not mask the exception that caused the write to fail. |
| e.addSuppressed(closeException); |
| } |
| throw e; |
| } |
| } |
| |
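| /** |
| * Assigns each element a {@link ShardedKey} composed of a hash of its destination and a shard |
| * number cycled round-robin through the configured shard count. |
| */ |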
| private class ApplyShardingKey extends DoFn<UserT, KV<ShardedKey<Integer>, UserT>> { |
| private final PCollectionView<Integer> numShardsView; |
| private final ValueProvider<Integer> numShardsProvider; |
| private final Coder<DestinationT> destinationCoder; |
| |
| private int shardNumber; |
| |
| ApplyShardingKey( |
| PCollectionView<Integer> numShardsView, |
| ValueProvider<Integer> numShardsProvider, |
| Coder<DestinationT> destinationCoder) { |
| this.destinationCoder = destinationCoder; |
| this.numShardsView = numShardsView; |
| this.numShardsProvider = numShardsProvider; |
| shardNumber = UNKNOWN_SHARDNUM; |
| } |
| |
| @ProcessElement |
| public void processElement(ProcessContext context) throws IOException { |
| final int shardCount; |
| if (numShardsView != null) { |
| shardCount = context.sideInput(numShardsView); |
| } else { |
| checkNotNull(numShardsProvider); |
| shardCount = numShardsProvider.get(); |
| } |
| checkArgument( |
| shardCount > 0, |
| "Must have a positive number of shards specified for non-runner-determined sharding." |
| + " Got %s", |
| shardCount); |
| if (shardNumber == UNKNOWN_SHARDNUM) { |
| // We want to desynchronize the first record sharding key for each instance of |
| // ApplyShardingKey, so records in a small PCollection will be statistically balanced. |
| shardNumber = ThreadLocalRandom.current().nextInt(shardCount); |
| } else { |
| shardNumber = (shardNumber + 1) % shardCount; |
| } |
| // We avoid using destination itself as a sharding key, because destination is often large. |
| // e.g. when using {@link DefaultFilenamePolicy}, the destination contains the entire path |
| // to the file. Often most of the path is constant across all destinations, just the path |
| // suffix is appended by the destination function. Instead we key by a 32-bit hash (carefully |
| // chosen to be guaranteed stable), and call getDestination again in the next ParDo to resolve |
| // the destinations. This does mean that multiple destinations might end up on the same shard, |
| // however the number of collisions should be small, so there's no need to worry about memory |
| // issues. |
| DestinationT destination = sink.getDynamicDestinations().getDestination(context.element()); |
| context.output( |
| KV.of( |
| ShardedKey.of(hashDestination(destination, destinationCoder), shardNumber), |
| context.element())); |
| } |
| } |
| |
| private static <DestinationT> |
| Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> |
| groupByDestinationAndWindow(Iterable<FileResult<DestinationT>> results) { |
| Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> res = |
| ArrayListMultimap.create(); |
| for (FileResult<DestinationT> result : results) { |
| res.put(KV.of(result.getDestination(), result.getWindow()), result); |
| } |
| return res; |
| } |
| |
| /** |
| * A write is performed as a sequence of {@link ParDo}s. |
| * |
| * <p>In the bundle-writing phase, a ParDo over the PCollection of elements to write calls {@link |
| * WriteOperation#createWriter} to obtain a {@link Writer}, opens each writer when it is first |
| * needed, calls {@link Writer#write} for every element assigned to it, and closes each writer once |
| * the bundle's elements have been written. The output of this ParDo is a PCollection of <i>writer |
| * result</i> objects (see {@link FileBasedSink} for a description of writer results), one for each |
| * bundle. |
| * |
| * <p>The final do-once ParDo uses a singleton collection as input and the collection of writer |
| * results as a side input. In this ParDo, {@link WriteOperation#finalize} is called to finalize |
| * the write. |
| * |
| * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called |
| * before the exception that caused the write to fail is propagated and the write result will be |
| * discarded. |
| * |
| * <p>Since the {@link WriteOperation} is serialized during pipeline construction and deserialized |
| * in the bundle-writing and finalization phases, any state change to the WriteOperation object |
| * that occurs during initialization is visible in the latter phases. However, the WriteOperation |
| * is not serialized after the bundle-writing phase. This is why implementations should guarantee |
| * that {@link WriteOperation#createWriter} does not mutate the WriteOperation. |
| */ |
| private WriteFilesResult<DestinationT> createWrite(PCollection<UserT> input) { |
| Pipeline p = input.getPipeline(); |
| |
| if (!windowedWrites) { |
| // Re-window the data into the global window and remove any existing triggers. |
| input = |
| input.apply( |
| Window.<UserT>into(new GlobalWindows()) |
| .triggering(DefaultTrigger.of()) |
| .discardingFiredPanes()); |
| } |
| |
| // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation |
| // serialized into the write DoFns rather than provided as a side input) and collect the results |
| // of the writes in a PCollection. |
| PCollection<FileResult<DestinationT>> results; |
| final PCollectionView<Integer> numShardsView; |
| @SuppressWarnings("unchecked") |
| Coder<BoundedWindow> shardedWindowCoder = |
| (Coder<BoundedWindow>) input.getWindowingStrategy().getWindowFn().windowCoder(); |
| final Coder<DestinationT> destinationCoder; |
| try { |
| destinationCoder = |
| sink.getDynamicDestinations() |
| .getDestinationCoderWithDefault(input.getPipeline().getCoderRegistry()); |
| destinationCoder.verifyDeterministic(); |
| } catch (CannotProvideCoderException | NonDeterministicException e) { |
| throw new RuntimeException(e); |
| } |
| |
| if (computeNumShards == null && numShardsProvider == null) { |
| numShardsView = null; |
| TupleTag<FileResult<DestinationT>> writtenRecordsTag = |
| new TupleTag<>("writtenRecordsTag"); |
| TupleTag<KV<ShardedKey<Integer>, UserT>> unwrittenRecordsTag = |
| new TupleTag<>("unwrittenRecordsTag"); |
| String writeName = windowedWrites ? "WriteWindowedBundles" : "WriteBundles"; |
| PCollectionTuple writeTuple = |
| input.apply( |
| writeName, |
| ParDo.of(new WriteBundles(windowedWrites, unwrittenRecordsTag, destinationCoder)) |
| .withSideInputs(sideInputs) |
| .withOutputTags(writtenRecordsTag, TupleTagList.of(unwrittenRecordsTag))); |
| PCollection<FileResult<DestinationT>> writtenBundleFiles = |
| writeTuple |
| .get(writtenRecordsTag) |
| .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder)); |
| // Any "spilled" elements are written using WriteShardedBundles. Assign shard numbers in |
| // finalize to stay consistent with what WriteWindowedBundles does. |
| PCollection<FileResult<DestinationT>> writtenGroupedFiles = |
| writeTuple |
| .get(unwrittenRecordsTag) |
| .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())) |
| .apply("GroupUnwritten", GroupByKey.<ShardedKey<Integer>, UserT>create()) |
| .apply( |
| "WriteUnwritten", |
| ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_IN_FINALIZE)) |
| .withSideInputs(sideInputs)) |
| .setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder)); |
| results = |
| PCollectionList.of(writtenBundleFiles) |
| .and(writtenGroupedFiles) |
| .apply(Flatten.<FileResult<DestinationT>>pCollections()); |
| } else { |
| List<PCollectionView<?>> shardingSideInputs = Lists.newArrayList(); |
| if (computeNumShards != null) { |
| numShardsView = input.apply(computeNumShards); |
| shardingSideInputs.add(numShardsView); |
| } else { |
| numShardsView = null; |
| } |
| PCollection<KV<ShardedKey<Integer>, Iterable<UserT>>> sharded = |
| input |
| .apply( |
| "ApplyShardLabel", |
| ParDo.of( |
| new ApplyShardingKey( |
| numShardsView, |
| (numShardsView != null) ? null : numShardsProvider, |
| destinationCoder)) |
| .withSideInputs(shardingSideInputs)) |
| .setCoder(KvCoder.of(ShardedKeyCoder.of(VarIntCoder.of()), input.getCoder())) |
| .apply("GroupIntoShards", GroupByKey.<ShardedKey<Integer>, UserT>create()); |
| shardedWindowCoder = |
| (Coder<BoundedWindow>) sharded.getWindowingStrategy().getWindowFn().windowCoder(); |
| // Since this path might be used by streaming runners processing triggers, it's important |
| // to assign shard numbers here so that they are deterministic. The ASSIGN_IN_FINALIZE |
| // strategy works by sorting all FileResult objects and assigning them numbers, which is not |
| // guaranteed to work well when processing triggers - if the finalize step retries it might |
| // see a different Iterable of FileResult objects, and it will assign different shard numbers. |
| results = |
| sharded.apply( |
| "WriteShardedBundles", |
| ParDo.of(new WriteShardedBundles(ShardAssignment.ASSIGN_WHEN_WRITING)) |
| .withSideInputs(sideInputs)); |
| } |
| results.setCoder(FileResultCoder.of(shardedWindowCoder, destinationCoder)); |
| |
| PCollection<KV<DestinationT, String>> outputFilenames; |
| if (windowedWrites) { |
| // We need to materialize the FileResults before the renaming stage: this can be done either |
| // via a side input or via a GBK. However, when processing streaming windowed writes, results |
| // will arrive multiple times. This means we can't share the below implementation that turns |
| // the results into a side input, as new data arriving into a side input does not trigger the |
| // listening DoFn. We also can't use a GBK because we need only the materialization, but not |
| // the (potentially lossy, if the user's trigger is lossy) continuation triggering that GBK |
| // would give. So, we use a reshuffle (over a single key to maximize bundling). |
| outputFilenames = |
| results |
| .apply(WithKeys.<Void, FileResult<DestinationT>>of((Void) null)) |
| .setCoder(KvCoder.of(VoidCoder.of(), results.getCoder())) |
| .apply("Reshuffle", Reshuffle.<Void, FileResult<DestinationT>>of()) |
| .apply(Values.<FileResult<DestinationT>>create()) |
| .apply( |
| "FinalizeWindowed", |
| ParDo.of( |
| new FinalizeWindowedFn<DestinationT>( |
| numShardsView, numShardsProvider, writeOperation)) |
| .withSideInputs( |
| numShardsView == null |
| ? ImmutableList.<PCollectionView<?>>of() |
| : ImmutableList.of(numShardsView))) |
| .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of())); |
| } else { |
| final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView = |
| results.apply(View.<FileResult<DestinationT>>asIterable()); |
| ImmutableList.Builder<PCollectionView<?>> finalizeSideInputs = |
| ImmutableList.<PCollectionView<?>>builder().add(resultsView); |
| if (numShardsView != null) { |
| finalizeSideInputs.add(numShardsView); |
| } |
| finalizeSideInputs.addAll(sideInputs); |
| |
| // Finalize the write in another do-once ParDo on the singleton collection containing the |
| // Writer. The results from the per-bundle writes are given as an Iterable side input. |
| // The WriteOperation's state is the same as after its initialization in the first |
| // do-once ParDo. There is a dependency between this ParDo and the parallel write (the writer |
| // results collection as a side input), so it will happen after the parallel write. |
| // For the non-windowed case, we guarantee that if no data is written but the user has |
| // set numShards, then all shards will be written out as empty files. For this reason we |
| // use a side input here. |
| PCollection<Void> singletonCollection = p.apply(Create.of((Void) null)); |
| outputFilenames = |
| singletonCollection |
| .apply( |
| "FinalizeUnwindowed", |
| ParDo.of( |
| new FinalizeUnwindowedFn<>( |
| numShardsView, numShardsProvider, resultsView, writeOperation)) |
| .withSideInputs(finalizeSideInputs.build())) |
| .setCoder(KvCoder.of(destinationCoder, StringUtf8Coder.of())); |
| } |
| |
| TupleTag<KV<DestinationT, String>> perDestinationOutputFilenamesTag = |
| new TupleTag<>("perDestinationOutputFilenames"); |
| return WriteFilesResult.in( |
| input.getPipeline(), |
| perDestinationOutputFilenamesTag, |
| outputFilenames); |
| } |
| |
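| /** |
| * Finalizes windowed writes: groups the {@link FileResult}s by destination and window, renames |
| * the temporary files to their final locations, and outputs the final filenames. |
| */ |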
| private static class FinalizeWindowedFn<DestinationT> |
| extends DoFn<FileResult<DestinationT>, KV<DestinationT, String>> { |
| @Nullable private final PCollectionView<Integer> numShardsView; |
| @Nullable private final ValueProvider<Integer> numShardsProvider; |
| private final WriteOperation<DestinationT, ?> writeOperation; |
| |
| @Nullable private transient List<FileResult<DestinationT>> fileResults; |
| @Nullable private Integer fixedNumShards; |
| |
| public FinalizeWindowedFn( |
| @Nullable PCollectionView<Integer> numShardsView, |
| @Nullable ValueProvider<Integer> numShardsProvider, |
| WriteOperation<DestinationT, ?> writeOperation) { |
| this.numShardsView = numShardsView; |
| this.numShardsProvider = numShardsProvider; |
| this.writeOperation = writeOperation; |
| } |
| |
| @StartBundle |
| public void startBundle() { |
| fileResults = Lists.newArrayList(); |
| fixedNumShards = null; |
| } |
| |
| @ProcessElement |
| public void processElement(ProcessContext c) { |
| fileResults.add(c.element()); |
| if (fixedNumShards == null) { |
| if (numShardsView != null) { |
| fixedNumShards = c.sideInput(numShardsView); |
| } else if (numShardsProvider != null) { |
| fixedNumShards = numShardsProvider.get(); |
| } else { |
| fixedNumShards = null; |
| } |
| } |
| } |
| |
| @FinishBundle |
| public void finishBundle(FinishBundleContext c) throws Exception { |
| Set<ResourceId> tempFiles = Sets.newHashSet(); |
| Multimap<KV<DestinationT, BoundedWindow>, FileResult<DestinationT>> results = |
| groupByDestinationAndWindow(fileResults); |
| List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList(); |
| for (Map.Entry<KV<DestinationT, BoundedWindow>, Collection<FileResult<DestinationT>>> |
| destEntry : results.asMap().entrySet()) { |
| DestinationT destination = destEntry.getKey().getKey(); |
| BoundedWindow window = destEntry.getKey().getValue(); |
| resultsToFinalFilenames.addAll(writeOperation.buildOutputFilenames( |
| destination, window, fixedNumShards, destEntry.getValue())); |
| } |
| LOG.info("Will finalize {} files", resultsToFinalFilenames.size()); |
| for (KV<FileResult<DestinationT>, ResourceId> entry : resultsToFinalFilenames) { |
| FileResult<DestinationT> res = entry.getKey(); |
| tempFiles.add(res.getTempFilename()); |
| c.output( |
| KV.of(res.getDestination(), entry.getValue().toString()), |
| res.getWindow().maxTimestamp(), |
| res.getWindow()); |
| } |
| writeOperation.copyToOutputFiles(resultsToFinalFilenames); |
| writeOperation.removeTemporaryFiles(tempFiles); |
| } |
| } |
| |
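| /** |
| * Finalizes unwindowed writes: collects the {@link FileResult}s from the side input, creates |
| * empty shards when a fixed shard count requires them, and renames the temporary files to their |
| * final locations. |
| */ |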
| private static class FinalizeUnwindowedFn<DestinationT> |
| extends DoFn<Void, KV<DestinationT, String>> { |
| @Nullable private final PCollectionView<Integer> numShardsView; |
| @Nullable private final ValueProvider<Integer> numShardsProvider; |
| private final PCollectionView<Iterable<FileResult<DestinationT>>> resultsView; |
| private final WriteOperation<DestinationT, ?> writeOperation; |
| |
| public FinalizeUnwindowedFn( |
| @Nullable PCollectionView<Integer> numShardsView, |
| @Nullable ValueProvider<Integer> numShardsProvider, |
| PCollectionView<Iterable<FileResult<DestinationT>>> resultsView, |
| WriteOperation<DestinationT, ?> writeOperation) { |
| this.numShardsView = numShardsView; |
| this.numShardsProvider = numShardsProvider; |
| this.resultsView = resultsView; |
| this.writeOperation = writeOperation; |
| } |
| |
| @ProcessElement |
| public void processElement(ProcessContext c) throws Exception { |
| writeOperation.getSink().getDynamicDestinations().setSideInputAccessorFromProcessContext(c); |
| @Nullable Integer fixedNumShards; |
| if (numShardsView != null) { |
| fixedNumShards = c.sideInput(numShardsView); |
| } else if (numShardsProvider != null) { |
| fixedNumShards = numShardsProvider.get(); |
| } else { |
| fixedNumShards = null; |
| } |
| Multimap<DestinationT, FileResult<DestinationT>> resultsByDestMultimap = |
| ArrayListMultimap.create(); |
| for (FileResult<DestinationT> result : c.sideInput(resultsView)) { |
| resultsByDestMultimap.put(result.getDestination(), result); |
| } |
| Map<DestinationT, Collection<FileResult<DestinationT>>> resultsByDest = |
| resultsByDestMultimap.asMap(); |
| if (resultsByDest.isEmpty()) { |
| Collection<FileResult<DestinationT>> empty = ImmutableList.of(); |
| resultsByDest = |
| Collections.singletonMap( |
| writeOperation.getSink().getDynamicDestinations().getDefaultDestination(), empty); |
| } |
| List<KV<FileResult<DestinationT>, ResourceId>> resultsToFinalFilenames = Lists.newArrayList(); |
| for (Map.Entry<DestinationT, Collection<FileResult<DestinationT>>> |
| destEntry : resultsByDest.entrySet()) { |
| resultsToFinalFilenames.addAll( |
| finalizeForDestinationFillEmptyShards( |
| destEntry.getKey(), fixedNumShards, destEntry.getValue())); |
| } |
| Set<ResourceId> tempFiles = Sets.newHashSet(); |
| for (KV<FileResult<DestinationT>, ResourceId> entry : |
| resultsToFinalFilenames) { |
| tempFiles.add(entry.getKey().getTempFilename()); |
| c.output(KV.of(entry.getKey().getDestination(), entry.getValue().toString())); |
| } |
| writeOperation.copyToOutputFiles(resultsToFinalFilenames); |
| writeOperation.removeTemporaryFiles(tempFiles); |
| } |
| |
| /** |
| * Finalize a list of files for a single destination. If a minimum number of shards is needed, |
| * this function will generate empty files for this destination to ensure that all shards are |
| * generated. |
| */ |
| private List<KV<FileResult<DestinationT>, ResourceId>> finalizeForDestinationFillEmptyShards( |
| DestinationT destination, |
| @Nullable Integer fixedNumShards, |
| Collection<FileResult<DestinationT>> existingResults) |
| throws Exception { |
| checkState(!writeOperation.windowedWrites); |
| |
| LOG.info( |
| "Finalizing write operation {} for destination {} num shards {}.", |
| writeOperation, |
| destination, |
| existingResults.size()); |
| if (fixedNumShards != null) { |
| checkArgument( |
| existingResults.size() <= fixedNumShards, |
| "Fixed sharding into %s shards was specified, but got %s file results", |
| fixedNumShards, |
| existingResults.size()); |
| } |
| // We must always output at least 1 shard, and honor user-specified numShards |
| // if set. |
| Set<Integer> missingShardNums; |
| if (fixedNumShards == null) { |
| missingShardNums = ImmutableSet.of(UNKNOWN_SHARDNUM); |
| } else { |
| missingShardNums = Sets.newHashSet(); |
| for (int i = 0; i < fixedNumShards; ++i) { |
| missingShardNums.add(i); |
| } |
| for (FileResult<DestinationT> res : existingResults) { |
| checkArgument( |
| res.getShard() != UNKNOWN_SHARDNUM, |
| "Fixed sharding into %s shards was specified, " |
| + "but file result %s does not specify a shard", |
| fixedNumShards, |
| res); |
| missingShardNums.remove(res.getShard()); |
| } |
| } |
| List<FileResult<DestinationT>> completeResults = Lists.newArrayList(existingResults); |
| if (!missingShardNums.isEmpty()) { |
| LOG.info( |
| "Creating {} empty output shards in addition to {} written for destination {}.", |
| missingShardNums.size(), |
| existingResults.size(), |
| destination); |
| for (int shard : missingShardNums) { |
| Writer<DestinationT, ?> writer = writeOperation.createWriter(); |
| // Currently this code path is only called in the unwindowed case. |
| writer.openUnwindowed(UUID.randomUUID().toString(), shard, destination); |
| FileResult<DestinationT> emptyWrite = writer.close(); |
| completeResults.add(emptyWrite); |
| } |
| LOG.debug("Done creating extra shards for {}.", destination); |
| } |
| return writeOperation.buildOutputFilenames( |
| destination, |
| null, |
| (fixedNumShards == null) ? null : completeResults.size(), |
| completeResults); |
| } |
| } |
| } |