/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.createTempTableReference;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableCell;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadStream;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableRef;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableRefToJson;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSchemaToJsonSchema;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TableSpecToTableRef;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.TimePartitioningToJson;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryResourceNaming.JobType;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
import org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase.ExtractResult;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantSchemaDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.ConstantTimePartitioningDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.SchemaFromViewDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinationsHelpers.TableFunctionDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.CleanupOperation;
import org.apache.beam.sdk.io.gcp.bigquery.PassThroughThenCleanup.ContextContainer;
import org.apache.beam.sdk.io.gcp.bigquery.RowWriterFactory.OutputType;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.ProjectionProducer;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* {@link PTransform}s for reading and writing <a
* href="https://developers.google.com/bigquery/">BigQuery</a> tables.
*
* <h3>Table References</h3>
*
* <p>A fully-qualified BigQuery table name consists of three components:
*
* <ul>
* <li>{@code projectId}: the Cloud project id (defaults to {@link GcpOptions#getProject()}).
* <li>{@code datasetId}: the BigQuery dataset id, unique within a project.
* <li>{@code tableId}: a table id, unique within a dataset.
* </ul>
*
* <p>BigQuery table references are stored as a {@link TableReference}, which comes from the <a
* href="https://cloud.google.com/bigquery/client-libraries">BigQuery Java Client API</a>. Tables
* can be referred to as Strings, with or without the {@code projectId}. A helper function is
* provided ({@link BigQueryHelpers#parseTableSpec(String)}) that parses the following string forms
* into a {@link TableReference}:
*
* <ul>
* <li>[{@code project_id}]:[{@code dataset_id}].[{@code table_id}]
* <li>[{@code dataset_id}].[{@code table_id}]
* </ul>
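*
* <p>For example, a minimal sketch (the project, dataset, and table names are placeholders):
*
* <pre>{@code
* TableReference ref = BigQueryHelpers.parseTableSpec("my-project:my_dataset.my_table");
* // ref.getProjectId() is "my-project", ref.getDatasetId() is "my_dataset",
* // and ref.getTableId() is "my_table".
*
* // Without a project component, the project id is left unset and the default project
* // from the pipeline options is used at execution time.
* TableReference sameProject = BigQueryHelpers.parseTableSpec("my_dataset.my_table");
* }</pre>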
*
* <h3>BigQuery Concepts</h3>
*
* <p>Tables have rows ({@link TableRow}) and each row has cells ({@link TableCell}). A table has a
* schema ({@link TableSchema}), which in turn describes the schema of each cell ({@link
* TableFieldSchema}). The terms field and cell are used interchangeably.
*
* <p>{@link TableSchema}: describes the schema (types and order) for values in each row. It has one
* attribute, 'fields', which is a list of {@link TableFieldSchema} objects.
*
* <p>{@link TableFieldSchema}: describes the schema (type, name) for one field. It has several
* attributes, including 'name' and 'type'. Common values for the type attribute are: 'STRING',
* 'INTEGER', 'FLOAT', 'BOOLEAN', 'NUMERIC', 'GEOGRAPHY'. All possible values are described at: <a
* href="https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types">
* https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types</a>
*
* <p>{@link TableRow}: Holds all values in a table row. Has one attribute, 'f', which is a list of
* {@link TableCell} instances.
*
* <p>{@link TableCell}: Holds the value for one cell (or field). Has one attribute, 'v', which is
* the value of the table cell.
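*
* <p>For example, a schema with two fields and a matching row could be constructed as follows (a
* minimal sketch; the field names are illustrative):
*
* <pre>{@code
* TableSchema schema = new TableSchema().setFields(
*     ImmutableList.of(
*         new TableFieldSchema().setName("user_id").setType("INTEGER"),
*         new TableFieldSchema().setName("url").setType("STRING")));
*
* TableRow row = new TableRow().set("user_id", 42L).set("url", "https://example.com");
* }</pre>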
*
* <p>As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports high-precision
* decimal numbers (precision of 38 digits, scale of 9 digits). The GEOGRAPHY data type works with
* Well-Known Text (See <a href="https://en.wikipedia.org/wiki/Well-known_text">
* https://en.wikipedia.org/wiki/Well-known_text</a>) format for reading and writing to BigQuery.
* BigQuery IO requires values of BYTES datatype to be encoded using base64 encoding when writing to
* BigQuery. When bytes are read from BigQuery they are returned as base64-encoded strings.
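*
* <p>For example, a BYTES column could be written and read back as follows (a sketch; the column
* name is illustrative and the row is assumed to come from {@link #readTableRows()}):
*
* <pre>{@code
* // Writing: base64-encode the raw bytes before setting them on the TableRow.
* byte[] rawBytes = ...;
* TableRow row = new TableRow()
*     .set("payload", java.util.Base64.getEncoder().encodeToString(rawBytes));
*
* // Reading: decode the base64 string returned by BigQuery.
* byte[] decoded = java.util.Base64.getDecoder().decode((String) row.get("payload"));
* }</pre>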
*
* <h3>Reading</h3>
*
* <p>Reading from BigQuery is supported by {@link #read(SerializableFunction)}, which parses
* records in <a href="https://cloud.google.com/bigquery/data-formats#avro_format">AVRO format</a>
* into a custom type (see the table below for type conversion) using a specified parse function,
* and by {@link #readTableRows}, which parses them into {@link TableRow}; this may be more
* convenient but has lower performance.
*
* <p>Both functions support reading either from a table or from the result of a query, via {@link
* TypedRead#from(String)} and {@link TypedRead#fromQuery} respectively. Exactly one of these must
* be specified.
*
* <p>If you are reading from an authorized view with {@link TypedRead#fromQuery}, you need to use
* {@link TypedRead#withQueryLocation(String)} to set the location of the BigQuery job. Otherwise,
* Beam will try to determine that location by reading the metadata of the dataset that contains the
* underlying tables. With authorized views, that will result in a 403 error and the query will not
* be resolved.
*
* <p><b>Type Conversion Table</b>
*
* <table border="1" cellspacing="1">
* <tr>
* <td> <b>BigQuery standard SQL type</b> </td> <td> <b>Avro type</b> </td> <td> <b>Java type</b> </td>
* </tr>
* <tr>
* <td> BOOLEAN </td> <td> boolean </td> <td> Boolean </td>
* </tr>
* <tr>
* <td> INT64 </td> <td> long </td> <td> Long </td>
* </tr>
* <tr>
* <td> FLOAT64 </td> <td> double </td> <td> Double </td>
* </tr>
* <tr>
* <td> BYTES </td> <td> bytes </td> <td> java.nio.ByteBuffer </td>
* </tr>
* <tr>
* <td> STRING </td> <td> string </td> <td> CharSequence </td>
* </tr>
* <tr>
* <td> DATE </td> <td> int </td> <td> Integer </td>
* </tr>
* <tr>
* <td> DATETIME </td> <td> string </td> <td> CharSequence </td>
* </tr>
* <tr>
* <td> TIMESTAMP </td> <td> long </td> <td> Long </td>
* </tr>
* <tr>
* <td> TIME </td> <td> long </td> <td> Long </td>
* </tr>
* <tr>
* <td> NUMERIC </td> <td> bytes </td> <td> java.nio.ByteBuffer </td>
* </tr>
* <tr>
* <td> GEOGRAPHY </td> <td> string </td> <td> CharSequence </td>
* </tr>
* <tr>
* <td> ARRAY </td> <td> array </td> <td> java.util.Collection </td>
* </tr>
* <tr>
* <td> STRUCT </td> <td> record </td> <td> org.apache.avro.generic.GenericRecord </td>
* </tr>
* </table>
*
* <p><b>Example: Reading rows of a table as {@link TableRow}.</b>
*
* <pre>{@code
* PCollection<TableRow> weatherData = pipeline.apply(
* BigQueryIO.readTableRows().from("clouddataflow-readonly:samples.weather_stations"));
* }</pre>
*
* <b>Example: Reading rows of a table and parsing them into a custom type.</b>
*
* <pre>{@code
* PCollection<WeatherRecord> weatherData = pipeline.apply(
* BigQueryIO
* .read(new SerializableFunction<SchemaAndRecord, WeatherRecord>() {
* public WeatherRecord apply(SchemaAndRecord schemaAndRecord) {
* return new WeatherRecord(...);
* }
* })
* .from("clouddataflow-readonly:samples.weather_stations"))
* .withCoder(SerializableCoder.of(WeatherRecord.class));
* }</pre>
*
* <p>Note: When using {@link #read(SerializableFunction)}, you may sometimes need to use {@link
* TypedRead#withCoder(Coder)} to specify a {@link Coder} for the result type, if Beam fails to
* infer it automatically.
*
* <p><b>Example: Reading results of a query as {@link TableRow}.</b>
*
* <pre>{@code
* PCollection<TableRow> meanTemperatureData = pipeline.apply(BigQueryIO.readTableRows()
* .fromQuery("SELECT year, mean_temp FROM [samples.weather_stations]"));
* }</pre>
*
* <p>Users can optionally specify a query priority using {@link
* TypedRead#withQueryPriority(TypedRead.QueryPriority)} and a geographic location where the query
* will be executed using {@link TypedRead#withQueryLocation(String)}. Query location must be
* specified for jobs that are not executed in US or EU, or if you are reading from an authorized
* view. See <a href="https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query">BigQuery
* Jobs: query</a>.
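*
* <p>For example (a sketch; the query, priority, and location are illustrative):
*
* <pre>{@code
* PCollection<TableRow> data = pipeline.apply(BigQueryIO.readTableRows()
*     .fromQuery("SELECT year, mean_temp FROM `samples.weather_stations`")
*     .usingStandardSql()
*     .withQueryPriority(BigQueryIO.TypedRead.QueryPriority.BATCH)
*     .withQueryLocation("asia-northeast1"));
* }</pre>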
*
* <h3>Writing</h3>
*
* <p>To write to a BigQuery table, apply a {@link BigQueryIO.Write} transformation. This consumes a
* {@link PCollection} of a user-defined type when using {@link BigQueryIO#write()} (recommended),
* or a {@link PCollection} of {@link TableRow TableRows} as input when using {@link
* BigQueryIO#writeTableRows()} (not recommended). When using a user-defined type, one of the
* following must be provided.
*
* <ul>
* <li>{@link BigQueryIO.Write#withAvroFormatFunction(SerializableFunction)} (recommended) to
* write data using avro records.
* <li>{@link BigQueryIO.Write#withAvroWriter} to write avro data using a user-specified {@link
* DatumWriter} (and format function).
* <li>{@link BigQueryIO.Write#withFormatFunction(SerializableFunction)} to write data as json
* encoded {@link TableRow TableRows}.
* </ul>
*
* If {@link BigQueryIO.Write#withAvroFormatFunction(SerializableFunction)} or {@link
* BigQueryIO.Write#withAvroWriter} is used, the table schema MUST be specified using one of the
* {@link Write#withJsonSchema(String)}, {@link Write#withJsonSchema(ValueProvider)}, {@link
* Write#withSchemaFromView(PCollectionView)} methods, or {@link Write#to(DynamicDestinations)}.
*
* <pre>{@code
* class Quote {
* final Instant timestamp;
* final String exchange;
* final String symbol;
* final double price;
*
* Quote(Instant timestamp, String exchange, String symbol, double price) {
* // initialize all member variables.
* }
* }
*
* PCollection<Quote> quotes = ...
*
* quotes.apply(BigQueryIO
* .<Quote>write()
* .to("my-project:my_dataset.my_table")
* .withSchema(new TableSchema().setFields(
* ImmutableList.of(
* new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"),
* new TableFieldSchema().setName("exchange").setType("STRING"),
* new TableFieldSchema().setName("symbol").setType("STRING"),
* new TableFieldSchema().setName("price").setType("FLOAT"))))
* .withFormatFunction(quote -> new TableRow().set(..set the columns..))
* .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
* }</pre>
*
* <p>See {@link BigQueryIO.Write} for details on how to specify if a write should append to an
* existing table, replace the table, or verify that the table is empty. Note that the dataset being
* written to must already exist. Unbounded PCollections can only be written using {@link
* Write.WriteDisposition#WRITE_EMPTY} or {@link Write.WriteDisposition#WRITE_APPEND}.
*
* <p>BigQueryIO supports automatically inferring the BigQuery table schema from the Beam schema on
* the input PCollection. Beam can also automatically format the input into a TableRow in this case,
* if no format function is provided. In the above example, the quotes PCollection has a schema that
* Beam infers from the Quote POJO. So the write could be done more simply as follows:
*
* <pre>{@code
* {@literal @}DefaultSchema(JavaFieldSchema.class)
* class Quote {
* final Instant timestamp;
* final String exchange;
* final String symbol;
* final double price;
*
* {@literal @}SchemaCreate
* Quote(Instant timestamp, String exchange, String symbol, double price) {
* // initialize all member variables.
* }
* }
*
* PCollection<Quote> quotes = ...
*
* quotes.apply(BigQueryIO
* .<Quote>write()
* .to("my-project:my_dataset.my_table")
* .useBeamSchema()
* .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
* }</pre>
*
* <h3>Loading historical data into time-partitioned BigQuery tables</h3>
*
* <p>To load historical data into a time-partitioned BigQuery table, specify {@link
* BigQueryIO.Write#withTimePartitioning} with a {@link TimePartitioning#setField(String) field}
* used for <a
* href="https://cloud.google.com/bigquery/docs/partitioned-tables#partitioned_tables">column-based
* partitioning</a>. For example:
*
* <pre>{@code
* PCollection<Quote> quotes = ...;
*
* quotes.apply(BigQueryIO.write()
* .withSchema(schema)
* .withFormatFunction(quote -> new TableRow()
* .set("timestamp", quote.getTimestamp())
* .set(..other columns..))
* .to("my-project:my_dataset.my_table")
* .withTimePartitioning(new TimePartitioning().setField("time")));
* }</pre>
*
* <h3>Writing different values to different tables</h3>
*
* <p>A common use case is to dynamically generate BigQuery table names based on the current value.
* To support this, {@link BigQueryIO.Write#to(SerializableFunction)} accepts a function mapping the
* current element to a tablespec. For example, here's code that outputs quotes of different stocks
* to different tables:
*
* <pre>{@code
* PCollection<Quote> quotes = ...;
*
* quotes.apply(BigQueryIO.write()
* .withSchema(schema)
* .withFormatFunction(quote -> new TableRow()...)
* .to((ValueInSingleWindow<Quote> quote) -> {
* String symbol = quote.getValue().getSymbol();
* return new TableDestination(
* "my-project:my_dataset.quotes_" + symbol, // Table spec
* "Quotes of stock " + symbol // Table description
* );
* }));
* }</pre>
*
* <p>Per-table schemas can also be provided using {@link BigQueryIO.Write#withSchemaFromView}. This
* allows the schemas to be calculated based on a previous pipeline stage or statically via a
* {@link org.apache.beam.sdk.transforms.Create} transform. This method expects to receive a
* map-valued {@link PCollectionView}, mapping table specifications (project:dataset.table-id) to
* JSON-formatted {@link TableSchema} objects. All destination tables must be present in this map,
* or the pipeline will fail to create tables. Care should be taken if the map value is based on a
* triggered aggregation over an unbounded {@link PCollection}; the side input will contain the
* entire history of all table schemas ever generated, which might blow up memory usage. This method
* can also be useful when writing to a single table, as it allows a previous stage to calculate the
* schema (possibly based on the full collection of records being written to BigQuery).
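*
* <p>For example, a minimal sketch (the table specs and the JSON schema strings {@code schemaAJson}
* and {@code schemaBJson} are placeholders):
*
* <pre>{@code
* PCollectionView<Map<String, String>> schemaView = pipeline
*     .apply("SchemaEntries", Create.of(
*         KV.of("my-project:my_dataset.table_a", schemaAJson),
*         KV.of("my-project:my_dataset.table_b", schemaBJson)))
*     .apply(View.asMap());
*
* quotes.apply(BigQueryIO.<Quote>write()
*     .withFormatFunction(quote -> new TableRow()...)
*     .to(...)
*     .withSchemaFromView(schemaView));
* }</pre>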
*
* <p>For the most general form of dynamic table destinations and schemas, look at {@link
* BigQueryIO.Write#to(DynamicDestinations)}.
*
* <h3>Insertion Method</h3>
*
* {@link BigQueryIO.Write} supports two methods of inserting data into BigQuery specified using
* {@link BigQueryIO.Write#withMethod}. If no method is supplied, then a default method will be
* chosen based on the input PCollection. See {@link BigQueryIO.Write.Method} for more information
* about the methods. The different insertion methods provide different tradeoffs of cost, quota,
* and data consistency; please see BigQuery documentation for more information about these
* tradeoffs.
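*
* <p>For example, to force batch loads regardless of the input (a sketch; the table name is a
* placeholder):
*
* <pre>{@code
* quotes.apply(BigQueryIO.<Quote>write()
*     .to("my-project:my_dataset.my_table")
*     .withSchema(schema)
*     .withFormatFunction(quote -> new TableRow()...)
*     .withMethod(BigQueryIO.Write.Method.FILE_LOADS));
* }</pre>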
*
* <h3>Usage with templates</h3>
*
* <p>When using {@link #read} or {@link #readTableRows()} in a template, it's required to specify
* {@link Read#withTemplateCompatibility()}. Specifying this in a non-template pipeline is not
* recommended because it has somewhat lower performance.
*
* <p>When using {@link #write()} or {@link #writeTableRows()} with batch loads in a template, it is
* recommended to specify {@link Write#withCustomGcsTempLocation}. Writing to BigQuery via batch
* loads involves writing temporary files to this location, so the location must be accessible at
* pipeline execution time. By default, this location is captured at pipeline <i>construction</i>
* time, and may be inaccessible if the template is reused from a different project or at a moment
* when the original location no longer exists. {@link
* Write#withCustomGcsTempLocation(ValueProvider)} allows specifying the location as an argument to
* the template invocation.
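*
* <p>For example (a sketch, assuming a user-defined options interface whose getters return {@code
* ValueProvider} values supplied at template invocation time):
*
* <pre>{@code
* // Reading in a template:
* pipeline.apply(BigQueryIO.readTableRows()
*     .withTemplateCompatibility()
*     .from(options.getInputTable()));
*
* // Writing with batch loads in a template:
* quotes.apply(BigQueryIO.<Quote>write()
*     .to(options.getOutputTable())
*     .withSchema(schema)
*     .withFormatFunction(...)
*     .withCustomGcsTempLocation(options.getGcsTempLocation()));
* }</pre>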
*
* <h3>Permissions</h3>
*
* <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the
* pipeline. Please refer to the documentation of corresponding {@link PipelineRunner}s for more
* details.
*
* <p>Please see <a href="https://cloud.google.com/bigquery/access-control">BigQuery Access Control
* </a> for security and permission related information specific to BigQuery.
*
* <h3>Updates to the I/O connector code</h3>
*
* For any significant updates to this I/O connector, please consider involving corresponding code
* reviewers mentioned <a
* href="https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/OWNERS">
* here</a>.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20506)
})
public class BigQueryIO {
/**
* Template for BigQuery jobs created by BigQueryIO. This template is: {@code
* "beam_bq_job_{TYPE}_{JOB_ID}_{STEP}_{RANDOM}"}, where:
*
* <ul>
* <li>{@code TYPE} represents the BigQuery job type (e.g. extract / copy / load / query)
* <li>{@code JOB_ID} is the Beam job name.
* <li>{@code STEP} is a UUID representing the Dataflow step that created the BQ job.
* <li>{@code RANDOM} is a random string.
* </ul>
*
* <p><b>NOTE:</b> This job name template does not have backwards compatibility guarantees.
*/
public static final String BIGQUERY_JOB_TEMPLATE = "beam_bq_job_{TYPE}_{JOB_ID}_{STEP}_{RANDOM}";
private static final Logger LOG = LoggerFactory.getLogger(BigQueryIO.class);
/** Singleton instance of the JSON factory used to read and write JSON formatted rows. */
static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();
/**
* Project IDs must contain 6-63 lowercase letters, digits, or dashes. IDs must start with a
* letter and may not end with a dash. This regex isn't exact: it allows patterns that
* would be rejected by the service, but it is sufficient for basic parsing of table references.
*/
private static final String PROJECT_ID_REGEXP = "[a-z][-a-z0-9:.]{4,61}[a-z0-9]";
/** Regular expression that matches Dataset IDs. */
private static final String DATASET_REGEXP = "[-\\w.]{1,1024}";
/** Regular expression that matches Table IDs. */
private static final String TABLE_REGEXP = "[-\\w$@]{1,1024}";
/**
* Matches table specifications in the form {@code "[project_id]:[dataset_id].[table_id]"} or
* {@code "[dataset_id].[table_id]"}.
*/
private static final String DATASET_TABLE_REGEXP =
String.format(
"((?<PROJECT>%s)[:\\.])?(?<DATASET>%s)\\.(?<TABLE>%s)",
PROJECT_ID_REGEXP, DATASET_REGEXP, TABLE_REGEXP);
static final Pattern TABLE_SPEC = Pattern.compile(DATASET_TABLE_REGEXP);
/**
* Matches table specifications in the form {@code "projects/[project_id]/datasets/[dataset_id]/tables[table_id]".
*/
private static final String TABLE_URN_REGEXP =
String.format(
"projects/(?<PROJECT>%s)/datasets/(?<DATASET>%s)/tables/(?<TABLE>%s)",
PROJECT_ID_REGEXP, DATASET_REGEXP, TABLE_REGEXP);
static final Pattern TABLE_URN_SPEC = Pattern.compile(TABLE_URN_REGEXP);
/**
* A formatting function that maps a TableRow to itself. This allows sending a {@code
* PCollection<TableRow>} directly to BigQueryIO.Write.
*/
static final SerializableFunction<TableRow, TableRow> IDENTITY_FORMATTER = input -> input;
static final SerializableFunction<org.apache.avro.Schema, DatumWriter<GenericRecord>>
GENERIC_DATUM_WRITER_FACTORY = schema -> new GenericDatumWriter<>();
private static final SerializableFunction<TableSchema, org.apache.avro.Schema>
DEFAULT_AVRO_SCHEMA_FACTORY =
(SerializableFunction<TableSchema, org.apache.avro.Schema>)
input -> BigQueryAvroUtils.toGenericAvroSchema("root", input.getFields());
/**
* @deprecated Use {@link #read(SerializableFunction)} or {@link #readTableRows} instead. {@link
* #readTableRows()} does exactly the same as {@link #read}; however, {@link
* #read(SerializableFunction)} performs better.
*/
@Deprecated
public static Read read() {
return new Read();
}
/**
* Like {@link #read(SerializableFunction)} but represents each row as a {@link TableRow}.
*
* <p>This method is more convenient to use in some cases, but usually has significantly lower
* performance than using {@link #read(SerializableFunction)} directly to parse data into a
* domain-specific type, due to the overhead of converting the rows to {@link TableRow}.
*/
public static TypedRead<TableRow> readTableRows() {
return read(new TableRowParser()).withCoder(TableRowJsonCoder.of());
}
/** Like {@link #readTableRows()} but with {@link Schema} support. */
public static TypedRead<TableRow> readTableRowsWithSchema() {
return read(new TableRowParser())
.withCoder(TableRowJsonCoder.of())
.withBeamRowConverters(
TypeDescriptor.of(TableRow.class),
BigQueryUtils.tableRowToBeamRow(),
BigQueryUtils.tableRowFromBeamRow());
}
/**
* Reads from a BigQuery table or query and returns a {@link PCollection} with one element per
* row of the table or query result, parsed from the BigQuery AVRO format using the specified
* function.
*
* <p>Each {@link SchemaAndRecord} contains a BigQuery {@link TableSchema} and a {@link
* GenericRecord} representing the row, indexed by column name. Here is a sample parse function
* that parses click events from a table.
*
* <pre>{@code
* class ClickEvent { long userId; String url; ... }
*
* p.apply(BigQueryIO.read(new SerializableFunction<SchemaAndRecord, ClickEvent>() {
* public ClickEvent apply(SchemaAndRecord record) {
* GenericRecord r = record.getRecord();
* return new ClickEvent((Long) r.get("userId"), (String) r.get("url"));
* }
* }).from("..."));
* }</pre>
*/
public static <T> TypedRead<T> read(SerializableFunction<SchemaAndRecord, T> parseFn) {
return new AutoValue_BigQueryIO_TypedRead.Builder<T>()
.setValidate(true)
.setWithTemplateCompatibility(false)
.setBigQueryServices(new BigQueryServicesImpl())
.setParseFn(parseFn)
.setMethod(TypedRead.Method.DEFAULT)
.setUseAvroLogicalTypes(false)
.setFormat(DataFormat.AVRO)
.setProjectionPushdownApplied(false)
.build();
}
@VisibleForTesting
static class TableRowParser implements SerializableFunction<SchemaAndRecord, TableRow> {
public static final TableRowParser INSTANCE = new TableRowParser();
@Override
public TableRow apply(SchemaAndRecord schemaAndRecord) {
return BigQueryAvroUtils.convertGenericRecordToTableRow(
schemaAndRecord.getRecord(), schemaAndRecord.getTableSchema());
}
}
/** Implementation of {@link BigQueryIO#read()}. */
public static class Read extends PTransform<PBegin, PCollection<TableRow>> {
private final TypedRead<TableRow> inner;
Read() {
this(BigQueryIO.read(TableRowParser.INSTANCE).withCoder(TableRowJsonCoder.of()));
}
Read(TypedRead<TableRow> inner) {
this.inner = inner;
}
@Override
public PCollection<TableRow> expand(PBegin input) {
return input.apply(inner);
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
this.inner.populateDisplayData(builder);
}
boolean getValidate() {
return this.inner.getValidate();
}
ValueProvider<String> getQuery() {
return this.inner.getQuery();
}
public Read withTestServices(BigQueryServices testServices) {
return new Read(this.inner.withTestServices(testServices));
}
///////////////////////////////////////////////////////////////////
/** Returns the table to read, or {@code null} if reading from a query instead. */
public @Nullable ValueProvider<TableReference> getTableProvider() {
return this.inner.getTableProvider();
}
/** Returns the table to read, or {@code null} if reading from a query instead. */
public @Nullable TableReference getTable() {
return this.inner.getTable();
}
/**
* Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or {@code
* "[dataset_id].[table_id]"} for tables within the current project.
*/
public Read from(String tableSpec) {
return new Read(this.inner.from(tableSpec));
}
/** Same as {@code from(String)}, but with a {@link ValueProvider}. */
public Read from(ValueProvider<String> tableSpec) {
return new Read(this.inner.from(tableSpec));
}
/** Read from table specified by a {@link TableReference}. */
public Read from(TableReference table) {
return new Read(this.inner.from(table));
}
/**
* Reads results received after executing the given query.
*
* <p>By default, the query results will be flattened -- see "flattenResults" in the <a
* href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">Jobs documentation</a> for
* more information. To disable flattening, use {@link Read#withoutResultFlattening}.
*
* <p>By default, the query will use BigQuery's legacy SQL dialect. To use the BigQuery Standard
* SQL dialect, use {@link Read#usingStandardSql}.
*/
public Read fromQuery(String query) {
return new Read(this.inner.fromQuery(query));
}
/** Same as {@code fromQuery(String)}, but with a {@link ValueProvider}. */
public Read fromQuery(ValueProvider<String> query) {
return new Read(this.inner.fromQuery(query));
}
/**
* Disable validation that the table exists or the query succeeds prior to pipeline submission.
* Basic validation (such as ensuring that a query or table is specified) still occurs.
*/
public Read withoutValidation() {
return new Read(this.inner.withoutValidation());
}
/**
* Disable <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">flattening of
* query results</a>.
*
* <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading
* from a table will cause an error during validation.
*/
public Read withoutResultFlattening() {
return new Read(this.inner.withoutResultFlattening());
}
/**
* Enables BigQuery's Standard SQL dialect when reading from a query.
*
* <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading
* from a table will cause an error during validation.
*/
public Read usingStandardSql() {
return new Read(this.inner.usingStandardSql());
}
/**
* Use new template-compatible source implementation.
*
* <p>This implementation is compatible with repeated template invocations. It does not support
* dynamic work rebalancing.
*/
public Read withTemplateCompatibility() {
return new Read(this.inner.withTemplateCompatibility());
}
}
/////////////////////////////////////////////////////////////////////////////
/** Implementation of {@link BigQueryIO#read(SerializableFunction)}. */
@AutoValue
public abstract static class TypedRead<T> extends PTransform<PBegin, PCollection<T>>
implements ProjectionProducer<PTransform<PBegin, PCollection<T>>> {
/** Determines the method used to read data from BigQuery. */
public enum Method {
/** The default behavior if no method is explicitly set. Currently {@link #EXPORT}. */
DEFAULT,
/**
* Export data to Google Cloud Storage in Avro format and read data files from that location.
*/
EXPORT,
/** Read the contents of a table directly using the BigQuery storage API. */
DIRECT_READ,
}
interface ToBeamRowFunction<T>
extends SerializableFunction<Schema, SerializableFunction<T, Row>> {}
interface FromBeamRowFunction<T>
extends SerializableFunction<Schema, SerializableFunction<Row, T>> {}
abstract Builder<T> toBuilder();
@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setJsonTableRef(ValueProvider<String> jsonTableRef);
abstract Builder<T> setQuery(ValueProvider<String> query);
abstract Builder<T> setValidate(boolean validate);
abstract Builder<T> setFlattenResults(Boolean flattenResults);
abstract Builder<T> setUseLegacySql(Boolean useLegacySql);
abstract Builder<T> setWithTemplateCompatibility(Boolean useTemplateCompatibility);
abstract Builder<T> setBigQueryServices(BigQueryServices bigQueryServices);
abstract Builder<T> setQueryPriority(QueryPriority priority);
abstract Builder<T> setQueryLocation(String location);
abstract Builder<T> setQueryTempDataset(String queryTempDataset);
abstract Builder<T> setMethod(TypedRead.Method method);
@Experimental(Experimental.Kind.SOURCE_SINK)
abstract Builder<T> setFormat(DataFormat format);
abstract Builder<T> setSelectedFields(ValueProvider<List<String>> selectedFields);
abstract Builder<T> setRowRestriction(ValueProvider<String> rowRestriction);
abstract TypedRead<T> build();
abstract Builder<T> setParseFn(SerializableFunction<SchemaAndRecord, T> parseFn);
abstract Builder<T> setCoder(Coder<T> coder);
abstract Builder<T> setKmsKey(String kmsKey);
@Experimental(Kind.SCHEMAS)
abstract Builder<T> setTypeDescriptor(TypeDescriptor<T> typeDescriptor);
@Experimental(Kind.SCHEMAS)
abstract Builder<T> setToBeamRowFn(ToBeamRowFunction<T> toRowFn);
@Experimental(Kind.SCHEMAS)
abstract Builder<T> setFromBeamRowFn(FromBeamRowFunction<T> fromRowFn);
abstract Builder<T> setUseAvroLogicalTypes(Boolean useAvroLogicalTypes);
abstract Builder<T> setProjectionPushdownApplied(boolean projectionPushdownApplied);
}
abstract @Nullable ValueProvider<String> getJsonTableRef();
abstract @Nullable ValueProvider<String> getQuery();
abstract boolean getValidate();
abstract @Nullable Boolean getFlattenResults();
abstract @Nullable Boolean getUseLegacySql();
abstract Boolean getWithTemplateCompatibility();
abstract BigQueryServices getBigQueryServices();
abstract SerializableFunction<SchemaAndRecord, T> getParseFn();
abstract @Nullable QueryPriority getQueryPriority();
abstract @Nullable String getQueryLocation();
abstract @Nullable String getQueryTempDataset();
abstract TypedRead.Method getMethod();
@Experimental(Experimental.Kind.SOURCE_SINK)
abstract DataFormat getFormat();
abstract @Nullable ValueProvider<List<String>> getSelectedFields();
abstract @Nullable ValueProvider<String> getRowRestriction();
abstract @Nullable Coder<T> getCoder();
abstract @Nullable String getKmsKey();
@Experimental(Kind.SCHEMAS)
abstract @Nullable TypeDescriptor<T> getTypeDescriptor();
@Experimental(Kind.SCHEMAS)
abstract @Nullable ToBeamRowFunction<T> getToBeamRowFn();
@Experimental(Kind.SCHEMAS)
abstract @Nullable FromBeamRowFunction<T> getFromBeamRowFn();
abstract Boolean getUseAvroLogicalTypes();
abstract boolean getProjectionPushdownApplied();
/**
* An enumeration type for the priority of a query.
*
* @see <a href="https://cloud.google.com/bigquery/docs/running-queries">Running Interactive and
* Batch Queries in the BigQuery documentation</a>
*/
public enum QueryPriority {
/**
* Specifies that a query should be run with an INTERACTIVE priority.
*
* <p>Interactive mode allows for BigQuery to execute the query as soon as possible. These
* queries count towards your concurrent rate limit and your daily limit.
*/
INTERACTIVE,
/**
* Specifies that a query should be run with a BATCH priority.
*
* <p>Batch mode queries are queued by BigQuery. These are started as soon as idle resources
* are available, usually within a few minutes. Batch queries don’t count towards your
* concurrent rate limit.
*/
BATCH
}
@VisibleForTesting
Coder<T> inferCoder(CoderRegistry coderRegistry) {
if (getCoder() != null) {
return getCoder();
}
try {
return coderRegistry.getCoder(TypeDescriptors.outputOf(getParseFn()));
} catch (CannotProvideCoderException e) {
throw new IllegalArgumentException(
"Unable to infer coder for output of parseFn. Specify it explicitly using withCoder().",
e);
}
}
private BigQuerySourceDef createSourceDef() {
BigQuerySourceDef sourceDef;
if (getQuery() == null) {
sourceDef = BigQueryTableSourceDef.create(getBigQueryServices(), getTableProvider());
} else {
sourceDef =
BigQueryQuerySourceDef.create(
getBigQueryServices(),
getQuery(),
getFlattenResults(),
getUseLegacySql(),
MoreObjects.firstNonNull(getQueryPriority(), QueryPriority.BATCH),
getQueryLocation(),
getQueryTempDataset(),
getKmsKey());
}
return sourceDef;
}
private BigQueryStorageQuerySource<T> createStorageQuerySource(
String stepUuid, Coder<T> outputCoder) {
return BigQueryStorageQuerySource.create(
stepUuid,
getQuery(),
getFlattenResults(),
getUseLegacySql(),
MoreObjects.firstNonNull(getQueryPriority(), QueryPriority.BATCH),
getQueryLocation(),
getQueryTempDataset(),
getKmsKey(),
getFormat(),
getParseFn(),
outputCoder,
getBigQueryServices());
}
private static final String QUERY_VALIDATION_FAILURE_ERROR =
"Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
+ " pipeline, This validation can be disabled using #withoutValidation.";
@Override
public void validate(PipelineOptions options) {
// Even if existence validation is disabled, we need to make sure that the BigQueryIO
// read is properly specified.
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
if (getMethod() != TypedRead.Method.DIRECT_READ) {
String tempLocation = bqOptions.getTempLocation();
checkArgument(
!Strings.isNullOrEmpty(tempLocation),
"BigQueryIO.Read needs a GCS temp location to store temp files."
+ "This can be set with option --tempLocation.");
if (getBigQueryServices() == null) {
try {
GcsPath.fromUri(tempLocation);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
String.format(
"BigQuery temp location expected a valid 'gs://' path, but was given '%s'",
tempLocation),
e);
}
}
}
ValueProvider<TableReference> table = getTableProvider();
// Note that a table or query check can fail if the table or dataset are created by
// earlier stages of the pipeline or if a query depends on earlier stages of a pipeline.
// For these cases the withoutValidation method can be used to disable the check.
if (getValidate()) {
try (DatasetService datasetService = getBigQueryServices().getDatasetService(bqOptions)) {
if (table != null) {
checkArgument(
table.isAccessible(), "Cannot call validate if table is dynamically set.");
}
if (table != null && table.get().getProjectId() != null) {
// Check for source table presence for early failure notification.
BigQueryHelpers.verifyDatasetPresence(datasetService, table.get());
BigQueryHelpers.verifyTablePresence(datasetService, table.get());
} else if (getQuery() != null) {
checkArgument(
getQuery().isAccessible(), "Cannot call validate if query is dynamically set.");
JobService jobService = getBigQueryServices().getJobService(bqOptions);
try {
jobService.dryRunQuery(
bqOptions.getBigQueryProject() == null
? bqOptions.getProject()
: bqOptions.getBigQueryProject(),
new JobConfigurationQuery()
.setQuery(getQuery().get())
.setFlattenResults(getFlattenResults())
.setUseLegacySql(getUseLegacySql()),
getQueryLocation());
} catch (Exception e) {
throw new IllegalArgumentException(
String.format(QUERY_VALIDATION_FAILURE_ERROR, getQuery().get()), e);
}
// If the user provided a temp dataset, check if the dataset exists before launching the
// query
if (getQueryTempDataset() != null) {
// The temp table is only used for dataset and project id validation, not for table name
// validation.
TableReference tempTable =
new TableReference()
.setProjectId(
bqOptions.getBigQueryProject() == null
? bqOptions.getProject()
: bqOptions.getBigQueryProject())
.setDatasetId(getQueryTempDataset())
.setTableId("dummy table");
BigQueryHelpers.verifyDatasetPresence(datasetService, tempTable);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
@Override
public PCollection<T> expand(PBegin input) {
ValueProvider<TableReference> table = getTableProvider();
if (table != null) {
checkArgument(getQuery() == null, "from() and fromQuery() are exclusive");
checkArgument(
getQueryPriority() == null,
"withQueryPriority() can only be specified when using fromQuery()");
checkArgument(
getFlattenResults() == null,
"Invalid BigQueryIO.Read: Specifies a table with a result flattening"
+ " preference, which only applies to queries");
checkArgument(
getUseLegacySql() == null,
"Invalid BigQueryIO.Read: Specifies a table with a SQL dialect"
+ " preference, which only applies to queries");
checkArgument(
getQueryTempDataset() == null,
"Invalid BigQueryIO.Read: Specifies a temp dataset, which can"
+ " only be specified when using fromQuery()");
if (table.isAccessible() && Strings.isNullOrEmpty(table.get().getProjectId())) {
LOG.info(
"Project of {} not set. The value of {}.getProject() at execution time will be used.",
TableReference.class.getSimpleName(),
BigQueryOptions.class.getSimpleName());
}
} else {
checkArgument(getQuery() != null, "Either from() or fromQuery() is required");
checkArgument(
getFlattenResults() != null, "flattenResults should not be null if query is set");
checkArgument(getUseLegacySql() != null, "useLegacySql should not be null if query is set");
}
checkArgument(getParseFn() != null, "A parseFn is required");
// if both toRowFn and fromRowFn values are set, enable Beam schema support
boolean beamSchemaEnabled = false;
if (getTypeDescriptor() != null && getToBeamRowFn() != null && getFromBeamRowFn() != null) {
beamSchemaEnabled = true;
}
Pipeline p = input.getPipeline();
final Coder<T> coder = inferCoder(p.getCoderRegistry());
if (getMethod() == TypedRead.Method.DIRECT_READ) {
return expandForDirectRead(input, coder);
}
checkArgument(
getSelectedFields() == null,
"Invalid BigQueryIO.Read: Specifies selected fields, "
+ "which only applies when using Method.DIRECT_READ");
checkArgument(
getRowRestriction() == null,
"Invalid BigQueryIO.Read: Specifies row restriction, "
+ "which only applies when using Method.DIRECT_READ");
final BigQuerySourceDef sourceDef = createSourceDef();
final PCollectionView<String> jobIdTokenView;
PCollection<String> jobIdTokenCollection;
PCollection<T> rows;
if (!getWithTemplateCompatibility()) {
// Create a singleton job ID token at construction time.
final String staticJobUuid = BigQueryHelpers.randomUUIDString();
jobIdTokenView =
p.apply("TriggerIdCreation", Create.of(staticJobUuid))
.apply("ViewId", View.asSingleton());
// Apply the traditional Source model.
rows =
p.apply(
org.apache.beam.sdk.io.Read.from(
sourceDef.toSource(
staticJobUuid, coder, getParseFn(), getUseAvroLogicalTypes())));
} else {
// Create a singleton job ID token at execution time.
jobIdTokenCollection =
p.apply("TriggerIdCreation", Create.of("ignored"))
.apply(
"CreateJobId",
MapElements.via(
new SimpleFunction<String, String>() {
@Override
public String apply(String input) {
return BigQueryHelpers.randomUUIDString();
}
}));
jobIdTokenView = jobIdTokenCollection.apply("ViewId", View.asSingleton());
final TupleTag<String> filesTag = new TupleTag<>();
final TupleTag<String> tableSchemaTag = new TupleTag<>();
PCollectionTuple tuple =
jobIdTokenCollection.apply(
"RunCreateJob",
ParDo.of(
new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String jobUuid = c.element();
BigQuerySourceBase<T> source =
sourceDef.toSource(
jobUuid, coder, getParseFn(), getUseAvroLogicalTypes());
BigQueryOptions options =
c.getPipelineOptions().as(BigQueryOptions.class);
ExtractResult res = source.extractFiles(options);
LOG.info("Extract job produced {} files", res.extractedFiles.size());
source.cleanupTempResource(options);
for (ResourceId file : res.extractedFiles) {
c.output(file.toString());
}
c.output(tableSchemaTag, BigQueryHelpers.toJsonString(res.schema));
}
})
.withOutputTags(filesTag, TupleTagList.of(tableSchemaTag)));
tuple.get(filesTag).setCoder(StringUtf8Coder.of());
tuple.get(tableSchemaTag).setCoder(StringUtf8Coder.of());
final PCollectionView<String> schemaView =
tuple.get(tableSchemaTag).apply(View.asSingleton());
rows =
tuple
.get(filesTag)
.apply(Reshuffle.viaRandomKey())
.apply(
"ReadFiles",
ParDo.of(
new DoFn<String, T>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
TableSchema schema =
BigQueryHelpers.fromJsonString(
c.sideInput(schemaView), TableSchema.class);
String jobUuid = c.sideInput(jobIdTokenView);
BigQuerySourceBase<T> source =
sourceDef.toSource(
jobUuid, coder, getParseFn(), getUseAvroLogicalTypes());
List<BoundedSource<T>> sources =
source.createSources(
ImmutableList.of(
FileSystems.matchNewResource(
c.element(), false /* is directory */)),
schema,
null);
checkArgument(sources.size() == 1, "Expected exactly one source.");
BoundedSource<T> avroSource = sources.get(0);
BoundedSource.BoundedReader<T> reader =
avroSource.createReader(c.getPipelineOptions());
for (boolean more = reader.start(); more; more = reader.advance()) {
c.output(reader.getCurrent());
}
}
})
.withSideInputs(schemaView, jobIdTokenView))
.setCoder(coder);
}
PassThroughThenCleanup.CleanupOperation cleanupOperation =
new PassThroughThenCleanup.CleanupOperation() {
@Override
void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
PipelineOptions options = c.getPipelineOptions();
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
String jobUuid = c.getJobId();
final String extractDestinationDir =
resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", jobUuid);
final String executingProject =
bqOptions.getBigQueryProject() == null
? bqOptions.getProject()
: bqOptions.getBigQueryProject();
JobReference jobRef =
new JobReference()
.setProjectId(executingProject)
.setJobId(
BigQueryResourceNaming.createJobIdPrefix(
bqOptions.getJobName(), jobUuid, JobType.EXPORT));
Job extractJob = getBigQueryServices().getJobService(bqOptions).getJob(jobRef);
if (extractJob != null) {
List<ResourceId> extractFiles =
getExtractFilePaths(extractDestinationDir, extractJob);
if (extractFiles != null && !extractFiles.isEmpty()) {
FileSystems.delete(
extractFiles, MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
}
}
}
};
rows = rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
if (beamSchemaEnabled) {
BigQueryOptions bqOptions = p.getOptions().as(BigQueryOptions.class);
Schema beamSchema = sourceDef.getBeamSchema(bqOptions);
SerializableFunction<T, Row> toBeamRow = getToBeamRowFn().apply(beamSchema);
SerializableFunction<Row, T> fromBeamRow = getFromBeamRowFn().apply(beamSchema);
rows.setSchema(beamSchema, getTypeDescriptor(), toBeamRow, fromBeamRow);
}
return rows;
}
private PCollection<T> expandForDirectRead(PBegin input, Coder<T> outputCoder) {
ValueProvider<TableReference> tableProvider = getTableProvider();
Pipeline p = input.getPipeline();
if (tableProvider != null) {
// No job ID is required. Read directly from BigQuery storage.
return p.apply(
org.apache.beam.sdk.io.Read.from(
BigQueryStorageTableSource.create(
tableProvider,
getFormat(),
getSelectedFields(),
getRowRestriction(),
getParseFn(),
outputCoder,
getBigQueryServices(),
getProjectionPushdownApplied())));
}
checkArgument(
getSelectedFields() == null,
"Invalid BigQueryIO.Read: Specifies selected fields, "
+ "which only applies when reading from a table");
checkArgument(
getRowRestriction() == null,
"Invalid BigQueryIO.Read: Specifies row restriction, "
+ "which only applies when reading from a table");
//
// N.B. All of the code below exists because the BigQuery storage API can't (yet) read from
// all anonymous tables, so we need the job ID to reason about the name of the destination
// table for the query to read the data and subsequently delete the table and dataset. Once
// the storage API can handle anonymous tables, the storage source should be modified to use
// anonymous tables and all of the code related to job ID generation and table and dataset
// cleanup can be removed. [https://github.com/apache/beam/issues/19375]
//
PCollectionView<String> jobIdTokenView;
PCollection<T> rows;
if (!getWithTemplateCompatibility()) {
// Create a singleton job ID token at pipeline construction time.
String staticJobUuid = BigQueryHelpers.randomUUIDString();
jobIdTokenView =
p.apply("TriggerIdCreation", Create.of(staticJobUuid))
.apply("ViewId", View.asSingleton());
// Apply the traditional Source model.
rows =
p.apply(
org.apache.beam.sdk.io.Read.from(
createStorageQuerySource(staticJobUuid, outputCoder)));
} else {
// Create a singleton job ID token at pipeline execution time.
PCollection<String> jobIdTokenCollection =
p.apply("TriggerIdCreation", Create.of("ignored"))
.apply(
"CreateJobId",
MapElements.via(
new SimpleFunction<String, String>() {
@Override
public String apply(String input) {
return BigQueryHelpers.randomUUIDString();
}
}));
jobIdTokenView = jobIdTokenCollection.apply("ViewId", View.asSingleton());
TupleTag<ReadStream> readStreamsTag = new TupleTag<>();
TupleTag<ReadSession> readSessionTag = new TupleTag<>();
TupleTag<String> tableSchemaTag = new TupleTag<>();
PCollectionTuple tuple =
jobIdTokenCollection.apply(
"RunQueryJob",
ParDo.of(
new DoFn<String, ReadStream>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
BigQueryOptions options =
c.getPipelineOptions().as(BigQueryOptions.class);
String jobUuid = c.element();
// Execute the query and get the destination table holding the results.
// The getTargetTable call runs a new instance of the query and returns
// the destination table created to hold the results.
BigQueryStorageQuerySource<T> querySource =
createStorageQuerySource(jobUuid, outputCoder);
Table queryResultTable = querySource.getTargetTable(options);
// Create a read session without specifying a desired stream count and
// let the BigQuery storage server pick the number of streams.
CreateReadSessionRequest request =
CreateReadSessionRequest.newBuilder()
.setParent(
BigQueryHelpers.toProjectResourceName(
options.getBigQueryProject() == null
? options.getProject()
: options.getBigQueryProject()))
.setReadSession(
ReadSession.newBuilder()
.setTable(
BigQueryHelpers.toTableResourceName(
queryResultTable.getTableReference()))
.setDataFormat(DataFormat.AVRO))
.setMaxStreamCount(0)
.build();
ReadSession readSession;
try (StorageClient storageClient =
getBigQueryServices().getStorageClient(options)) {
readSession = storageClient.createReadSession(request);
}
for (ReadStream readStream : readSession.getStreamsList()) {
c.output(readStream);
}
c.output(readSessionTag, readSession);
c.output(
tableSchemaTag,
BigQueryHelpers.toJsonString(queryResultTable.getSchema()));
}
})
.withOutputTags(
readStreamsTag, TupleTagList.of(readSessionTag).and(tableSchemaTag)));
tuple.get(readStreamsTag).setCoder(ProtoCoder.of(ReadStream.class));
tuple.get(readSessionTag).setCoder(ProtoCoder.of(ReadSession.class));
tuple.get(tableSchemaTag).setCoder(StringUtf8Coder.of());
PCollectionView<ReadSession> readSessionView =
tuple.get(readSessionTag).apply("ReadSessionView", View.asSingleton());
PCollectionView<String> tableSchemaView =
tuple.get(tableSchemaTag).apply("TableSchemaView", View.asSingleton());
rows =
tuple
.get(readStreamsTag)
.apply(Reshuffle.viaRandomKey())
.apply(
ParDo.of(
new DoFn<ReadStream, T>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
ReadSession readSession = c.sideInput(readSessionView);
TableSchema tableSchema =
BigQueryHelpers.fromJsonString(
c.sideInput(tableSchemaView), TableSchema.class);
ReadStream readStream = c.element();
BigQueryStorageStreamSource<T> streamSource =
BigQueryStorageStreamSource.create(
readSession,
readStream,
tableSchema,
getParseFn(),
outputCoder,
getBigQueryServices());
// Read all of the data from the stream. In the event that this work
// item fails and is rescheduled, the same rows will be returned in
// the same order.
BoundedSource.BoundedReader<T> reader =
streamSource.createReader(c.getPipelineOptions());
for (boolean more = reader.start(); more; more = reader.advance()) {
c.output(reader.getCurrent());
}
}
})
.withSideInputs(readSessionView, tableSchemaView))
.setCoder(outputCoder);
}
PassThroughThenCleanup.CleanupOperation cleanupOperation =
new CleanupOperation() {
@Override
void cleanup(ContextContainer c) throws Exception {
BigQueryOptions options = c.getPipelineOptions().as(BigQueryOptions.class);
String jobUuid = c.getJobId();
Optional<String> queryTempDataset = Optional.ofNullable(getQueryTempDataset());
TableReference tempTable =
createTempTableReference(
options.getBigQueryProject() == null
? options.getProject()
: options.getBigQueryProject(),
BigQueryResourceNaming.createJobIdPrefix(
options.getJobName(), jobUuid, JobType.QUERY),
queryTempDataset);
try (DatasetService datasetService =
getBigQueryServices().getDatasetService(options)) {
LOG.info("Deleting temporary table with query results {}", tempTable);
datasetService.deleteTable(tempTable);
// Delete dataset only if it was created by Beam
boolean datasetCreatedByBeam = !queryTempDataset.isPresent();
if (datasetCreatedByBeam) {
LOG.info(
"Deleting temporary dataset with query results {}", tempTable.getDatasetId());
datasetService.deleteDataset(tempTable.getProjectId(), tempTable.getDatasetId());
}
}
}
};
return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.addIfNotNull(
DisplayData.item("table", BigQueryHelpers.displayTable(getTableProvider()))
.withLabel("Table"))
.addIfNotNull(DisplayData.item("query", getQuery()).withLabel("Query"))
.addIfNotDefault(
DisplayData.item("projectionPushdownApplied", getProjectionPushdownApplied())
.withLabel("Projection Pushdown Applied"),
false)
.addIfNotNull(
DisplayData.item("flattenResults", getFlattenResults())
.withLabel("Flatten Query Results"))
.addIfNotNull(
DisplayData.item("useLegacySql", getUseLegacySql())
.withLabel("Use Legacy SQL Dialect"))
.addIfNotDefault(
DisplayData.item("validation", getValidate()).withLabel("Validation Enabled"), true);
ValueProvider<List<String>> selectedFieldsProvider = getSelectedFields();
if (selectedFieldsProvider != null && selectedFieldsProvider.isAccessible()) {
builder.add(
DisplayData.item("selectedFields", String.join(", ", selectedFieldsProvider.get()))
.withLabel("Selected Fields"));
}
}
/** Ensures that methods of the from() / fromQuery() family are called at most once. */
private void ensureFromNotCalledYet() {
checkState(
getJsonTableRef() == null && getQuery() == null, "from() or fromQuery() already called");
}
/** See {@link Read#getTableProvider()}. */
public @Nullable ValueProvider<TableReference> getTableProvider() {
return getJsonTableRef() == null
? null
: NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef());
}
/** See {@link Read#getTable()}. */
public @Nullable TableReference getTable() {
ValueProvider<TableReference> provider = getTableProvider();
return provider == null ? null : provider.get();
}
/**
* Sets a {@link Coder} for the result of the parse function. This may be required if a coder
* can not be inferred automatically.
*/
public TypedRead<T> withCoder(Coder<T> coder) {
return toBuilder().setCoder(coder).build();
}
/** For query sources, use this Cloud KMS key to encrypt any temporary tables created. */
public TypedRead<T> withKmsKey(String kmsKey) {
return toBuilder().setKmsKey(kmsKey).build();
}
/**
* Sets the functions to convert elements to/from {@link Row} objects.
*
* <p>Setting these conversion functions is necessary to enable {@link Schema} support.
*/
@Experimental(Kind.SCHEMAS)
public TypedRead<T> withBeamRowConverters(
TypeDescriptor<T> typeDescriptor,
ToBeamRowFunction<T> toRowFn,
FromBeamRowFunction<T> fromRowFn) {
return toBuilder()
.setTypeDescriptor(typeDescriptor)
.setToBeamRowFn(toRowFn)
.setFromBeamRowFn(fromRowFn)
.build();
}
/** See {@link Read#from(String)}. */
public TypedRead<T> from(String tableSpec) {
return from(StaticValueProvider.of(tableSpec));
}
/** See {@link Read#from(ValueProvider)}. */
public TypedRead<T> from(ValueProvider<String> tableSpec) {
ensureFromNotCalledYet();
return toBuilder()
.setJsonTableRef(
NestedValueProvider.of(
NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
new TableRefToJson()))
.build();
}
/** See {@link Read#fromQuery(String)}. */
public TypedRead<T> fromQuery(String query) {
return fromQuery(StaticValueProvider.of(query));
}
/** See {@link Read#fromQuery(ValueProvider)}. */
public TypedRead<T> fromQuery(ValueProvider<String> query) {
ensureFromNotCalledYet();
return toBuilder().setQuery(query).setFlattenResults(true).setUseLegacySql(true).build();
}
/** See {@link Read#from(TableReference)}. */
public TypedRead<T> from(TableReference table) {
return from(StaticValueProvider.of(BigQueryHelpers.toTableSpec(table)));
}
/** See {@link Read#withoutValidation()}. */
public TypedRead<T> withoutValidation() {
return toBuilder().setValidate(false).build();
}
/** See {@link Read#withoutResultFlattening()}. */
public TypedRead<T> withoutResultFlattening() {
return toBuilder().setFlattenResults(false).build();
}
/** See {@link Read#usingStandardSql()}. */
public TypedRead<T> usingStandardSql() {
return toBuilder().setUseLegacySql(false).build();
}
/** See {@link QueryPriority}. */
public TypedRead<T> withQueryPriority(QueryPriority priority) {
return toBuilder().setQueryPriority(priority).build();
}
/**
* BigQuery geographic location where the query <a
* href="https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs">job</a> will be
* executed. If not specified, Beam tries to determine the location by examining the tables
* referenced by the query. Location must be specified for queries not executed in US or EU, or
* when you are reading from an authorized view. See <a
* href="https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/query">BigQuery Jobs:
* query</a>.
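*
* <p>For example, a minimal sketch for a dataset located outside the US and EU (project,
* dataset, and location names are placeholders):
*
* <pre>{@code
* PCollection<TableRow> rows =
*     pipeline.apply(
*         BigQueryIO.readTableRows()
*             .fromQuery("SELECT word FROM `my-project.tokyo_dataset.shakespeare`")
*             .usingStandardSql()
*             .withQueryLocation("asia-northeast1"));
* }</pre>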
*/
public TypedRead<T> withQueryLocation(String location) {
return toBuilder().setQueryLocation(location).build();
}
/**
* Temporary dataset reference when using {@link #fromQuery(String)}. When reading from a query,
* BigQuery will create a temporary dataset and a temporary table to store the results of the
* query. With this option, you can set an existing dataset to create the temporary table.
* BigQueryIO will create a temporary table in that dataset, and will remove it once it is not
* needed. No other tables in the dataset will be modified. If your job does not have
* permissions to create a new dataset, and you want to use {@link #fromQuery(String)} (for
* instance, to read from a view), you should use this option. Remember that the dataset must
* exist and your job needs permissions to create and remove tables inside that dataset.
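*
* <p>A minimal sketch, assuming {@code existing_dataset} already exists and the job has
* permissions to create and delete tables in it (all names are placeholders):
*
* <pre>{@code
* PCollection<TableRow> rows =
*     pipeline.apply(
*         BigQueryIO.readTableRows()
*             .fromQuery("SELECT field FROM `my-project.source_dataset.authorized_view`")
*             .usingStandardSql()
*             .withQueryTempDataset("existing_dataset"));
* }</pre>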
*/
public TypedRead<T> withQueryTempDataset(String queryTempDatasetRef) {
return toBuilder().setQueryTempDataset(queryTempDatasetRef).build();
}
/** See {@link Method}. */
public TypedRead<T> withMethod(TypedRead.Method method) {
return toBuilder().setMethod(method).build();
}
/** See {@link DataFormat}. */
@Experimental(Experimental.Kind.SOURCE_SINK)
public TypedRead<T> withFormat(DataFormat format) {
return toBuilder().setFormat(format).build();
}
/** See {@link #withSelectedFields(ValueProvider)}. */
public TypedRead<T> withSelectedFields(List<String> selectedFields) {
return withSelectedFields(StaticValueProvider.of(selectedFields));
}
/**
* Read only the specified fields (columns) from a BigQuery table. Fields may not be returned in
* the order specified. If no value is specified, then all fields are returned.
*
* <p>Requires {@link Method#DIRECT_READ}. Not compatible with {@link #fromQuery(String)}.
*/
public TypedRead<T> withSelectedFields(ValueProvider<List<String>> selectedFields) {
return toBuilder().setSelectedFields(selectedFields).build();
}
/** See {@link #withRowRestriction(ValueProvider)}. */
public TypedRead<T> withRowRestriction(String rowRestriction) {
return withRowRestriction(StaticValueProvider.of(rowRestriction));
}
/**
* Read only rows which match the specified filter, which must be a SQL expression compatible
* with <a href="https://cloud.google.com/bigquery/docs/reference/standard-sql/">Google standard
* SQL</a>. If no value is specified, then all rows are returned.
*
* <p>Requires {@link Method#DIRECT_READ}. Not compatible with {@link #fromQuery(String)}.
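*
* <p>A sketch combining a row restriction with {@link #withSelectedFields(List)} for a direct
* read (table and field names are placeholders):
*
* <pre>{@code
* PCollection<TableRow> rows =
*     pipeline.apply(
*         BigQueryIO.readTableRows()
*             .from("my-project:samples.shakespeare")
*             .withMethod(TypedRead.Method.DIRECT_READ)
*             .withSelectedFields(Arrays.asList("word", "word_count"))
*             .withRowRestriction("word_count > 10"));
* }</pre>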
*/
public TypedRead<T> withRowRestriction(ValueProvider<String> rowRestriction) {
return toBuilder().setRowRestriction(rowRestriction).build();
}
public TypedRead<T> withTemplateCompatibility() {
return toBuilder().setWithTemplateCompatibility(true).build();
}
@VisibleForTesting
public TypedRead<T> withTestServices(BigQueryServices testServices) {
return toBuilder().setBigQueryServices(testServices).build();
}
public TypedRead<T> useAvroLogicalTypes() {
return toBuilder().setUseAvroLogicalTypes(true).build();
}
@VisibleForTesting
TypedRead<T> withProjectionPushdownApplied() {
return toBuilder().setProjectionPushdownApplied(true).build();
}
@Override
public boolean supportsProjectionPushdown() {
// We can't do projection pushdown when a query is set. The query may project certain fields
// itself, and we can't know without parsing the query.
return Method.DIRECT_READ.equals(getMethod()) && getQuery() == null;
}
@Override
public PTransform<PBegin, PCollection<T>> actuateProjectionPushdown(
Map<TupleTag<?>, FieldAccessDescriptor> outputFields) {
Preconditions.checkArgument(supportsProjectionPushdown());
FieldAccessDescriptor fieldAccessDescriptor = outputFields.get(new TupleTag<>("output"));
org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull(
fieldAccessDescriptor, "Expected pushdown on the main output (tagged 'output')");
Preconditions.checkArgument(
outputFields.size() == 1,
"Expected only to pushdown on the main output (tagged 'output'). Requested tags: %s",
outputFields.keySet());
ImmutableList<String> fields =
ImmutableList.copyOf(fieldAccessDescriptor.fieldNamesAccessed());
return withSelectedFields(fields).withProjectionPushdownApplied();
}
}
static String getExtractDestinationUri(String extractDestinationDir) {
return String.format("%s/%s", extractDestinationDir, "*.avro");
}
static List<ResourceId> getExtractFilePaths(String extractDestinationDir, Job extractJob)
throws IOException {
JobStatistics jobStats = extractJob.getStatistics();
List<Long> counts = jobStats.getExtract().getDestinationUriFileCounts();
if (counts.size() != 1) {
String errorMessage =
counts.isEmpty()
? "No destination uri file count received."
: String.format(
"More than one destination uri file count received. First two are %s, %s",
counts.get(0), counts.get(1));
throw new RuntimeException(errorMessage);
}
long filesCount = counts.get(0);
ImmutableList.Builder<ResourceId> paths = ImmutableList.builder();
ResourceId extractDestinationDirResourceId =
FileSystems.matchNewResource(extractDestinationDir, true /* isDirectory */);
for (long i = 0; i < filesCount; ++i) {
ResourceId filePath =
extractDestinationDirResourceId.resolve(
String.format("%012d%s", i, ".avro"),
ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
paths.add(filePath);
}
return paths.build();
}
/////////////////////////////////////////////////////////////////////////////
/**
* A {@link PTransform} that writes a {@link PCollection} to a BigQuery table. A formatting
* function must be provided to convert each input element into a {@link TableRow} using {@link
* Write#withFormatFunction(SerializableFunction)}.
*
* <p>In BigQuery, each table has an enclosing dataset. The dataset being written must already
* exist.
*
* <p>By default, tables will be created if they do not exist, which corresponds to a {@link
* Write.CreateDisposition#CREATE_IF_NEEDED} disposition that matches the default of BigQuery's
* Jobs API. A schema must be provided (via {@link Write#withSchema(TableSchema)}), or else the
* transform may fail at runtime with an {@link IllegalArgumentException}.
*
* <p>By default, writes require an empty table, which corresponds to a {@link
* Write.WriteDisposition#WRITE_EMPTY} disposition that matches the default of BigQuery's Jobs
* API.
*
* <p>Here is a sample transform that produces TableRow values containing "word" and "count"
* columns:
*
* <pre>{@code
* static class FormatCountsFn extends DoFn<KV<String, Long>, TableRow> {
* public void processElement(ProcessContext c) {
* TableRow row = new TableRow()
* .set("word", c.element().getKey())
* .set("count", c.element().getValue().intValue());
* c.output(row);
* }
* }
* }</pre>
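*
* <p>The output of such a {@code DoFn} can then be written with a sketch like the following,
* assuming {@code wordCounts} is a {@code PCollection<KV<String, Long>>} (project, dataset,
* and table names are placeholders):
*
* <pre>{@code
* TableSchema schema = new TableSchema().setFields(Arrays.asList(
*     new TableFieldSchema().setName("word").setType("STRING"),
*     new TableFieldSchema().setName("count").setType("INTEGER")));
*
* wordCounts
*     .apply(ParDo.of(new FormatCountsFn()))
*     .apply(BigQueryIO.writeTableRows()
*         .to("my-project:output_dataset.word_counts")
*         .withSchema(schema)
*         .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
*         .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
* }</pre>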
*/
public static <T> Write<T> write() {
return new AutoValue_BigQueryIO_Write.Builder<T>()
.setValidate(true)
.setBigQueryServices(new BigQueryServicesImpl())
.setCreateDisposition(Write.CreateDisposition.CREATE_IF_NEEDED)
.setWriteDisposition(Write.WriteDisposition.WRITE_EMPTY)
.setSchemaUpdateOptions(Collections.emptySet())
.setNumFileShards(0)
.setNumStorageWriteApiStreams(0)
.setMethod(Write.Method.DEFAULT)
.setExtendedErrorInfo(false)
.setSkipInvalidRows(false)
.setIgnoreUnknownValues(false)
.setIgnoreInsertIds(false)
.setUseAvroLogicalTypes(false)
.setMaxFilesPerPartition(BatchLoads.DEFAULT_MAX_FILES_PER_PARTITION)
.setMaxBytesPerPartition(BatchLoads.DEFAULT_MAX_BYTES_PER_PARTITION)
.setOptimizeWrites(false)
.setUseBeamSchema(false)
.setAutoSharding(false)
.setPropagateSuccessful(true)
.setDeterministicRecordIdFn(null)
.build();
}
/**
* A {@link PTransform} that writes a {@link PCollection} containing {@link TableRow TableRows} to
* a BigQuery table.
*
* <p>It is recommended to instead use {@link #write} with {@link
* Write#withFormatFunction(SerializableFunction)}.
*/
public static Write<TableRow> writeTableRows() {
return BigQueryIO.<TableRow>write().withFormatFunction(IDENTITY_FORMATTER);
}
/** Implementation of {@link #write}. */
@AutoValue
public abstract static class Write<T> extends PTransform<PCollection<T>, WriteResult> {
/** Determines the method used to insert data in BigQuery. */
public enum Method {
/**
* The default behavior if no method is explicitly set. If the input is bounded, then file
* loads will be used. If the input is unbounded, then streaming inserts will be used.
*/
DEFAULT,
/**
* Use BigQuery load jobs to insert data. Records will first be written to files, and these
* files will be loaded into BigQuery. This is the default method when the input is bounded.
* This method can be chosen for unbounded inputs as well, as long as a triggering frequency
* is also set using {@link #withTriggeringFrequency}. BigQuery has daily quotas on the number
* of load jobs allowed, so be careful not to set the triggering frequency too high. For more
* information, see <a
* href="https://cloud.google.com/bigquery/docs/loading-data-cloud-storage">Loading Data from
* Cloud Storage</a>. Note: Load jobs currently do not support BigQuery's <a
* href="https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#json_type">
* JSON data type</a>.
*/
FILE_LOADS,
/**
* Use the BigQuery streaming insert API to insert data. This provides the lowest-latency
* insert path into BigQuery, and therefore is the default method when the input is unbounded.
* BigQuery will make a strong effort to ensure no duplicates when using this path; however,
* there are some scenarios in which BigQuery is unable to make this guarantee (see
* https://cloud.google.com/bigquery/streaming-data-into-bigquery). A query can be run over
* the output table to periodically clean these rare duplicates. Alternatively, using the
* {@link #FILE_LOADS} insert method does guarantee no duplicates, though the latency for the
* insert into BigQuery will be much higher. For more information, see <a
* href="https://cloud.google.com/bigquery/streaming-data-into-bigquery">Streaming Data into
* BigQuery</a>.
*/
STREAMING_INSERTS,
/** Use the new, exactly-once Storage Write API. */
STORAGE_WRITE_API,
/**
* Use the new Storage Write API without exactly-once semantics enabled. This will be cheaper
* and provide lower latency, but comes with the caveat that the output table may contain
* duplicates.
*/
STORAGE_API_AT_LEAST_ONCE
}
abstract @Nullable ValueProvider<String> getJsonTableRef();
abstract @Nullable SerializableFunction<ValueInSingleWindow<T>, TableDestination>
getTableFunction();
abstract @Nullable SerializableFunction<T, TableRow> getFormatFunction();
abstract @Nullable SerializableFunction<T, TableRow> getFormatRecordOnFailureFunction();
abstract RowWriterFactory.@Nullable AvroRowWriterFactory<T, ?, ?> getAvroRowWriterFactory();
abstract @Nullable SerializableFunction<@Nullable TableSchema, org.apache.avro.Schema>
getAvroSchemaFactory();
abstract boolean getUseAvroLogicalTypes();
abstract @Nullable DynamicDestinations<T, ?> getDynamicDestinations();
abstract @Nullable PCollectionView<Map<String, String>> getSchemaFromView();
abstract @Nullable ValueProvider<String> getJsonSchema();
abstract @Nullable ValueProvider<String> getJsonTimePartitioning();
abstract @Nullable Clustering getClustering();
abstract CreateDisposition getCreateDisposition();
abstract WriteDisposition getWriteDisposition();
abstract Set<SchemaUpdateOption> getSchemaUpdateOptions();
/** Table description. Default is empty. */
abstract @Nullable String getTableDescription();
/** An option to indicate if table validation is desired. Default is true. */
abstract boolean getValidate();
abstract BigQueryServices getBigQueryServices();
abstract @Nullable Integer getMaxFilesPerBundle();
abstract @Nullable Long getMaxFileSize();
abstract int getNumFileShards();
abstract int getNumStorageWriteApiStreams();
abstract int getMaxFilesPerPartition();
abstract long getMaxBytesPerPartition();
abstract @Nullable Duration getTriggeringFrequency();
abstract Write.Method getMethod();
abstract @Nullable ValueProvider<String> getLoadJobProjectId();
abstract @Nullable InsertRetryPolicy getFailedInsertRetryPolicy();
abstract @Nullable ValueProvider<String> getCustomGcsTempLocation();
abstract boolean getExtendedErrorInfo();
abstract Boolean getSkipInvalidRows();
abstract Boolean getIgnoreUnknownValues();
abstract Boolean getIgnoreInsertIds();
abstract @Nullable String getKmsKey();
abstract Boolean getOptimizeWrites();
@Experimental(Kind.SCHEMAS)
abstract Boolean getUseBeamSchema();
@Experimental
abstract Boolean getAutoSharding();
abstract Boolean getPropagateSuccessful();
@Experimental
abstract @Nullable SerializableFunction<T, String> getDeterministicRecordIdFn();
abstract @Nullable String getWriteTempDataset();
abstract Builder<T> toBuilder();
@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setJsonTableRef(ValueProvider<String> jsonTableRef);
abstract Builder<T> setTableFunction(
SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction);
abstract Builder<T> setFormatFunction(SerializableFunction<T, TableRow> formatFunction);
abstract Builder<T> setFormatRecordOnFailureFunction(
SerializableFunction<T, TableRow> formatFunction);
abstract Builder<T> setAvroRowWriterFactory(
RowWriterFactory.AvroRowWriterFactory<T, ?, ?> avroRowWriterFactory);
abstract Builder<T> setAvroSchemaFactory(
SerializableFunction<@Nullable TableSchema, org.apache.avro.Schema> avroSchemaFactory);
abstract Builder<T> setUseAvroLogicalTypes(boolean useAvroLogicalTypes);
abstract Builder<T> setDynamicDestinations(DynamicDestinations<T, ?> dynamicDestinations);
abstract Builder<T> setSchemaFromView(PCollectionView<Map<String, String>> view);
abstract Builder<T> setJsonSchema(ValueProvider<String> jsonSchema);
abstract Builder<T> setJsonTimePartitioning(ValueProvider<String> jsonTimePartitioning);
abstract Builder<T> setClustering(Clustering clustering);
abstract Builder<T> setCreateDisposition(CreateDisposition createDisposition);
abstract Builder<T> setWriteDisposition(WriteDisposition writeDisposition);
abstract Builder<T> setSchemaUpdateOptions(Set<SchemaUpdateOption> schemaUpdateOptions);
abstract Builder<T> setTableDescription(String tableDescription);
abstract Builder<T> setValidate(boolean validate);
abstract Builder<T> setBigQueryServices(BigQueryServices bigQueryServices);
abstract Builder<T> setMaxFilesPerBundle(Integer maxFilesPerBundle);
abstract Builder<T> setMaxFileSize(Long maxFileSize);
abstract Builder<T> setNumFileShards(int numFileShards);
abstract Builder<T> setNumStorageWriteApiStreams(int numStorageApiStreams);
abstract Builder<T> setMaxFilesPerPartition(int maxFilesPerPartition);
abstract Builder<T> setMaxBytesPerPartition(long maxBytesPerPartition);
abstract Builder<T> setTriggeringFrequency(Duration triggeringFrequency);
abstract Builder<T> setMethod(Write.Method method);
abstract Builder<T> setLoadJobProjectId(ValueProvider<String> loadJobProjectId);
abstract Builder<T> setFailedInsertRetryPolicy(InsertRetryPolicy retryPolicy);
abstract Builder<T> setCustomGcsTempLocation(ValueProvider<String> customGcsTempLocation);
abstract Builder<T> setExtendedErrorInfo(boolean extendedErrorInfo);
abstract Builder<T> setSkipInvalidRows(Boolean skipInvalidRows);
abstract Builder<T> setIgnoreUnknownValues(Boolean ignoreUnknownValues);
abstract Builder<T> setIgnoreInsertIds(Boolean ignoreInsertIds);
abstract Builder<T> setKmsKey(String kmsKey);
abstract Builder<T> setOptimizeWrites(Boolean optimizeWrites);
@Experimental(Kind.SCHEMAS)
abstract Builder<T> setUseBeamSchema(Boolean useBeamSchema);
@Experimental
abstract Builder<T> setAutoSharding(Boolean autoSharding);
abstract Builder<T> setPropagateSuccessful(Boolean propagateSuccessful);
@Experimental
abstract Builder<T> setDeterministicRecordIdFn(
SerializableFunction<T, String> toUniqueIdFunction);
abstract Builder<T> setWriteTempDataset(String writeTempDataset);
abstract Write<T> build();
}
/**
* An enumeration type for the BigQuery create disposition strings.
*
* @see <a
* href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.createDisposition">
* <code>configuration.query.createDisposition</code> in the BigQuery Jobs API</a>
*/
public enum CreateDisposition {
/**
* Specifies that tables should not be created.
*
* <p>If the output table does not exist, the write fails.
*/
CREATE_NEVER,
/**
* Specifies that tables should be created if needed. This is the default behavior.
*
* <p>Requires that a table schema is provided via {@link BigQueryIO.Write#withSchema}. This
* precondition is checked before starting a job. The schema is not required to match an
* existing table's schema.
*
* <p>When this transformation is executed, if the output table does not exist, the table is
* created from the provided schema. Note that even if the table exists, it may be recreated
* if necessary when paired with a {@link WriteDisposition#WRITE_TRUNCATE}.
*/
CREATE_IF_NEEDED
}
/**
* An enumeration type for the BigQuery write disposition strings.
*
* @see <a
* href="https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.query.writeDisposition">
* <code>configuration.query.writeDisposition</code> in the BigQuery Jobs API</a>
*/
public enum WriteDisposition {
/**
* Specifies that write should replace a table.
*
* <p>The replacement may occur in multiple steps - for instance by first removing the
* existing table, then creating a replacement, then filling it in. This is not an atomic
* operation, and external programs may see the table in any of these intermediate steps.
*/
WRITE_TRUNCATE,
/** Specifies that rows may be appended to an existing table. */
WRITE_APPEND,
/**
* Specifies that the output table must be empty. This is the default behavior.
*
* <p>If the output table is not empty, the write fails at runtime.
*
* <p>This check may occur long before data is written, and does not guarantee exclusive
* access to the table. If two programs are run concurrently, each specifying the same output
* table and a {@link WriteDisposition} of {@link WriteDisposition#WRITE_EMPTY}, it is
* possible for both to succeed.
*/
WRITE_EMPTY
}
/**
* An enumeration type for the BigQuery schema update options strings.
*
* <p>Not supported for {@link Method#STREAMING_INSERTS}.
*
* <p>Note from the BigQuery API doc -- Schema update options are supported in two cases: when
* writeDisposition is WRITE_APPEND; when writeDisposition is WRITE_TRUNCATE and the destination
* table is a partition of a table, specified by partition decorators.
*
* @see <a
* href="https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationquery">
* <code>configuration.query.schemaUpdateOptions</code> in the BigQuery Jobs API</a>
*/
public enum SchemaUpdateOption {
/** Allow adding a nullable field to the schema. */
ALLOW_FIELD_ADDITION,
/** Allow relaxing a required field in the original schema to nullable. */
ALLOW_FIELD_RELAXATION
}
/**
* Writes to the given table, specified in the format described in {@link
* BigQueryHelpers#parseTableSpec}.
*/
public Write<T> to(String tableSpec) {
return to(StaticValueProvider.of(tableSpec));
}
/** Writes to the given table, specified as a {@link TableReference}. */
public Write<T> to(TableReference table) {
return to(StaticValueProvider.of(BigQueryHelpers.toTableSpec(table)));
}
/** Same as {@link #to(String)}, but with a {@link ValueProvider}. */
public Write<T> to(ValueProvider<String> tableSpec) {
checkArgument(tableSpec != null, "tableSpec can not be null");
return toBuilder()
.setJsonTableRef(
NestedValueProvider.of(
NestedValueProvider.of(tableSpec, new TableSpecToTableRef()),
new TableRefToJson()))
.build();
}
/**
* Writes to the table(s) specified by the given table function. The table is a function of {@link
* ValueInSingleWindow}, so can be determined by the value or by the window.
*
* <p>If the function produces destinations configured with clustering fields, ensure that
* {@link #withClustering()} is also set so that the clustering configurations get properly
* encoded and decoded.
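*
* <p>A sketch that routes each row to a per-country table, assuming each {@link TableRow}
* carries a {@code country} field and {@code schema} is a matching {@link TableSchema} (all
* names are placeholders):
*
* <pre>{@code
* rows.apply(BigQueryIO.writeTableRows()
*     .withSchema(schema)
*     .to((ValueInSingleWindow<TableRow> value) ->
*         new TableDestination(
*             "my-project:dataset.events_" + value.getValue().get("country"),
*             "Events for one country")));
* }</pre>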
*/
public Write<T> to(
SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction) {
checkArgument(tableFunction != null, "tableFunction can not be null");
return toBuilder().setTableFunction(tableFunction).build();
}
/**
* Writes to the table and schema specified by the {@link DynamicDestinations} object.
*
* <p>If any of the returned destinations are configured with clustering fields, ensure that the
* passed {@link DynamicDestinations} object returns {@link TableDestinationCoderV3} when {@link
* DynamicDestinations#getDestinationCoder()} is called.
*/
public Write<T> to(DynamicDestinations<T, ?> dynamicDestinations) {
checkArgument(dynamicDestinations != null, "dynamicDestinations can not be null");
return toBuilder().setDynamicDestinations(dynamicDestinations).build();
}
/** Formats the user's type into a {@link TableRow} to be written to BigQuery. */
public Write<T> withFormatFunction(SerializableFunction<T, TableRow> formatFunction) {
return toBuilder().setFormatFunction(formatFunction).build();
}
/**
* If an insert failure occurs, this function is applied to the originally supplied row T. The
* resulting {@link TableRow} will be accessed via {@link
* WriteResult#getFailedInsertsWithErr()}.
*/
public Write<T> withFormatRecordOnFailureFunction(
SerializableFunction<T, TableRow> formatFunction) {
return toBuilder().setFormatRecordOnFailureFunction(formatFunction).build();
}
/**
* Formats the user's type into a {@link GenericRecord} to be written to BigQuery. The
* GenericRecords are written as avro using the standard {@link GenericDatumWriter}.
*
* <p>This is mutually exclusive with {@link #withFormatFunction}; only one may be set.
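*
* <p>A sketch, where {@code MyType} and its accessors are placeholders and the record is
* built against the Avro schema supplied on the request:
*
* <pre>{@code
* rows.apply(BigQueryIO.<MyType>write()
*     .to("my-project:dataset.table")
*     .withSchema(schema)
*     .withAvroFormatFunction(request -> {
*       MyType element = request.getElement();
*       return new GenericRecordBuilder(request.getSchema())
*           .set("word", element.getWord())
*           .set("count", element.getCount())
*           .build();
*     }));
* }</pre>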
*/
public Write<T> withAvroFormatFunction(
SerializableFunction<AvroWriteRequest<T>, GenericRecord> avroFormatFunction) {
return withAvroWriter(avroFormatFunction, GENERIC_DATUM_WRITER_FACTORY);
}
/**
* Writes the user's type as avro using the supplied {@link DatumWriter}.
*
* <p>This is mutually exclusive with {@link #withFormatFunction}; only one may be set.
*
* <p>Overwrites {@link #withAvroFormatFunction} if it has been set.
*/
public Write<T> withAvroWriter(
SerializableFunction<org.apache.avro.Schema, DatumWriter<T>> writerFactory) {
return withAvroWriter(AvroWriteRequest::getElement, writerFactory);
}
/**
* Converts the user's type to an avro record using the supplied avroFormatFunction. Records
* are then written using the supplied writer instances returned from writerFactory.
*
* <p>This is mutually exclusive with {@link #withFormatFunction}; only one may be set.
*
* <p>Overwrites {@link #withAvroFormatFunction} if it has been set.
*/
public <AvroT> Write<T> withAvroWriter(
SerializableFunction<AvroWriteRequest<T>, AvroT> avroFormatFunction,
SerializableFunction<org.apache.avro.Schema, DatumWriter<AvroT>> writerFactory) {
return toBuilder()
.setOptimizeWrites(true)
.setAvroRowWriterFactory(RowWriterFactory.avroRecords(avroFormatFunction, writerFactory))
.build();
}
/**
* Uses the specified function to convert a {@link TableSchema} to a {@link
* org.apache.avro.Schema}.
*
* <p>If not specified, the TableSchema will automatically be converted to an avro schema.
*/
public Write<T> withAvroSchemaFactory(
SerializableFunction<@Nullable TableSchema, org.apache.avro.Schema> avroSchemaFactory) {
return toBuilder().setAvroSchemaFactory(avroSchemaFactory).build();
}
/**
* Uses the specified schema for rows to be written.
*
* <p>The schema is <i>required</i> only if writing to a table that does not already exist, and
* {@link CreateDisposition} is set to {@link CreateDisposition#CREATE_IF_NEEDED}.
*/
public Write<T> withSchema(TableSchema schema) {
checkArgument(schema != null, "schema can not be null");
return withJsonSchema(StaticValueProvider.of(BigQueryHelpers.toJsonString(schema)));
}
/** Same as {@link #withSchema(TableSchema)} but using a deferred {@link ValueProvider}. */
public Write<T> withSchema(ValueProvider<TableSchema> schema) {
checkArgument(schema != null, "schema can not be null");
return withJsonSchema(NestedValueProvider.of(schema, new TableSchemaToJsonSchema()));
}
/**
* Similar to {@link #withSchema(TableSchema)} but takes in a JSON-serialized {@link
* TableSchema}.
*/
public Write<T> withJsonSchema(String jsonSchema) {
checkArgument(jsonSchema != null, "jsonSchema can not be null");
return withJsonSchema(StaticValueProvider.of(jsonSchema));
}
/** Same as {@link #withJsonSchema(String)} but using a deferred {@link ValueProvider}. */
public Write<T> withJsonSchema(ValueProvider<String> jsonSchema) {
checkArgument(jsonSchema != null, "jsonSchema can not be null");
return toBuilder().setJsonSchema(jsonSchema).build();
}
/**
* Allows the schemas for each table to be computed within the pipeline itself.
*
* <p>The input is a map-valued {@link PCollectionView} mapping string tablespecs to
* JSON-formatted {@link TableSchema}s. Tablespecs must be in the same format as taken by {@link
* #to(String)}.
*/
public Write<T> withSchemaFromView(PCollectionView<Map<String, String>> view) {
checkArgument(view != null, "view can not be null");
return toBuilder().setSchemaFromView(view).build();
}
/**
* Allows newly created tables to include a {@link TimePartitioning} class. Can only be used
* when writing to a single table. If {@link #to(SerializableFunction)} or {@link
* #to(DynamicDestinations)} is used to write dynamic tables, time partitioning can be directly
* set in the returned {@link TableDestination}.
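*
* <p>A sketch that creates a table partitioned by day on {@code event_ts} and clustered by
* {@code customer_id} (all names are placeholders):
*
* <pre>{@code
* rows.apply(BigQueryIO.writeTableRows()
*     .to("my-project:dataset.events")
*     .withSchema(schema)
*     .withTimePartitioning(new TimePartitioning().setType("DAY").setField("event_ts"))
*     .withClustering(new Clustering().setFields(Arrays.asList("customer_id"))));
* }</pre>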
*/
public Write<T> withTimePartitioning(TimePartitioning partitioning) {
checkArgument(partitioning != null, "partitioning can not be null");
return withJsonTimePartitioning(
StaticValueProvider.of(BigQueryHelpers.toJsonString(partitioning)));
}
/**
* Like {@link #withTimePartitioning(TimePartitioning)} but using a deferred {@link
* ValueProvider}.
*/
public Write<T> withTimePartitioning(ValueProvider<TimePartitioning> partitioning) {
checkArgument(partitioning != null, "partitioning can not be null");
return withJsonTimePartitioning(
NestedValueProvider.of(partitioning, new TimePartitioningToJson()));
}
/** The same as {@link #withTimePartitioning}, but takes a JSON-serialized object. */
public Write<T> withJsonTimePartitioning(ValueProvider<String> partitioning) {
checkArgument(partitioning != null, "partitioning can not be null");
return toBuilder().setJsonTimePartitioning(partitioning).build();
}
/**
* Specifies the clustering fields to use when writing to a single output table. Can only be
* used when {@link #withTimePartitioning(TimePartitioning)} is set. If {@link
* #to(SerializableFunction)} or {@link #to(DynamicDestinations)} is used to write to dynamic
* tables, the fields here will be ignored; call {@link #withClustering()} instead.
*/
public Write<T> withClustering(Clustering clustering) {
checkArgument(clustering != null, "clustering can not be null");
return toBuilder().setClustering(clustering).build();
}
/**
* Allows writing to clustered tables when {@link #to(SerializableFunction)} or {@link
* #to(DynamicDestinations)} is used. The returned {@link TableDestination} objects should
* specify the clustering fields per table. If writing to a single table, use {@link
* #withClustering(Clustering)} instead to pass a {@link Clustering} instance that specifies the
* static clustering fields to use.
*
* <p>Setting this option enables use of {@link TableDestinationCoderV3} which encodes
* clustering information. The updated coder is compatible with non-clustered tables, so can be
* freely set for newly deployed pipelines, but note that pipelines using an older coder must be
* drained before setting this option, since {@link TableDestinationCoderV3} will not be able to
* read state written with a previous version.
*/
public Write<T> withClustering() {
return toBuilder().setClustering(new Clustering()).build();
}
/** Specifies whether the table should be created if it does not exist. */
public Write<T> withCreateDisposition(CreateDisposition createDisposition) {
checkArgument(createDisposition != null, "createDisposition can not be null");
return toBuilder().setCreateDisposition(createDisposition).build();
}
/** Specifies what to do with existing data in the table, in case the table already exists. */
public Write<T> withWriteDisposition(WriteDisposition writeDisposition) {
checkArgument(writeDisposition != null, "writeDisposition can not be null");
return toBuilder().setWriteDisposition(writeDisposition).build();
}
/**
* Allows the schema of the destination table to be updated as a side effect of the write.
*
* <p>This configuration applies only when writing to BigQuery with {@link Method#FILE_LOADS} as
* method.
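*
* <p>A sketch that appends to an existing table while allowing new nullable fields to be
* added ({@code updatedSchema} and the table name are placeholders):
*
* <pre>{@code
* rows.apply(BigQueryIO.writeTableRows()
*     .to("my-project:dataset.table")
*     .withSchema(updatedSchema)
*     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
*     .withSchemaUpdateOptions(
*         EnumSet.of(BigQueryIO.Write.SchemaUpdateOption.ALLOW_FIELD_ADDITION)));
* }</pre>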
*/
public Write<T> withSchemaUpdateOptions(Set<SchemaUpdateOption> schemaUpdateOptions) {
checkArgument(schemaUpdateOptions != null, "schemaUpdateOptions can not be null");
return toBuilder().setSchemaUpdateOptions(schemaUpdateOptions).build();
}
/** Specifies the table description. */
public Write<T> withTableDescription(String tableDescription) {
checkArgument(tableDescription != null, "tableDescription can not be null");
return toBuilder().setTableDescription(tableDescription).build();
}
/**
* Specifies a policy for handling failed inserts.
*
* <p>Currently this is only allowed when writing an unbounded collection to BigQuery. Bounded
* collections are written using batch load jobs, so we don't get per-element failures.
* Unbounded collections are written using streaming inserts, so we have access to per-element
* insert results.
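*
* <p>A sketch for a streaming pipeline that retries only transient errors and inspects the
* failed rows; {@link InsertRetryPolicy#retryTransientErrors()} is one of the built-in
* policies (table name and schema are placeholders):
*
* <pre>{@code
* WriteResult result = rows.apply(BigQueryIO.writeTableRows()
*     .to("my-project:dataset.table")
*     .withSchema(schema)
*     .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
*     .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));
* PCollection<TableRow> failedRows = result.getFailedInserts();
* }</pre>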
*/
public Write<T> withFailedInsertRetryPolicy(InsertRetryPolicy retryPolicy) {
checkArgument(retryPolicy != null, "retryPolicy can not be null");
return toBuilder().setFailedInsertRetryPolicy(retryPolicy).build();
}
/** Disables BigQuery table validation. */
public Write<T> withoutValidation() {
return toBuilder().setValidate(false).build();
}
/**
* Choose the method used to write data to BigQuery. See the Javadoc on {@link Method} for
* information and restrictions of the different methods.
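*
* <p>For example, a sketch selecting the Storage Write API for an unbounded input (table
* name, schema, and tuning values are placeholders):
*
* <pre>{@code
* rows.apply(BigQueryIO.writeTableRows()
*     .to("my-project:dataset.table")
*     .withSchema(schema)
*     .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
*     .withTriggeringFrequency(Duration.standardSeconds(30))
*     .withNumStorageWriteApiStreams(4));
* }</pre>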
*/
public Write<T> withMethod(Write.Method method) {
checkArgument(method != null, "method can not be null");
return toBuilder().setMethod(method).build();
}
/**
* Set the project the BigQuery load job will be initiated from. This is only applicable when
* the write method is set to {@link Method#FILE_LOADS}. If omitted, the project of the
* destination table is used.
*/
public Write<T> withLoadJobProjectId(String loadJobProjectId) {
return withLoadJobProjectId(StaticValueProvider.of(loadJobProjectId));
}
public Write<T> withLoadJobProjectId(ValueProvider<String> loadJobProjectId) {
checkArgument(loadJobProjectId != null, "loadJobProjectId can not be null");
return toBuilder().setLoadJobProjectId(loadJobProjectId).build();
}
/**
* Choose the frequency at which file writes are triggered.
*
* <p>This is only applicable when the write method is set to {@link Method#FILE_LOADS}, and
* only when writing an unbounded {@link PCollection}.
*
* <p>Every triggeringFrequency duration, a BigQuery load job will be generated for all the data
* written since the last load job. BigQuery has limits on how many load jobs can be triggered
* per day, so be careful not to set this duration too low, or you may exceed daily quota. Often
* this is set to 5 or 10 minutes to ensure that the project stays well under the BigQuery
* quota. See <a href="https://cloud.google.com/bigquery/quota-policy">Quota Policy</a> for more
* information about BigQuery quotas.
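*
* <p>A sketch for an unbounded input written via load jobs every 10 minutes (table name,
* schema, and shard count are placeholders):
*
* <pre>{@code
* unboundedRows.apply(BigQueryIO.writeTableRows()
*     .to("my-project:dataset.table")
*     .withSchema(schema)
*     .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
*     .withTriggeringFrequency(Duration.standardMinutes(10))
*     .withNumFileShards(100));
* }</pre>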
*/
public Write<T> withTriggeringFrequency(Duration triggeringFrequency) {
checkArgument(triggeringFrequency != null, "triggeringFrequency can not be null");
return toBuilder().setTriggeringFrequency(triggeringFrequency).build();
}
/**
* Control how many file shards are written when using BigQuery load jobs. Applicable only when
* also setting {@link #withTriggeringFrequency}. To let the runner determine the sharding at
* runtime, set {@link #withAutoSharding()} instead.
*/
public Write<T> withNumFileShards(int numFileShards) {
checkArgument(numFileShards > 0, "numFileShards must be > 0, but was: %s", numFileShards);
return toBuilder().setNumFileShards(numFileShards).build();
}
/**
* Control how many parallel streams are used when using Storage API writes. Applicable only
* when also setting {@link #withTriggeringFrequency}. To let the runner determine the sharding at
* runtime, set {@link #withAutoSharding()} instead.
*/
public Write<T> withNumStorageWriteApiStreams(int numStorageWriteApiStreams) {
checkArgument(
numStorageWriteApiStreams > 0,
"numStorageWriteApiStreams must be > 0, but was: %s",
numStorageWriteApiStreams);
return toBuilder().setNumStorageWriteApiStreams(numStorageWriteApiStreams).build();
}
/**
* Provides a custom location on GCS for storing temporary files to be loaded via BigQuery batch
* load jobs. See "Usage with templates" in {@link BigQueryIO} documentation for discussion.
*/
public Write<T> withCustomGcsTempLocation(ValueProvider<String> customGcsTempLocation) {
checkArgument(customGcsTempLocation != null, "customGcsTempLocation can not be null");
return toBuilder().setCustomGcsTempLocation(customGcsTempLocation).build();
}
/**
* Enables extended error information by enabling {@link WriteResult#getFailedInsertsWithErr()}.
*
* <p>At the moment this only works when using {@link Method#STREAMING_INSERTS}. See {@link
* Write#withMethod(Method)}.
*/
public Write<T> withExtendedErrorInfo() {
return toBuilder().setExtendedErrorInfo(true).build();
}
/**
* Insert all valid rows of a request, even if invalid rows exist. This is only applicable when
* the write method is set to {@link Method#STREAMING_INSERTS}. The default value is false,
* which causes the entire request to fail if any invalid rows exist.
*/
public Write<T> skipInvalidRows() {
return toBuilder().setSkipInvalidRows(true).build();
}
/**
* Accept rows that contain values that do not match the schema. The unknown values are ignored.
* Default is false, which treats unknown values as errors.
*/
public Write<T> ignoreUnknownValues() {
return toBuilder().setIgnoreUnknownValues(true).build();
}
/**
* Enables interpreting logical types into their corresponding types (i.e., TIMESTAMP) instead of
* only using their raw types (i.e., LONG).
*/
public Write<T> useAvroLogicalTypes() {
return toBuilder().setUseAvroLogicalTypes(true).build();
}
/**
* Setting this option to true disables insertId based data deduplication offered by BigQuery.
* For more information, please see
* https://cloud.google.com/bigquery/streaming-data-into-bigquery#disabling_best_effort_de-duplication.
*/
public Write<T> ignoreInsertIds() {
return toBuilder().setIgnoreInsertIds(true).build();
}
public Write<T> withKmsKey(String kmsKey) {
return toBuilder().setKmsKey(kmsKey).build();
}
/**
* If true, enables new codepaths that are expected to use less resources while writing to
* BigQuery. Not enabled by default in order to maintain backwards compatibility.
*/
public Write<T> optimizedWrites() {
return toBuilder().setOptimizeWrites(true).build();
}
/**
* If true, then the BigQuery schema will be inferred from the input schema. If no
* formatFunction is set, then BigQueryIO will automatically turn the input records into
* TableRows that match the schema.
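*
* <p>A sketch, assuming {@code events} is a {@code PCollection<Event>} with a Beam schema
* attached ({@code Event} is a placeholder type):
*
* <pre>{@code
* events.apply(BigQueryIO.<Event>write()
*     .to("my-project:dataset.events")
*     .useBeamSchema()
*     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
* }</pre>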
*/
@Experimental(Kind.SCHEMAS)
public Write<T> useBeamSchema() {
return toBuilder().setUseBeamSchema(true).build();
}
/**
* If true, enables using a dynamically determined number of shards to write to BigQuery. This
* can be used for both {@link Method#FILE_LOADS} and {@link Method#STREAMING_INSERTS}. Only
* applicable to unbounded data. If using {@link Method#FILE_LOADS}, numFileShards set via
* {@link #withNumFileShards} will be ignored.
*/
@Experimental
public Write<T> withAutoSharding() {
return toBuilder().setAutoSharding(true).build();
}
/**
* If true, enables the propagation of the successfully inserted TableRows as part of the
* {@link WriteResult} object when using {@link Method#STREAMING_INSERTS}. By default this
* property is set to true. If a pipeline does not make use of the insert results, this
* property can be set to false, allowing the pipeline to drop the inserted TableRows and
* reclaim worker resources.
*/
public Write<T> withSuccessfulInsertsPropagation(boolean propagateSuccessful) {
return toBuilder().setPropagateSuccessful(propagateSuccessful).build();
}
/**
* Provides a function which can serve as a source of deterministic unique ids for each record
* to be written, replacing the unique ids generated with the default scheme. When used with
* {@link Method#STREAMING_INSERTS}, this also elides the re-shuffle from the BigQueryIO Write by
* using the keys on which the data is grouped at the point at which BigQueryIO Write is
* applied, since the reshuffle is necessary only to checkpoint the default-generated ids for
* determinism. This may be beneficial as a performance optimization in the case when
* the current sharding is already sufficient for writing to BigQuery. This behavior takes
* precedence over {@link #withAutoSharding}.
*/
@Experimental
public Write<T> withDeterministicRecordIdFn(
SerializableFunction<T, String> toUniqueIdFunction) {
return toBuilder().setDeterministicRecordIdFn(toUniqueIdFunction).build();
}
/** This method is for test usage only. */
@VisibleForTesting
public Write<T> withTestServices(BigQueryServices testServices) {
checkArgument(testServices != null, "testServices can not be null");
return toBuilder().setBigQueryServices(testServices).build();
}
/**
* Control how many files will be written concurrently by a single worker when using BigQuery
* load jobs before spilling to a shuffle. When data comes into this transform, it is written to
* one file per destination per worker. When there are more files than maxFilesPerBundle
(DEFAULT: 20), the data is shuffled (i.e., grouped by destination) and written to files
one by one per worker. This flag sets the maximum number of files that a single worker can
* write concurrently before shuffling the data. This flag should be used with caution. Setting
* a high number can increase the memory pressure on workers, and setting a low number can make
* a pipeline slower (due to the need to shuffle data).
*/
public Write<T> withMaxFilesPerBundle(int maxFilesPerBundle) {
checkArgument(
maxFilesPerBundle > 0, "maxFilesPerBundle must be > 0, but was: %s", maxFilesPerBundle);
return toBuilder().setMaxFilesPerBundle(maxFilesPerBundle).build();
}
@VisibleForTesting
Write<T> withMaxFileSize(long maxFileSize) {
checkArgument(maxFileSize > 0, "maxFileSize must be > 0, but was: %s", maxFileSize);
return toBuilder().setMaxFileSize(maxFileSize).build();
}
@VisibleForTesting
Write<T> withMaxFilesPerPartition(int maxFilesPerPartition) {
checkArgument(
maxFilesPerPartition > 0,
"maxFilesPerPartition must be > 0, but was: %s",
maxFilesPerPartition);
return toBuilder().setMaxFilesPerPartition(maxFilesPerPartition).build();
}
/**
* Control how much data will be assigned to a single BigQuery load job. If the amount of data
* flowing into one {@code BatchLoads} partition exceeds this value, that partition will be
* handled via multiple load jobs.
*
* <p>The default value (11 TiB) respects BigQuery's maximum size per load job limit and is
* appropriate for most use cases. Reducing the value of this parameter can improve stability
* when loading to tables with complex schemas containing thousands of fields.
*
* @see <a href="https://cloud.google.com/bigquery/quotas#load_jobs">BigQuery Load Job
* Limits</a>
*/
public Write<T> withMaxBytesPerPartition(long maxBytesPerPartition) {
checkArgument(
maxBytesPerPartition > 0,
"maxBytesPerPartition must be > 0, but was: %s",
maxBytesPerPartition);
return toBuilder().setMaxBytesPerPartition(maxBytesPerPartition).build();
}
/**
* Temporary dataset. When writing to BigQuery via large file loads, {@link
* BigQueryIO#write()} creates temporary tables in a dataset to store staging data from
* partitions. With this option, you can specify an existing dataset in which to create those
* temporary tables. BigQueryIO will create temporary tables in that dataset and remove them
* once they are no longer needed. No other tables in the dataset will be modified. Remember
* that the dataset must exist and your job needs permissions to create and remove tables
* inside that dataset.
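*
* <p>A sketch, assuming {@code staging_dataset} already exists (all names are placeholders):
*
* <pre>{@code
* rows.apply(BigQueryIO.writeTableRows()
*     .to("my-project:dataset.table")
*     .withSchema(schema)
*     .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
*     .withWriteTempDataset("staging_dataset"));
* }</pre>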
*/
public Write<T> withWriteTempDataset(String writeTempDataset) {
return toBuilder().setWriteTempDataset(writeTempDataset).build();
}
@Override
public void validate(PipelineOptions pipelineOptions) {
BigQueryOptions options = pipelineOptions.as(BigQueryOptions.class);
// The user specified a table.
if (getJsonTableRef() != null && getJsonTableRef().isAccessible() && getValidate()) {
TableReference table = getTableWithDefaultProject(options).get();
try (DatasetService datasetService = getBigQueryServices().getDatasetService(options)) {
// Check for destination table presence and emptiness for early failure notification.
// Note that a presence check can fail when the table or dataset is created by an earlier
// stage of the pipeline. For these cases the #withoutValidation method can be used to
// disable the check.
BigQueryHelpers.verifyDatasetPresence(datasetService, table);
if (getCreateDisposition() == BigQueryIO.Write.CreateDisposition.CREATE_NEVER) {
BigQueryHelpers.verifyTablePresence(datasetService, table);
}
if (getWriteDisposition() == BigQueryIO.Write.WriteDisposition.WRITE_EMPTY) {
BigQueryHelpers.verifyTableNotExistOrEmpty(datasetService, table);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
private Write.Method resolveMethod(PCollection<T> input) {
if (getMethod() != Write.Method.DEFAULT) {
return getMethod();
}
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
if (bqOptions.getUseStorageWriteApi()) {
return bqOptions.getUseStorageWriteApiAtLeastOnce()
? Method.STORAGE_API_AT_LEAST_ONCE
: Method.STORAGE_WRITE_API;
}
// By default, when writing an Unbounded PCollection, we use StreamingInserts and
// BigQuery's streaming import API.
return (input.isBounded() == IsBounded.UNBOUNDED)
? Write.Method.STREAMING_INSERTS
: Write.Method.FILE_LOADS;
}
private Duration getStorageApiTriggeringFrequency(BigQueryOptions options) {
if (getTriggeringFrequency() != null) {
return getTriggeringFrequency();
}
if (options.getStorageWriteApiTriggeringFrequencySec() != null) {
return Duration.standardSeconds(options.getStorageWriteApiTriggeringFrequencySec());
}
return null;
}
private int getStorageApiNumStreams(BigQueryOptions options) {
if (getNumStorageWriteApiStreams() != 0) {
return getNumStorageWriteApiStreams();
}
return options.getNumStorageWriteApiStreams();
}
@Override
public WriteResult expand(PCollection<T> input) {
// We must have a destination to write to!
checkArgument(
getTableFunction() != null
|| getJsonTableRef() != null
|| getDynamicDestinations() != null,
"must set the table reference of a BigQueryIO.Write transform");
List<?> allToArgs =
Lists.newArrayList(getJsonTableRef(), getTableFunction(), getDynamicDestinations());
checkArgument(
1
== Iterables.size(
allToArgs.stream()
.filter(Predicates.notNull()::apply)
.collect(Collectors.toList())),
"Exactly one of jsonTableRef, tableFunction, or dynamicDestinations must be set");
List<?> allSchemaArgs =
Lists.newArrayList(getJsonSchema(), getSchemaFromView(), getDynamicDestinations());
checkArgument(
2
> Iterables.size(
allSchemaArgs.stream()
.filter(Predicates.notNull()::apply)
.collect(Collectors.toList())),
"No more than one of jsonSchema, schemaFromView, or dynamicDestinations may be set");
Write.Method method = resolveMethod(input);
if (input.isBounded() == IsBounded.UNBOUNDED
&& (method == Write.Method.FILE_LOADS || method == Write.Method.STORAGE_WRITE_API)) {
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
Duration triggeringFrequency =
(method == Write.Method.STORAGE_WRITE_API)
? getStorageApiTriggeringFrequency(bqOptions)
: getTriggeringFrequency();
checkArgument(
triggeringFrequency != null,
"When writing an unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, "
+ "triggering frequency must be specified");
} else {
checkArgument(
getTriggeringFrequency() == null && getNumFileShards() == 0,
"Triggering frequency or number of file shards can be specified only when writing an"
+ " unbounded PCollection via FILE_LOADS or STORAGE_API_WRITES, but: the collection"
+ " was %s and the method was %s",
input.isBounded(),
method);
}
if (method != Write.Method.FILE_LOADS) {
// we only support writing avro for FILE_LOADS
checkArgument(
getAvroRowWriterFactory() == null,
"Writing avro formatted data is only supported for FILE_LOADS, however "
+ "the method was %s",
method);
}
if (input.isBounded() == IsBounded.BOUNDED) {
checkArgument(!getAutoSharding(), "Auto-sharding is only applicable to unbounded input.");
}
if (method == Write.Method.STORAGE_WRITE_API) {
checkArgument(!getAutoSharding(), "Auto sharding not yet available for Storage API writes");
}
if (getJsonTimePartitioning() != null) {
checkArgument(
getDynamicDestinations() == null,
"The supplied DynamicDestinations object can directly set TimePartitioning."
+ " There is no need to call BigQueryIO.Write.withTimePartitioning.");
checkArgument(
getTableFunction() == null,
"The supplied getTableFunction object can directly set TimePartitioning."
+ " There is no need to call BigQueryIO.Write.withTimePartitioning.");
}
DynamicDestinations<T, ?> dynamicDestinations = getDynamicDestinations();
if (dynamicDestinations == null) {
if (getJsonTableRef() != null) {
dynamicDestinations =
DynamicDestinationsHelpers.ConstantTableDestinations.fromJsonTableRef(
getJsonTableRef(), getTableDescription(), getClustering() != null);
} else if (getTableFunction() != null) {
dynamicDestinations =
new TableFunctionDestinations<>(getTableFunction(), getClustering() != null);
}
// Wrap with a DynamicDestinations class that will provide a schema. There might be no
// schema provided if the create disposition is CREATE_NEVER.
if (getJsonSchema() != null) {
dynamicDestinations =
new ConstantSchemaDestinations<>(
(DynamicDestinations<T, TableDestination>) dynamicDestinations, getJsonSchema());
} else if (getSchemaFromView() != null) {
dynamicDestinations =
new SchemaFromViewDestinations<>(
(DynamicDestinations<T, TableDestination>) dynamicDestinations,
getSchemaFromView());
}
// Wrap with a DynamicDestinations class that will provide the proper TimePartitioning.
if (getJsonTimePartitioning() != null) {
dynamicDestinations =
new ConstantTimePartitioningDestinations<>(
(DynamicDestinations<T, TableDestination>) dynamicDestinations,
getJsonTimePartitioning(),
StaticValueProvider.of(BigQueryHelpers.toJsonString(getClustering())));
}
}
return expandTyped(input, dynamicDestinations);
}
private <DestinationT> WriteResult expandTyped(
PCollection<T> input, DynamicDestinations<T, DestinationT> dynamicDestinations) {
boolean optimizeWrites = getOptimizeWrites();
SerializableFunction<T, TableRow> formatFunction = getFormatFunction();
SerializableFunction<T, TableRow> formatRecordOnFailureFunction =
getFormatRecordOnFailureFunction();
RowWriterFactory.AvroRowWriterFactory<T, ?, DestinationT> avroRowWriterFactory =
(RowWriterFactory.AvroRowWriterFactory<T, ?, DestinationT>) getAvroRowWriterFactory();
boolean hasSchema =
getJsonSchema() != null
|| getDynamicDestinations() != null
|| getSchemaFromView() != null;
if (getUseBeamSchema()) {
checkArgument(input.hasSchema(), "The input doesn't has a schema");
optimizeWrites = true;
checkArgument(
avroRowWriterFactory == null,
"avro avroFormatFunction is unsupported when using Beam schemas.");
if (formatFunction == null) {
// If no format function set, then we will automatically convert the input type to a
// TableRow.
// TODO: it would be trivial to convert to avro records here instead.
formatFunction = BigQueryUtils.toTableRow(input.getToRowFunction());
}
// Infer the TableSchema from the input Beam schema.
TableSchema tableSchema = BigQueryUtils.toTableSchema(input.getSchema());
dynamicDestinations =
new ConstantSchemaDestinations<>(
dynamicDestinations,
StaticValueProvider.of(BigQueryHelpers.toJsonString(tableSchema)));
} else {
// Require a schema if creating one or more tables.
checkArgument(
getCreateDisposition() != CreateDisposition.CREATE_IF_NEEDED || hasSchema,
"CreateDisposition is CREATE_IF_NEEDED, however no schema was provided.");
}
Coder<DestinationT> destinationCoder;
try {
destinationCoder =
dynamicDestinations.getDestinationCoderWithDefault(
input.getPipeline().getCoderRegistry());
} catch (CannotProvideCoderException e) {
throw new RuntimeException(e);
}
Write.Method method = resolveMethod(input);
RowWriterFactory<T, DestinationT> rowWriterFactory;
if (optimizeWrites) {
if (avroRowWriterFactory != null) {
checkArgument(
formatFunction == null,
"Only one of withFormatFunction or withAvroFormatFunction/withAvroWriter maybe set,"
+ " not both.");
SerializableFunction<@Nullable TableSchema, org.apache.avro.Schema> avroSchemaFactory =
getAvroSchemaFactory();
if (avroSchemaFactory == null) {
checkArgument(
hasSchema,
"A schema must be provided if an avroFormatFunction "
+ "is set but no avroSchemaFactory is defined.");
avroSchemaFactory = DEFAULT_AVRO_SCHEMA_FACTORY;
}
rowWriterFactory = avroRowWriterFactory.prepare(dynamicDestinations, avroSchemaFactory);
} else if (formatFunction != null) {
rowWriterFactory =
RowWriterFactory.tableRows(formatFunction, formatRecordOnFailureFunction);
} else {
throw new IllegalArgumentException(
"A function must be provided to convert the input type into a TableRow or "
+ "GenericRecord. Use BigQueryIO.Write.withFormatFunction or "
+ "BigQueryIO.Write.withAvroFormatFunction to provide a formatting function. "
+ "A format function is not required if Beam schemas are used.");
}
} else {
checkArgument(
avroRowWriterFactory == null,
"When using a formatFunction, the AvroRowWriterFactory should be null");
checkArgument(
formatFunction != null,
"A function must be provided to convert the input type into a TableRow or "
+ "GenericRecord. Use BigQueryIO.Write.withFormatFunction or "
+ "BigQueryIO.Write.withAvroFormatFunction to provide a formatting function. "
+ "A format function is not required if Beam schemas are used.");
rowWriterFactory =
RowWriterFactory.tableRows(formatFunction, formatRecordOnFailureFunction);
}
PCollection<KV<DestinationT, T>> rowsWithDestination =
input
.apply(
"PrepareWrite",
new PrepareWrite<>(dynamicDestinations, SerializableFunctions.identity()))
.setCoder(KvCoder.of(destinationCoder, input.getCoder()));
return continueExpandTyped(
rowsWithDestination,
input.getCoder(),
getUseBeamSchema() ? input.getSchema() : null,
getUseBeamSchema() ? input.getToRowFunction() : null,
destinationCoder,
dynamicDestinations,
rowWriterFactory,
method);
}
private <DestinationT> WriteResult continueExpandTyped(
PCollection<KV<DestinationT, T>> input,
Coder<T> elementCoder,
@Nullable Schema elementSchema,
@Nullable SerializableFunction<T, Row> elementToRowFunction,
Coder<DestinationT> destinationCoder,
DynamicDestinations<T, DestinationT> dynamicDestinations,
RowWriterFactory<T, DestinationT> rowWriterFactory,
Write.Method method) {
if (method == Write.Method.STREAMING_INSERTS) {
checkArgument(
getWriteDisposition() != WriteDisposition.WRITE_TRUNCATE,
"WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection.");
InsertRetryPolicy retryPolicy =
MoreObjects.firstNonNull(getFailedInsertRetryPolicy(), InsertRetryPolicy.alwaysRetry());
checkArgument(
rowWriterFactory.getOutputType() == RowWriterFactory.OutputType.JsonTableRow,
"Avro output is not supported when method == STREAMING_INSERTS");
checkArgument(
getSchemaUpdateOptions() == null || getSchemaUpdateOptions().isEmpty(),
"SchemaUpdateOptions are not supported when method == STREAMING_INSERTS");
RowWriterFactory.TableRowWriterFactory<T, DestinationT> tableRowWriterFactory =
(RowWriterFactory.TableRowWriterFactory<T, DestinationT>) rowWriterFactory;
StreamingInserts<DestinationT, T> streamingInserts =
new StreamingInserts<>(
getCreateDisposition(),
dynamicDestinations,
elementCoder,
tableRowWriterFactory.getToRowFn(),
tableRowWriterFactory.getToFailsafeRowFn())
.withInsertRetryPolicy(retryPolicy)
.withTestServices(getBigQueryServices())
.withExtendedErrorInfo(getExtendedErrorInfo())
.withSkipInvalidRows(getSkipInvalidRows())
.withIgnoreUnknownValues(getIgnoreUnknownValues())
.withIgnoreInsertIds(getIgnoreInsertIds())
.withAutoSharding(getAutoSharding())
.withSuccessfulInsertsPropagation(getPropagateSuccessful())
.withDeterministicRecordIdFn(getDeterministicRecordIdFn())
.withKmsKey(getKmsKey());
return input.apply(streamingInserts);
} else if (method == Write.Method.FILE_LOADS) {
checkArgument(
getFailedInsertRetryPolicy() == null,
"Record-insert retry policies are not supported when using BigQuery load jobs.");
if (getUseAvroLogicalTypes()) {
checkArgument(
rowWriterFactory.getOutputType() == OutputType.AvroGenericRecord,
"useAvroLogicalTypes can only be set with Avro output.");
}
// Batch load jobs currently support JSON data insertion only with CSV files
if (getJsonSchema() != null && getJsonSchema().isAccessible()) {
JsonElement schema = JsonParser.parseString(getJsonSchema().get());
if (!schema.getAsJsonObject().keySet().isEmpty()) {
validateNoJsonTypeInSchema(schema);
}
}
BatchLoads<DestinationT, T> batchLoads =
new BatchLoads<>(
getWriteDisposition(),
getCreateDisposition(),
getJsonTableRef() != null,
dynamicDestinations,
destinationCoder,
getCustomGcsTempLocation(),
getLoadJobProjectId(),
getIgnoreUnknownValues(),
elementCoder,
rowWriterFactory,
getKmsKey(),
getClustering() != null,
getUseAvroLogicalTypes(),
getWriteTempDataset());
batchLoads.setTestServices(getBigQueryServices());
if (getSchemaUpdateOptions() != null) {
batchLoads.setSchemaUpdateOptions(getSchemaUpdateOptions());
}
if (getMaxFilesPerBundle() != null) {
batchLoads.setMaxNumWritersPerBundle(getMaxFilesPerBundle());
}
if (getMaxFileSize() != null) {
batchLoads.setMaxFileSize(getMaxFileSize());
}
batchLoads.setMaxFilesPerPartition(getMaxFilesPerPartition());
batchLoads.setMaxBytesPerPartition(getMaxBytesPerPartition());
// When running in streaming (unbounded mode) we want to retry failed load jobs
// indefinitely. Failing the bundle is expensive, so we set a fairly high limit on retries.
if (IsBounded.UNBOUNDED.equals(input.isBounded())) {
batchLoads.setMaxRetryJobs(1000);
}
batchLoads.setTriggeringFrequency(getTriggeringFrequency());
if (getAutoSharding()) {
batchLoads.setNumFileShards(0);
} else {
batchLoads.setNumFileShards(getNumFileShards());
}
return input.apply(batchLoads);
} else if (method == Method.STORAGE_WRITE_API || method == Method.STORAGE_API_AT_LEAST_ONCE) {
BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
StorageApiDynamicDestinations<T, DestinationT> storageApiDynamicDestinations;
if (getUseBeamSchema()) {
// This ensures that the Beam rows are directly translated into protos for Storage API
// writes, with no need to round-trip through JSON TableRow objects.
storageApiDynamicDestinations =
new StorageApiDynamicDestinationsBeamRow<>(
dynamicDestinations, elementSchema, elementToRowFunction);
} else {
RowWriterFactory.TableRowWriterFactory<T, DestinationT> tableRowWriterFactory =
(RowWriterFactory.TableRowWriterFactory<T, DestinationT>) rowWriterFactory;
          // Fallback behavior: convert elements to JSON TableRows, which are then converted into
          // protos for the Storage Write API.
storageApiDynamicDestinations =
new StorageApiDynamicDestinationsTableRow<>(
dynamicDestinations,
tableRowWriterFactory.getToRowFn(),
getCreateDisposition(),
getIgnoreUnknownValues(),
bqOptions.getSchemaUpdateRetries());
}
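        // StorageApiLoads handles the proto conversion and the Storage Write API writes; the
        // final constructor argument selects at-least-once (rather than exactly-once) semantics.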
StorageApiLoads<DestinationT, T> storageApiLoads =
new StorageApiLoads<DestinationT, T>(
destinationCoder,
storageApiDynamicDestinations,
getCreateDisposition(),
getKmsKey(),
getStorageApiTriggeringFrequency(bqOptions),
getBigQueryServices(),
getStorageApiNumStreams(bqOptions),
method == Method.STORAGE_API_AT_LEAST_ONCE);
return input.apply("StorageApiLoads", storageApiLoads);
} else {
throw new RuntimeException("Unexpected write method " + method);
}
}
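    /**
     * Recursively verifies that the given JSON table schema declares no JSON-typed fields, which
     * the FILE_LOADS write method does not support.
     */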
private void validateNoJsonTypeInSchema(JsonElement schema) {
JsonElement fields = schema.getAsJsonObject().get("fields");
      // Guard against schemas (and nested fields) that do not declare any "fields".
      if (fields == null || !fields.isJsonArray() || fields.getAsJsonArray().isEmpty()) {
return;
}
JsonArray fieldArray = fields.getAsJsonArray();
for (int i = 0; i < fieldArray.size(); i++) {
JsonObject field = fieldArray.get(i).getAsJsonObject();
checkArgument(
!field.get("type").getAsString().equals("JSON"),
"Found JSON type in TableSchema. JSON data insertion is currently "
+ "not supported with 'FILE_LOADS' write method. This is supported with the "
+ "other write methods, however. For more information, visit: "
+ "https://cloud.google.com/bigquery/docs/reference/standard-sql/"
+ "json-data#ingest_json_data");
if (field.get("type").getAsString().equals("STRUCT")) {
validateNoJsonTypeInSchema(field);
}
}
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.addIfNotNull(
DisplayData.item("table", getJsonTableRef()).withLabel("Table Reference"));
if (getJsonSchema() != null) {
builder.addIfNotNull(DisplayData.item("schema", getJsonSchema()).withLabel("Table Schema"));
} else {
builder.add(DisplayData.item("schema", "Custom Schema Function").withLabel("Table Schema"));
}
if (getTableFunction() != null) {
builder.add(
DisplayData.item("tableFn", getTableFunction().getClass())
.withLabel("Table Reference Function"));
}
builder
.add(
DisplayData.item("createDisposition", getCreateDisposition().toString())
.withLabel("Table CreateDisposition"))
.add(
DisplayData.item("writeDisposition", getWriteDisposition().toString())
.withLabel("Table WriteDisposition"))
.add(
DisplayData.item("schemaUpdateOptions", getSchemaUpdateOptions().toString())
.withLabel("Table SchemaUpdateOptions"))
.addIfNotDefault(
DisplayData.item("validation", getValidate()).withLabel("Validation Enabled"), true)
.addIfNotNull(
DisplayData.item("tableDescription", getTableDescription())
.withLabel("Table Description"));
}
/**
* Returns the table to write, or {@code null} if writing with {@code tableFunction}.
*
     * <p>If the table's project is not specified, it is filled in from {@link
     * BigQueryOptions#getBigQueryProject()} when set, or from the executing project otherwise.
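     *
     * <p>For example (illustrative values), a {@link TableReference} naming dataset {@code
     * somedataset} and table {@code sometable} but no project is returned with its project filled
     * in from the BigQuery project option, or from the executing project when that option is
     * unset.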
*/
@Nullable
ValueProvider<TableReference> getTableWithDefaultProject(BigQueryOptions bqOptions) {
ValueProvider<TableReference> table = getTable();
if (table == null) {
return table;
}
if (!table.isAccessible()) {
      LOG.info(
          "Using a dynamic value for table input. The table reference must include a project"
              + " when it is evaluated at runtime: {}",
          table);
return table;
}
if (Strings.isNullOrEmpty(table.get().getProjectId())) {
      // If the user does not specify a project, fill in the BigQuery project option if set,
      // otherwise the pipeline's default project.
TableReference tableRef = table.get();
tableRef.setProjectId(
bqOptions.getBigQueryProject() == null
? bqOptions.getProject()
: bqOptions.getBigQueryProject());
return NestedValueProvider.of(
StaticValueProvider.of(BigQueryHelpers.toJsonString(tableRef)),
new JsonTableRefToTableRef());
}
return table;
}
/** Returns the table reference, or {@code null}. */
public @Nullable ValueProvider<TableReference> getTable() {
return getJsonTableRef() == null
? null
: NestedValueProvider.of(getJsonTableRef(), new JsonTableRefToTableRef());
}
}
  /**
   * Clears the static caches maintained by the BigQuery sink (created tables, message converters,
   * cached schemas, and Storage API write caches). Used for testing.
   */
@VisibleForTesting
static void clearStaticCaches() throws ExecutionException, InterruptedException {
CreateTables.clearCreatedTables();
TwoLevelMessageConverterCache.clear();
StorageApiDynamicDestinationsTableRow.clearSchemaCache();
StorageApiWriteUnshardedRecords.clearCache();
StorageApiWritesShardedRecords.clearCache();
}
/////////////////////////////////////////////////////////////////////////////
/** Disallow construction of utility class. */
private BigQueryIO() {}
}