/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.beam.sdk.io.gcp.bigquery;

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;

import com.google.api.services.bigquery.model.TableRow;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.WriteBundlesToFiles.Result;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** PTransform that uses BigQuery batch-load jobs to write a PCollection to BigQuery. */
class BatchLoads<DestinationT, ElementT>
    extends PTransform<PCollection<KV<DestinationT, ElementT>>, WriteResult> {
  static final Logger LOG = LoggerFactory.getLogger(BatchLoads.class);

  // The maximum number of file writers to keep open in a single bundle at a time, since file
  // writers default to 64mb buffers. This comes into play when writing dynamic table destinations.
  // The first 20 tables from a single BatchLoads transform will write files inline in the
  // transform. Anything beyond that might be shuffled.  Users using this transform directly who
  // know that they are running on workers with sufficient memory can increase this by calling
  // BatchLoads#setMaxNumWritersPerBundle. This allows the workers to do more work in memory, and
  // save on the cost of shuffling some of this data.
  // Keep in mind that specific runners may decide to run multiple bundles in parallel, based on
  // their own policy.
  @VisibleForTesting static final int DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE = 20;

  @VisibleForTesting
  // Maximum number of files in a single partition.
  static final int DEFAULT_MAX_FILES_PER_PARTITION = 10000;

  @VisibleForTesting
  // Maximum number of bytes in a single partition -- 11 TiB just under BQ's 12 TiB limit.
  static final long DEFAULT_MAX_BYTES_PER_PARTITION = 11 * (1L << 40);

  // The maximum size of a single file - 4TiB, just under the 5 TiB limit.
  static final long DEFAULT_MAX_FILE_SIZE = 4 * (1L << 40);

  static final int DEFAULT_NUM_FILE_SHARDS = 0;

  // If user triggering is supplied, we will trigger the file write after this many records are
  // written.
  static final int FILE_TRIGGERING_RECORD_COUNT = 500000;

  // The maximum number of retries to poll the status of a job.
  // It sets to {@code Integer.MAX_VALUE} to block until the BigQuery job finishes.
  static final int LOAD_JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;

  static final int DEFAULT_MAX_RETRY_JOBS = 3;

  private BigQueryServices bigQueryServices;
  private final WriteDisposition writeDisposition;
  private final CreateDisposition createDisposition;
  private final boolean ignoreUnknownValues;
  // Indicates that we are writing to a constant single table. If this is the case, we will create
  // the table, even if there is no data in it.
  private final boolean singletonTable;
  private final DynamicDestinations<?, DestinationT> dynamicDestinations;
  private final Coder<DestinationT> destinationCoder;
  private int maxNumWritersPerBundle;
  private long maxFileSize;
  private int maxFilesPerPartition;
  private long maxBytesPerPartition;
  private int numFileShards;
  private Duration triggeringFrequency;
  private ValueProvider<String> customGcsTempLocation;
  private ValueProvider<String> loadJobProjectId;
  private final Coder<ElementT> elementCoder;
  private final SerializableFunction<ElementT, TableRow> toRowFunction;
  private String kmsKey;

  // The maximum number of times to retry failed load or copy jobs.
  private int maxRetryJobs = DEFAULT_MAX_RETRY_JOBS;

  BatchLoads(
      WriteDisposition writeDisposition,
      CreateDisposition createDisposition,
      boolean singletonTable,
      DynamicDestinations<?, DestinationT> dynamicDestinations,
      Coder<DestinationT> destinationCoder,
      ValueProvider<String> customGcsTempLocation,
      @Nullable ValueProvider<String> loadJobProjectId,
      boolean ignoreUnknownValues,
      Coder<ElementT> elementCoder,
      SerializableFunction<ElementT, TableRow> toRowFunction,
      @Nullable String kmsKey) {
    bigQueryServices = new BigQueryServicesImpl();
    this.writeDisposition = writeDisposition;
    this.createDisposition = createDisposition;
    this.singletonTable = singletonTable;
    this.dynamicDestinations = dynamicDestinations;
    this.destinationCoder = destinationCoder;
    this.maxNumWritersPerBundle = DEFAULT_MAX_NUM_WRITERS_PER_BUNDLE;
    this.maxFileSize = DEFAULT_MAX_FILE_SIZE;
    this.numFileShards = DEFAULT_NUM_FILE_SHARDS;
    this.maxFilesPerPartition = DEFAULT_MAX_FILES_PER_PARTITION;
    this.maxBytesPerPartition = DEFAULT_MAX_BYTES_PER_PARTITION;
    this.triggeringFrequency = null;
    this.customGcsTempLocation = customGcsTempLocation;
    this.loadJobProjectId = loadJobProjectId;
    this.ignoreUnknownValues = ignoreUnknownValues;
    this.elementCoder = elementCoder;
    this.toRowFunction = toRowFunction;
    this.kmsKey = kmsKey;
  }

  void setTestServices(BigQueryServices bigQueryServices) {
    this.bigQueryServices = bigQueryServices;
  }

  /** Get the maximum number of file writers that will be open simultaneously in a bundle. */
  public int getMaxNumWritersPerBundle() {
    return maxNumWritersPerBundle;
  }

  /** Set the maximum number of file writers that will be open simultaneously in a bundle. */
  public void setMaxNumWritersPerBundle(int maxNumWritersPerBundle) {
    this.maxNumWritersPerBundle = maxNumWritersPerBundle;
  }

  public void setTriggeringFrequency(Duration triggeringFrequency) {
    this.triggeringFrequency = triggeringFrequency;
  }

  public int getMaxRetryJobs() {
    return maxRetryJobs;
  }

  public void setMaxRetryJobs(int maxRetryJobs) {
    this.maxRetryJobs = maxRetryJobs;
  }

  public void setNumFileShards(int numFileShards) {
    this.numFileShards = numFileShards;
  }

  @VisibleForTesting
  void setMaxFileSize(long maxFileSize) {
    this.maxFileSize = maxFileSize;
  }

  @VisibleForTesting
  void setMaxFilesPerPartition(int maxFilesPerPartition) {
    this.maxFilesPerPartition = maxFilesPerPartition;
  }

  @VisibleForTesting
  void setMaxBytesPerPartition(long maxBytesPerPartition) {
    this.maxBytesPerPartition = maxBytesPerPartition;
  }

  @Override
  public void validate(PipelineOptions options) {
    // We will use a BigQuery load job -- validate the temp location.
    String tempLocation;
    if (customGcsTempLocation == null) {
      tempLocation = options.getTempLocation();
    } else {
      if (!customGcsTempLocation.isAccessible()) {
        // Can't perform verification in this case.
        return;
      }
      tempLocation = customGcsTempLocation.get();
    }
    checkArgument(
        !Strings.isNullOrEmpty(tempLocation),
        "BigQueryIO.Write needs a GCS temp location to store temp files.");
    if (bigQueryServices == null) {
      try {
        GcsPath.fromUri(tempLocation);
      } catch (IllegalArgumentException e) {
        throw new IllegalArgumentException(
            String.format(
                "BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
                tempLocation),
            e);
      }
    }
  }

  // Expand the pipeline when the user has requested periodically-triggered file writes.
  private WriteResult expandTriggered(PCollection<KV<DestinationT, ElementT>> input) {
    checkArgument(numFileShards > 0);
    Pipeline p = input.getPipeline();
    final PCollectionView<String> loadJobIdPrefixView = createLoadJobIdPrefixView(p);
    final PCollectionView<String> tempFilePrefixView =
        createTempFilePrefixView(p, loadJobIdPrefixView);
    // The user-supplied triggeringDuration is often chosen to to control how many BigQuery load
    // jobs are generated, to prevent going over BigQuery's daily quota for load jobs. If this
    // is set to a large value, currently we have to buffer all the data unti the trigger fires.
    // Instead we ensure that the files are written if a threshold number of records are ready.
    // We use only the user-supplied trigger on the actual BigQuery load. This allows us to
    // offload the data to the filesystem.
    PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
        input.apply(
            "rewindowIntoGlobal",
            Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
                .triggering(
                    Repeatedly.forever(
                        AfterFirst.of(
                            AfterProcessingTime.pastFirstElementInPane()
                                .plusDelayOf(triggeringFrequency),
                            AfterPane.elementCountAtLeast(FILE_TRIGGERING_RECORD_COUNT))))
                .discardingFiredPanes());
    PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
        writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);
    // Apply the user's trigger before we start generating BigQuery load jobs.
    results =
        results.apply(
            "applyUserTrigger",
            Window.<WriteBundlesToFiles.Result<DestinationT>>into(new GlobalWindows())
                .triggering(
                    Repeatedly.forever(
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(triggeringFrequency)))
                .discardingFiredPanes());

    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
        new TupleTag<>("multiPartitionsTag");
    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
        new TupleTag<>("singlePartitionTag");

    // If we have non-default triggered output, we can't use the side-input technique used in
    // expandUntriggered . Instead make the result list a main input. Apply a GroupByKey first for
    // determinism.
    PCollectionTuple partitions =
        results
            .apply("AttachSingletonKey", WithKeys.of((Void) null))
            .setCoder(
                KvCoder.of(VoidCoder.of(), WriteBundlesToFiles.ResultCoder.of(destinationCoder)))
            .apply("GroupOntoSingleton", GroupByKey.create())
            .apply("ExtractResultValues", Values.create())
            .apply(
                "WritePartitionTriggered",
                ParDo.of(
                        new WritePartition<>(
                            singletonTable,
                            dynamicDestinations,
                            tempFilePrefixView,
                            maxFilesPerPartition,
                            maxBytesPerPartition,
                            multiPartitionsTag,
                            singlePartitionTag))
                    .withSideInputs(tempFilePrefixView)
                    .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
    PCollection<KV<TableDestination, String>> tempTables =
        writeTempTables(partitions.get(multiPartitionsTag), loadJobIdPrefixView);
    tempTables
        // Now that the load job has happened, we want the rename to happen immediately.
        .apply(
            Window.<KV<TableDestination, String>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
        .apply(WithKeys.of((Void) null))
        .setCoder(
            KvCoder.of(
                VoidCoder.of(), KvCoder.of(TableDestinationCoderV2.of(), StringUtf8Coder.of())))
        .apply(GroupByKey.create())
        .apply(Values.create())
        .apply(
            "WriteRenameTriggered",
            ParDo.of(
                    new WriteRename(
                        bigQueryServices,
                        loadJobIdPrefixView,
                        writeDisposition,
                        createDisposition,
                        maxRetryJobs,
                        kmsKey))
                .withSideInputs(loadJobIdPrefixView));
    writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView);
    return writeResult(p);
  }

  // Expand the pipeline when the user has not requested periodically-triggered file writes.
  public WriteResult expandUntriggered(PCollection<KV<DestinationT, ElementT>> input) {
    Pipeline p = input.getPipeline();
    final PCollectionView<String> loadJobIdPrefixView = createLoadJobIdPrefixView(p);
    final PCollectionView<String> tempFilePrefixView =
        createTempFilePrefixView(p, loadJobIdPrefixView);
    PCollection<KV<DestinationT, ElementT>> inputInGlobalWindow =
        input.apply(
            "rewindowIntoGlobal",
            Window.<KV<DestinationT, ElementT>>into(new GlobalWindows())
                .triggering(DefaultTrigger.of())
                .discardingFiredPanes());
    PCollection<WriteBundlesToFiles.Result<DestinationT>> results =
        (numFileShards == 0)
            ? writeDynamicallyShardedFiles(inputInGlobalWindow, tempFilePrefixView)
            : writeShardedFiles(inputInGlobalWindow, tempFilePrefixView);

    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> multiPartitionsTag =
        new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("multiPartitionsTag") {};
    TupleTag<KV<ShardedKey<DestinationT>, List<String>>> singlePartitionTag =
        new TupleTag<KV<ShardedKey<DestinationT>, List<String>>>("singlePartitionTag") {};

    // This transform will look at the set of files written for each table, and if any table has
    // too many files or bytes, will partition that table's files into multiple partitions for
    // loading.
    PCollectionTuple partitions =
        results
            .apply("ReifyResults", new ReifyAsIterable<>())
            .setCoder(IterableCoder.of(WriteBundlesToFiles.ResultCoder.of(destinationCoder)))
            .apply(
                "WritePartitionUntriggered",
                ParDo.of(
                        new WritePartition<>(
                            singletonTable,
                            dynamicDestinations,
                            tempFilePrefixView,
                            maxFilesPerPartition,
                            maxBytesPerPartition,
                            multiPartitionsTag,
                            singlePartitionTag))
                    .withSideInputs(tempFilePrefixView)
                    .withOutputTags(multiPartitionsTag, TupleTagList.of(singlePartitionTag)));
    PCollection<KV<TableDestination, String>> tempTables =
        writeTempTables(partitions.get(multiPartitionsTag), loadJobIdPrefixView);

    tempTables
        .apply("ReifyRenameInput", new ReifyAsIterable<>())
        .setCoder(IterableCoder.of(KvCoder.of(TableDestinationCoderV2.of(), StringUtf8Coder.of())))
        .apply(
            "WriteRenameUntriggered",
            ParDo.of(
                    new WriteRename(
                        bigQueryServices,
                        loadJobIdPrefixView,
                        writeDisposition,
                        createDisposition,
                        maxRetryJobs,
                        kmsKey))
                .withSideInputs(loadJobIdPrefixView));
    writeSinglePartition(partitions.get(singlePartitionTag), loadJobIdPrefixView);
    return writeResult(p);
  }

  // Generate the base job id string.
  private PCollectionView<String> createLoadJobIdPrefixView(Pipeline p) {
    // Create a singleton job ID token at execution time. This will be used as the base for all
    // load jobs issued from this instance of the transform.
    return p.apply("JobIdCreationRoot", Create.of((Void) null))
        .apply(
            "CreateJobId",
            ParDo.of(
                new DoFn<Void, String>() {
                  @ProcessElement
                  public void process(ProcessContext c) {
                    c.output(
                        String.format(
                            "beam_load_%s_%s",
                            c.getPipelineOptions().getJobName().replaceAll("-", ""),
                            BigQueryHelpers.randomUUIDString()));
                  }
                }))
        .apply(View.asSingleton());
  }

  // Generate the temporary-file prefix.
  private PCollectionView<String> createTempFilePrefixView(
      Pipeline p, final PCollectionView<String> jobIdView) {
    return p.apply(Create.of(""))
        .apply(
            "GetTempFilePrefix",
            ParDo.of(
                    new DoFn<String, String>() {
                      @ProcessElement
                      public void getTempFilePrefix(ProcessContext c) {
                        String tempLocationRoot;
                        if (customGcsTempLocation != null) {
                          tempLocationRoot = customGcsTempLocation.get();
                        } else {
                          tempLocationRoot = c.getPipelineOptions().getTempLocation();
                        }
                        String tempLocation =
                            resolveTempLocation(
                                tempLocationRoot, "BigQueryWriteTemp", c.sideInput(jobIdView));
                        LOG.info(
                            "Writing BigQuery temporary files to {} before loading them.",
                            tempLocation);
                        c.output(tempLocation);
                      }
                    })
                .withSideInputs(jobIdView))
        .apply("TempFilePrefixView", View.asSingleton());
  }

  // Writes input data to dynamically-sharded, per-bundle files. Returns a PCollection of filename,
  // file byte size, and table destination.
  PCollection<WriteBundlesToFiles.Result<DestinationT>> writeDynamicallyShardedFiles(
      PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> tempFilePrefix) {
    TupleTag<WriteBundlesToFiles.Result<DestinationT>> writtenFilesTag =
        new TupleTag<WriteBundlesToFiles.Result<DestinationT>>("writtenFiles") {};
    TupleTag<KV<ShardedKey<DestinationT>, ElementT>> unwrittedRecordsTag =
        new TupleTag<KV<ShardedKey<DestinationT>, ElementT>>("unwrittenRecords") {};
    PCollectionTuple writeBundlesTuple =
        input.apply(
            "WriteBundlesToFiles",
            ParDo.of(
                    new WriteBundlesToFiles<>(
                        tempFilePrefix,
                        unwrittedRecordsTag,
                        maxNumWritersPerBundle,
                        maxFileSize,
                        toRowFunction))
                .withSideInputs(tempFilePrefix)
                .withOutputTags(writtenFilesTag, TupleTagList.of(unwrittedRecordsTag)));
    PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFiles =
        writeBundlesTuple
            .get(writtenFilesTag)
            .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
    PCollection<KV<ShardedKey<DestinationT>, ElementT>> unwrittenRecords =
        writeBundlesTuple
            .get(unwrittedRecordsTag)
            .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), elementCoder));

    // If the bundles contain too many output tables to be written inline to files (due to memory
    // limits), any unwritten records will be spilled to the unwrittenRecordsTag PCollection.
    // Group these records by key, and write the files after grouping. Since the record is grouped
    // by key, we can ensure that only one file is open at a time in each bundle.
    PCollection<WriteBundlesToFiles.Result<DestinationT>> writtenFilesGrouped =
        writeShardedRecords(unwrittenRecords, tempFilePrefix);

    // PCollection of filename, file byte size, and table destination.
    return PCollectionList.of(writtenFiles)
        .and(writtenFilesGrouped)
        .apply("FlattenFiles", Flatten.pCollections())
        .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
  }

  // Writes input data to statically-sharded files. Returns a PCollection of filename,
  // file byte size, and table destination.
  PCollection<WriteBundlesToFiles.Result<DestinationT>> writeShardedFiles(
      PCollection<KV<DestinationT, ElementT>> input, PCollectionView<String> tempFilePrefix) {
    checkState(numFileShards > 0);
    PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords =
        input
            .apply(
                "AddShard",
                ParDo.of(
                    new DoFn<KV<DestinationT, ElementT>, KV<ShardedKey<DestinationT>, ElementT>>() {
                      int shardNumber;

                      @Setup
                      public void setup() {
                        shardNumber = ThreadLocalRandom.current().nextInt(numFileShards);
                      }

                      @ProcessElement
                      public void processElement(
                          @Element KV<DestinationT, ElementT> element,
                          OutputReceiver<KV<ShardedKey<DestinationT>, ElementT>> o) {
                        DestinationT destination = element.getKey();
                        o.output(
                            KV.of(
                                ShardedKey.of(destination, ++shardNumber % numFileShards),
                                element.getValue()));
                      }
                    }))
            .setCoder(KvCoder.of(ShardedKeyCoder.of(destinationCoder), elementCoder));

    return writeShardedRecords(shardedRecords, tempFilePrefix);
  }

  private PCollection<Result<DestinationT>> writeShardedRecords(
      PCollection<KV<ShardedKey<DestinationT>, ElementT>> shardedRecords,
      PCollectionView<String> tempFilePrefix) {
    return shardedRecords
        .apply("GroupByDestination", GroupByKey.create())
        .apply(
            "WriteGroupedRecords",
            ParDo.of(
                    new WriteGroupedRecordsToFiles<DestinationT, ElementT>(
                        tempFilePrefix, maxFileSize, toRowFunction))
                .withSideInputs(tempFilePrefix))
        .setCoder(WriteBundlesToFiles.ResultCoder.of(destinationCoder));
  }

  // Take in a list of files and write them to temporary tables.
  private PCollection<KV<TableDestination, String>> writeTempTables(
      PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
      PCollectionView<String> jobIdTokenView) {
    List<PCollectionView<?>> sideInputs = Lists.newArrayList(jobIdTokenView);
    sideInputs.addAll(dynamicDestinations.getSideInputs());

    Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder =
        KvCoder.of(
            ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
            ListCoder.of(StringUtf8Coder.of()));

    // If WriteBundlesToFiles produced more than DEFAULT_MAX_FILES_PER_PARTITION files or
    // DEFAULT_MAX_BYTES_PER_PARTITION bytes, then
    // the import needs to be split into multiple partitions, and those partitions will be
    // specified in multiPartitionsTag.
    return input
        .setCoder(partitionsCoder)
        // Reshuffle will distribute this among multiple workers, and also guard against
        // reexecution of the WritePartitions step once WriteTables has begun.
        .apply("MultiPartitionsReshuffle", Reshuffle.of())
        .apply(
            "MultiPartitionsWriteTables",
            new WriteTables<>(
                true,
                bigQueryServices,
                jobIdTokenView,
                WriteDisposition.WRITE_EMPTY,
                CreateDisposition.CREATE_IF_NEEDED,
                sideInputs,
                dynamicDestinations,
                loadJobProjectId,
                maxRetryJobs,
                ignoreUnknownValues,
                kmsKey));
  }

  // In the case where the files fit into a single load job, there's no need to write temporary
  // tables and rename. We can load these files directly into the target BigQuery table.
  void writeSinglePartition(
      PCollection<KV<ShardedKey<DestinationT>, List<String>>> input,
      PCollectionView<String> loadJobIdPrefixView) {
    List<PCollectionView<?>> sideInputs = Lists.newArrayList(loadJobIdPrefixView);
    sideInputs.addAll(dynamicDestinations.getSideInputs());
    Coder<KV<ShardedKey<DestinationT>, List<String>>> partitionsCoder =
        KvCoder.of(
            ShardedKeyCoder.of(NullableCoder.of(destinationCoder)),
            ListCoder.of(StringUtf8Coder.of()));
    // Write single partition to final table
    input
        .setCoder(partitionsCoder)
        // Reshuffle will distribute this among multiple workers, and also guard against
        // reexecution of the WritePartitions step once WriteTables has begun.
        .apply("SinglePartitionsReshuffle", Reshuffle.of())
        .apply(
            "SinglePartitionWriteTables",
            new WriteTables<>(
                false,
                bigQueryServices,
                loadJobIdPrefixView,
                writeDisposition,
                createDisposition,
                sideInputs,
                dynamicDestinations,
                loadJobProjectId,
                maxRetryJobs,
                ignoreUnknownValues,
                kmsKey));
  }

  private WriteResult writeResult(Pipeline p) {
    PCollection<TableRow> empty =
        p.apply("CreateEmptyFailedInserts", Create.empty(TypeDescriptor.of(TableRow.class)));
    return WriteResult.in(p, new TupleTag<>("failedInserts"), empty);
  }

  @Override
  public WriteResult expand(PCollection<KV<DestinationT, ElementT>> input) {
    return (triggeringFrequency != null) ? expandTriggered(input) : expandUntriggered(input);
  }
}
