/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import com.google.api.services.bigquery.model.TableRow;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. */
class BatchLoads<DestinationT, ElementT>
extends PTransform<PCollection<KV<DestinationT, ElementT>>, WriteResult> {
static final Logger LOG = LoggerFactory.getLogger(BatchLoads.class);
  // The maximum number of file writers to keep open in a single bundle at a time, since file
  // writers default to 64 MB buffers. This comes into play when writing dynamic table
  // destinations. The first 20 tables from a single BatchLoads transform will have their files
  // written inline in the transform; anything beyond that might be shuffled. Users who invoke
  // this transform directly and know that they are running on workers with sufficient memory can
  // increase this by calling BatchLoads#setMaxNumWritersPerBundle. This allows the workers to do
  // more work in memory and saves the cost of shuffling some of this data.
  // Keep in mind that specific runners may decide to run multiple bundles in parallel, based on
  // their own policy.
@VisibleForTesting static final int DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE = 20;
@VisibleForTesting
// Maximum number of files in a single partition.
static final int DEFAULT_MAX_FILES_PER_PARTITION = 10000;
@VisibleForTesting
  // Maximum number of bytes in a single partition -- 11 TiB, just under BQ's 12 TiB limit.
  static final long DEFAULT_MAX_BYTES_PER_PARTITION = 11 * (1L << 40);
  // The maximum size of a single file -- 4 TiB, just under the 5 TiB limit.
static final long DEFAULT_MAX_FILE_SIZE = 4 * (1L << 40);
static final int DEFAULT_NUM_FILE_SHARDS = 0;
// If user triggering is supplied, we will trigger the file write after this many records are
// written.
static final int FILE_TRIGGERING_RECORD_COUNT = 500000;
  // The maximum number of retries when polling the status of a job.
  // It is set to {@code Integer.MAX_VALUE} so that we block until the BigQuery job finishes.
static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
static final int DEFAULT_MAX_RETRY_JOBS = 3;
private BigQueryServices bigQueryServices;
private final WriteDisposition writeDisposition;
private final CreateDisposition createDisposition;
private final boolean ignoreUnknownValues;
// Indicates that we are writing to a constant single table. If this is the case, we will create
// the table, even if there is no data in it.
private final boolean singletonTable;
private final DynamicDestinations<?, DestinationT> dynamicDestinations;
private final Coder<DestinationT> destinationCoder;
private int maxNumWritersPerBundle;
private long maxFileSize;
private int maxFilesPerPartition;
private long maxBytesPerPartition;
private int numFileShards;
private Duration triggeringFrequency;
private ValueProvider<String> customGcsTempLocation;
private ValueProvider<String> loadJobProjectId;
private final Coder<ElementT> elementCoder;
private final SerializableFunction<ElementT, TableRow> toRowFunction;
private String kmsKey;
// The maximum number of times to retry failed load or copy jobs.
private int maxRetryJobs = DEFAULT_MAX_RETRY_JOBS;
BatchLoads(
WriteDisposition writeDisposition,
CreateDisposition createDisposition,
boolean singletonTable,
DynamicDestinations<?, DestinationT> dynamicDestinations,
Coder<DestinationT> destinationCoder,
ValueProvider<String> customGcsTempLocation,
@Nullable ValueProvider<String> loadJobProjectId,
boolean ignoreUnknownValues,
Coder<ElementT> elementCoder,
SerializableFunction<ElementT, TableRow> toRowFunction,
@Nullable String kmsKey) {
bigQueryServices = new BigQueryServicesImpl();
this.writeDisposition = writeDisposition;
this.createDisposition = createDisposition;
this.singletonTable = singletonTable;
this.dynamicDestinations = dynamicDestinations;
this.destinationCoder = destinationCoder;
this.maxNumWritersPerBundle = DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE;
this.maxFileSize = DEFAULT_MAX_FILE_SIZE;
this.numFileShards = DEFAULT_NUM_FILE_SHARDS;
this.maxFilesPerPartition = DEFAULT_MAX_FILES_PER_PARTITION;
this.maxBytesPerPartition = DEFAULT_MAX_BYTES_PER_PARTITION;
this.triggeringFrequency = null;
this.customGcsTempLocation = customGcsTempLocation;
this.loadJobProjectId = loadJobProjectId;
this.ignoreUnknownValues = ignoreUnknownValues;
this.elementCoder = elementCoder;
this.toRowFunction = toRowFunction;
this.kmsKey = kmsKey;
}
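  /** Inject a {@link BigQueryServices} implementation; intended for tests. */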
void setTestServices(BigQueryServices bigQueryServices) {
this.bigQueryServices = bigQueryServices;
}
/** Get the maximum number of file writers that will be open simultaneously in a bundle. */
public int getMaxNumWritersPerBundle() {
return maxNumWritersPerBundle;
}
/** Set the maximum number of file writers that will be open simultaneously in a bundle. */
public void setMaxNumWritersPerBundle(int maxNumWritersPerBundle) {
this.maxNumWritersPerBundle = maxNumWritersPerBundle;
}
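  /**
   * Set how often file writes and load jobs are triggered. A non-null value selects the
   * periodically-triggered expansion of this transform.
   */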
public void setTriggeringFrequency(Duration triggeringFrequency) {
this.triggeringFrequency = triggeringFrequency;
}
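  /** Get the maximum number of times failed load or copy jobs will be retried. */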
public int getMaxRetryJobs() {
return maxRetryJobs;
}
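  /** Set the maximum number of times failed load or copy jobs will be retried. */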
public void setMaxRetryJobs(int maxRetryJobs) {
this.maxRetryJobs = maxRetryJobs;
}
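  /**
   * Set the number of file shards to write per destination. A value of 0 selects dynamic
   * per-bundle sharding; triggered writes require a positive value.
   */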
public void setNumFileShards(int numFileShards) {
this.numFileShards = numFileShards;
}
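  /** Override the maximum size of a single output file; visible for testing. */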
@VisibleForTesting
void setMaxFileSize(long maxFileSize) {
this.maxFileSize = maxFileSize;
}
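  /** Override the maximum number of files in a single load-job partition; visible for testing. */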
@VisibleForTesting
void setMaxFilesPerPartition(int maxFilesPerPartition) {
this.maxFilesPerPartition = maxFilesPerPartition;
}
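  /** Override the maximum number of bytes in a single load-job partition; visible for testing. */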
@VisibleForTesting
void setMaxBytesPerPartition(long maxBytesPerPartition) {
this.maxBytesPerPartition = maxBytesPerPartition;
}
@Override
public void validate(PipelineOptions options) {
// We will use a BigQuery load job -- validate the temp location.
String tempLocation;
if (customGcsTempLocation == null) {
tempLocation = options.getTempLocation();
} else {
if (!customGcsTempLocation.isAccessible()) {
// Can't perform verification in this case.
return;
}
tempLocation = customGcsTempLocation.get();
}
checkArgument(
!Strings.isNullOrEmpty(tempLocation),
"BigQueryIO.Write needs a GCS temp location to store temp files.");
if (bigQueryServices == null) {
try {
GcsPath.fromUri(tempLocation);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
String.format(
"BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
tempLocation),
e);
}
}
}
// Expand the pipeline when the user has requested periodically-triggered file writes.
private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input) {
checkArgument(numFileShards > 0);
Pipeline p = input.getPipeline();
final PCollectionView<String> loadJobIdPrefixView = createLoadJobIdPrefixView(p);
final PCollectionView<String> tempFilePrefixView =
createTempFilePrefixView(p, loadJobIdPrefixView);
    // The user-supplied triggeringFrequency is often chosen to control how many BigQuery load
// jobs are generated, to prevent going over BigQuery's daily quota for load jobs. If this
// is set to a large value, currently we have to buffer all the data until the trigger fires.
// Instead we ensure that the files are written if a threshold number of records are ready.
// We use only the user-supplied trigger on the actual BigQuery load. This allows us to
// offload the data to the filesystem.
PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
input.apply(
"rewindowIntoGlobal",
Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterFirst.of(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(triggeringFrequency),
AfterPane.elementCountAtLeast(FILE_TRIGGERING_RECORD_COUNT))))
.discardingFiredPanes());
PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);
// Apply the user's trigger before we start generating BigQuery load jobs.
results =
results.apply(
"applyUserTrigger",
Window.<WriteBundlesToFiles.Result<DestinationT>>into(new GlobalWindows())
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(triggeringFrequency)))
.discardingFiredPanes());
TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
new TupleTag<>("multiPartitionsTag");
TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
new TupleTag<>("singlePartitionTag");
// If we have non-default triggered output, we can't use the side-input technique used in
    // expandUntriggered. Instead, make the result list a main input. Apply a GroupByKey first for
// determinism.
PCollectionTuple partitions =
results
.apply("AttachSingletonKey", WithKeys.of((Void) null))
.setCoder(
KvCoder.of(VoidCoder.of(), WriteBundlesToFiles.ResultCoder.of(destinationCoder)))
.apply("GroupOntoSingleton", GroupByKey.create())
.apply("ExtractResultValues", Values.create())
.apply(
"WritePartitionTriggered",
ParDo.of(
new WritePartition<>(
singletonTable,
dynamicDestinations,
tempFilePrefixView,
maxFilesPerPartition,
maxBytesPerPartition,
multiPartitionsTag,
singlePartitionTag))
.withSideInputs(tempFilePrefixView)
.withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
PCollection<KV<TableDestination, String>> tempTables =
writeTempTables(partitions.get(multiPartitionsTag), loadJobIdPrefixView);
tempTables
// Now that the load job has happened, we want the rename to happen immediately.
.apply(
Window.<KV<TableDestination, String>>into(new GlobalWindows())
.triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
.apply(WithKeys.of((Void) null))
.setCoder(
KvCoder.of(
VoidCoder.of(), KvCoder.of(TableDestinationCoderV2.of(), StringUtf8Coder.of())))
.apply(GroupByKey.create())
.apply(Values.create())
.apply(
"WriteRenameTriggered",
ParDo.of(
new WriteRename(
bigQueryServices,
loadJobIdPrefixView,
writeDisposition,
createDisposition,
maxRetryJobs,
kmsKey))
.withSideInputs(loadJobIdPrefixView));
writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView);
return writeResult(p);
}
// Expand the pipeline when the user has not requested periodically-triggered file writes.
public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> input) {
Pipeline p = input.getPipeline();
final PCollectionView<String> loadJobIdPrefixView = createLoadJobIdPrefixView(p);
final PCollectionView<String> tempFilePrefixView =
createTempFilePrefixView(p, loadJobIdPrefixView);
PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
input.apply(
"rewindowIntoGlobal",
Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
.triggering(DefaultTrigger.of())
.discardingFiredPanes());
PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
(numFileShards == 0)
? writeDynamicallyShardedFiles(inputInGlobalWindow, tempFilePrefixView)
: writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);
TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {};
TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("singlePartitionTag") {};
// This transform will look at the set of files written for each table, and if any table has
// too many files or bytes, will partition that table's files into multiple partitions for
// loading.
PCollectionTuple partitions =
results
.apply("ReifyResults", new ReifyAsIterable<>())
.setCoder(IterableCoder.of(WriteBundlesToFiles.ResultCoder.of(destinationCoder)))
.apply(
"WritePartitionUntriggered",
ParDo.of(
new WritePartition<>(
singletonTable,
dynamicDestinations,
tempFilePrefixView,
maxFilesPerPartition,
maxBytesPerPartition,
multiPartitionsTag,
singlePartitionTag))
.withSideInputs(tempFilePrefixView)
.withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
PCollection<KV<TableDestination, String>> tempTables =
writeTempTables(partitions.get(multiPartitionsTag), loadJobIdPrefixView);
tempTables
.apply("ReifyRenameInput", new ReifyAsIterable<>())
.setCoder(IterableCoder.of(KvCoder.of(TableDestinationCoderV2.of(), StringUtf8Coder.of())))
.apply(
"WriteRenameUntriggered",
ParDo.of(
new WriteRename(
bigQueryServices,
loadJobIdPrefixView,
writeDisposition,
createDisposition,
maxRetryJobs,
kmsKey))
.withSideInputs(loadJobIdPrefixView));
writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView);
return writeResult(p);
}
// Generate the base job id string.
private PCollectionView<String> createLoadJobIdPrefixView(Pipeline p) {
// Create a singleton job ID token at execution time. This will be used as the base for all
// load jobs issued from this instance of the transform.
return p.apply("JobIdCreationRoot", Create.of((Void) null))
.apply(
"CreateJobId",
ParDo.of(
new DoFn<Void, String>() {
@ProcessElement
public void process(ProcessContext c) {
c.output(
String.format(
"beam_load_%s_%s",
c.getPipelineOptions().getJobName().replaceAll("-", ""),
BigQueryHelpers.randomUUIDString()));
}
}))
.apply(View.asSingleton());
}
// Generate the temporary-file prefix.
private PCollectionView<String> createTempFilePrefixView(
Pipeline p, final PCollectionView<String> jobIdView) {
return p.apply(Create.of(""))
.apply(
"GetTempFilePrefix",
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void getTempFilePrefix(ProcessContext c) {
String tempLocationRoot;
if (customGcsTempLocation != null) {
tempLocationRoot = customGcsTempLocation.get();
} else {
tempLocationRoot = c.getPipelineOptions().getTempLocation();
}
String tempLocation =
resolveTempLocation(
tempLocationRoot, "BigQueryWriteTemp", c.sideInput(jobIdView));
LOG.info(
"Writing BigQuery temporary files to {} before loading them.",
tempLocation);
c.output(tempLocation);
}
})
.withSideInputs(jobIdView))
.apply("TempFilePrefixView", View.asSingleton());
}
// Writes input data to dynamically-sharded, per-bundle files. Returns a PCollection of filename,
// file byte size, and table destination.
PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFiles(
PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> tempFilePrefix) {
TupleTag<WriteBundlesToFiles.Result<DestinationT>> writtenFilesTag =
new TupleTag<WriteBundlesToFiles.Result<DestinationT>>("writtenFiles") {};
    TupleTag<KV<ShardedKey<DestinationT>, ElementT>> unwrittenRecordsTag =
new TupleTag<KV<ShardedKey<DestinationT>, ElementT>>("unwrittenRecords") {};
PCollectionTuple writeBundlesTuple =
input.apply(
"WriteBundlesToFiles",
ParDo.of(
new WriteBundlesToFiles<>(
tempFilePrefix,
                        unwrittenRecordsTag,
maxNumWritersPerBundle,
maxFileSize,
toRowFunction))
.withSideInputs(tempFilePrefix)
                .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittenRecordsTag)));
PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFiles =
writeBundlesTuple
.get(writtenFilesTag)
.setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
PCollection<KV<ShardedKey<DestinationT>, ElementT>> unwrittenRecords =
writeBundlesTuple
            .get(unwrittenRecordsTag)
.setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), elementCoder));
// If the bundles contain too many output tables to be written inline to files (due to memory
// limits), any unwritten records will be spilled to the unwrittenRecordsTag PCollection.
    // Group these records by key, and write the files after grouping. Since the records are
    // grouped by key, we can ensure that only one file is open at a time in each bundle.
PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFilesGrouped =
writeShardedRecords(unwrittenRecords, tempFilePrefix);
// PCollection of filename, file byte size, and table destination.
return PCollectionList.of(writtenFiles)
.and(writtenFilesGrouped)
.apply("FlattenFiles", Flatten.pCollections())
.setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
}
// Writes input data to statically-sharded files. Returns a PCollection of filename,
// file byte size, and table destination.
PCollection<WriteBundlesToFiles.Result<DestinationT>> writeShardedFiles(
PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> tempFilePrefix) {
checkState(numFileShards > 0);
PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords =
input
.apply(
"AddShard",
ParDo.of(
new DoFn<KV<DestinationT, ElementT>, KV<ShardedKey<DestinationT>, ElementT>>() {
int shardNumber;
@Setup
public void setup() {
shardNumber = ThreadLocalRandom.current().nextInt(numFileShards);
}
@ProcessElement
public void processElement(
@Element KV<DestinationT, ElementT> element,
OutputReceiver<KV<ShardedKey<DestinationT>, ElementT>> o) {
DestinationT destination = element.getKey();
o.output(
KV.of(
ShardedKey.of(destination, ++shardNumber % numFileShards),
element.getValue()));
}
}))
.setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), elementCoder));
return writeShardedRecords(shardedRecords, tempFilePrefix);
}
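  // Groups the sharded records by key and writes each group to files, ensuring that at most one
  // file per key is open at a time in a bundle.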
private PCollection<Result<DestinationT>> writeShardedRecords(
PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords,
PCollectionView<String> tempFilePrefix) {
return shardedRecords
.apply("GroupByDestination", GroupByKey.create())
.apply(
"WriteGroupedRecords",
ParDo.of(
new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
tempFilePrefix, maxFileSize, toRowFunction))
.withSideInputs(tempFilePrefix))
.setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
}
// Take in a list of files and write them to temporary tables.
private PCollection<KV<TableDestination, String>> writeTempTables(
PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
PCollectionView<String> jobIdTokenView) {
List<PCollectionView<?>> sideInputs = Lists.newArrayList(jobIdTokenView);
sideInputs.addAll(dynamicDestinations.getSideInputs());
Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder =
KvCoder.of(
ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
ListCoder.of(StringUtf8Coder.of()));
// If the final destination table exists already (and we're appending to it), then the temp
// tables must exactly match schema, partitioning, etc. Wrap the DynamicDestinations object
// with one that makes this happen.
@SuppressWarnings("unchecked")
DynamicDestinations<?, DestinationT> destinations = dynamicDestinations;
if (createDisposition.equals(CreateDisposition.CREATE_IF_NEEDED)
|| createDisposition.equals(CreateDisposition.CREATE_NEVER)) {
destinations =
DynamicDestinationsHelpers.matchTableDynamicDestinations(destinations, bigQueryServices);
}
    // If WriteBundlesToFiles produced more than DEFAULT_MAX_FILES_PER_PARTITION files or
    // DEFAULT_MAX_BYTES_PER_PARTITION bytes, then the import needs to be split into multiple
    // partitions, and those partitions will be specified in multiPartitionsTag.
return input
.setCoder(partitionsCoder)
// Reshuffle will distribute this among multiple workers, and also guard against
// reexecution of the WritePartitions step once WriteTables has begun.
.apply("MultiPartitionsReshuffle", Reshuffle.of())
.apply(
"MultiPartitionsWriteTables",
new WriteTables<>(
true,
bigQueryServices,
jobIdTokenView,
WriteDisposition.WRITE_EMPTY,
CreateDisposition.CREATE_IF_NEEDED,
sideInputs,
destinations,
loadJobProjectId,
maxRetryJobs,
ignoreUnknownValues,
kmsKey));
}
// In the case where the files fit into a single load job, there's no need to write temporary
// tables and rename. We can load these files directly into the target BigQuery table.
void writeSinglePartition(
PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
PCollectionView<String> loadJobIdPrefixView) {
List<PCollectionView<?>> sideInputs = Lists.newArrayList(loadJobIdPrefixView);
sideInputs.addAll(dynamicDestinations.getSideInputs());
Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder =
KvCoder.of(
ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
ListCoder.of(StringUtf8Coder.of()));
// Write single partition to final table
input
.setCoder(partitionsCoder)
// Reshuffle will distribute this among multiple workers, and also guard against
// reexecution of the WritePartitions step once WriteTables has begun.
.apply("SinglePartitionsReshuffle", Reshuffle.of())
.apply(
"SinglePartitionWriteTables",
new WriteTables<>(
false,
bigQueryServices,
loadJobIdPrefixView,
writeDisposition,
createDisposition,
sideInputs,
dynamicDestinations,
loadJobProjectId,
maxRetryJobs,
ignoreUnknownValues,
kmsKey));
}
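  // Batch loads report no per-row failed inserts, so the returned WriteResult wraps an empty
  // failedInserts collection.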
private WriteResult writeResult(Pipeline p) {
PCollection<TableRow> empty =
p.apply("CreateEmptyFailedInserts", Create.empty(TypeDescriptor.of(TableRow.class)));
return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
}
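  // Dispatch to the triggered or untriggered expansion depending on whether a triggering
  // frequency was supplied.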
@Override
public WriteResult expand(PCollection<KV<DestinationT, ElementT>> input) {
return (triggeringFrequency != null) ? expandTriggered(input) : expandUntriggered(input);
}
}