/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.coders.AtomicCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.RequiresNonNull;

/**
 * This transform takes in key-value pairs, each pairing a {@link TableDestination} with an
 * element (convertible to a {@link TableRow}) to be written to that table. The BigQuery
 * streaming-write service is used to stream these writes to the appropriate tables.
 *
 * <p>This transform assumes that each destination table already exists by the time the transform
 * sees a write for it.
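 *
 * <p>A minimal in-package usage sketch (the {@code tagged} input and the identity conversion
 * functions are illustrative, not prescriptive):
 *
 * <pre>{@code
 * PCollection<KV<TableDestination, TableRow>> tagged = ...;
 * WriteResult result =
 *     tagged.apply(
 *         "StreamingInserts",
 *         new StreamingWriteTables<TableRow>()
 *             .withElementCoder(TableRowJsonCoder.of())
 *             .withToTableRow(row -> row)
 *             .withToFailsafeTableRow(row -> row));
 * }</pre>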
*/
public class StreamingWriteTables<ElementT>
extends PTransform<PCollection<KV<TableDestination, ElementT>>, WriteResult> {
private BigQueryServices bigQueryServices;
private InsertRetryPolicy retryPolicy;
private boolean extendedErrorInfo;
private static final String FAILED_INSERTS_TAG_ID = "failedInserts";
private final boolean skipInvalidRows;
private final boolean ignoreUnknownValues;
private final boolean ignoreInsertIds;
private final boolean autoSharding;
private final boolean propagateSuccessful;
private final @Nullable Coder<ElementT> elementCoder;
private final @Nullable SerializableFunction<ElementT, TableRow> toTableRow;
private final @Nullable SerializableFunction<ElementT, TableRow> toFailsafeTableRow;
private final @Nullable SerializableFunction<ElementT, String> deterministicRecordIdFn;
public StreamingWriteTables() {
this(
new BigQueryServicesImpl(),
InsertRetryPolicy.alwaysRetry(),
false, // extendedErrorInfo
false, // skipInvalidRows
false, // ignoreUnknownValues
false, // ignoreInsertIds
false, // autoSharding
false, // propagateSuccessful
null, // elementCoder
null, // toTableRow
null, // toFailsafeTableRow
null); // deterministicRecordIdFn
}
private StreamingWriteTables(
BigQueryServices bigQueryServices,
InsertRetryPolicy retryPolicy,
boolean extendedErrorInfo,
boolean skipInvalidRows,
boolean ignoreUnknownValues,
boolean ignoreInsertIds,
boolean autoSharding,
boolean propagateSuccessful,
@Nullable Coder<ElementT> elementCoder,
@Nullable SerializableFunction<ElementT, TableRow> toTableRow,
@Nullable SerializableFunction<ElementT, TableRow> toFailsafeTableRow,
@Nullable SerializableFunction<ElementT, String> deterministicRecordIdFn) {
this.bigQueryServices = bigQueryServices;
this.retryPolicy = retryPolicy;
this.extendedErrorInfo = extendedErrorInfo;
this.skipInvalidRows = skipInvalidRows;
this.ignoreUnknownValues = ignoreUnknownValues;
this.ignoreInsertIds = ignoreInsertIds;
this.autoSharding = autoSharding;
this.propagateSuccessful = propagateSuccessful;
this.elementCoder = elementCoder;
this.toTableRow = toTableRow;
this.toFailsafeTableRow = toFailsafeTableRow;
this.deterministicRecordIdFn = deterministicRecordIdFn;
}
StreamingWriteTables<ElementT> withTestServices(BigQueryServices bigQueryServices) {
return new StreamingWriteTables<>(
bigQueryServices,
retryPolicy,
extendedErrorInfo,
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
propagateSuccessful,
elementCoder,
toTableRow,
toFailsafeTableRow,
deterministicRecordIdFn);
}
StreamingWriteTables<ElementT> withInsertRetryPolicy(InsertRetryPolicy retryPolicy) {
return new StreamingWriteTables<>(
bigQueryServices,
retryPolicy,
extendedErrorInfo,
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
propagateSuccessful,
elementCoder,
toTableRow,
toFailsafeTableRow,
deterministicRecordIdFn);
}
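  /**
   * If {@code true}, failed inserts are surfaced as {@link BigQueryInsertError} (the failing row
   * together with error details) rather than as raw {@link TableRow}s; see {@link #expand}.
   */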
StreamingWriteTables<ElementT> withExtendedErrorInfo(boolean extendedErrorInfo) {
return new StreamingWriteTables<>(
bigQueryServices,
retryPolicy,
extendedErrorInfo,
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
propagateSuccessful,
elementCoder,
toTableRow,
toFailsafeTableRow,
deterministicRecordIdFn);
}
StreamingWriteTables<ElementT> withSkipInvalidRows(boolean skipInvalidRows) {
return new StreamingWriteTables<>(
bigQueryServices,
retryPolicy,
extendedErrorInfo,
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
propagateSuccessful,
elementCoder,
toTableRow,
toFailsafeTableRow,
deterministicRecordIdFn);
}
StreamingWriteTables<ElementT> withIgnoreUnknownValues(boolean ignoreUnknownValues) {
return new StreamingWriteTables<>(
bigQueryServices,
retryPolicy,
extendedErrorInfo,
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
propagateSuccessful,
elementCoder,
toTableRow,
toFailsafeTableRow,
deterministicRecordIdFn);
}
StreamingWriteTables<ElementT> withIgnoreInsertIds(boolean ignoreInsertIds) {
return new StreamingWriteTables<>(
bigQueryServices,
retryPolicy,
extendedErrorInfo,
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
propagateSuccessful,
elementCoder,
toTableRow,
toFailsafeTableRow,
deterministicRecordIdFn);
}
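  /**
   * If {@code true} (and no deterministic record-id function is set), sharding is determined
   * dynamically by the runner via the stateful {@code GroupIntoBatches.WithShardedKey}-based
   * write, instead of a fixed number of shards per table.
   */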
StreamingWriteTables<ElementT> withAutoSharding(boolean autoSharding) {
return new StreamingWriteTables<>(
bigQueryServices,
retryPolicy,
extendedErrorInfo,
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
propagateSuccessful,
elementCoder,
toTableRow,
toFailsafeTableRow,
deterministicRecordIdFn);
}
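  /**
   * If {@code true}, rows that were successfully inserted are also surfaced on the resulting
   * {@link WriteResult}.
   */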
StreamingWriteTables<ElementT> withPropagateSuccessful(boolean propagateSuccessful) {
return new StreamingWriteTables<>(
bigQueryServices,
retryPolicy,
extendedErrorInfo,
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
propagateSuccessful,
elementCoder,
toTableRow,
toFailsafeTableRow,
deterministicRecordIdFn);
}
StreamingWriteTables<ElementT> withElementCoder(Coder<ElementT> elementCoder) {
return new StreamingWriteTables<>(
bigQueryServices,
retryPolicy,
extendedErrorInfo,
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
propagateSuccessful,
elementCoder,
toTableRow,
toFailsafeTableRow,
deterministicRecordIdFn);
}
StreamingWriteTables<ElementT> withToTableRow(
SerializableFunction<ElementT, TableRow> toTableRow) {
return new StreamingWriteTables<>(
bigQueryServices,
retryPolicy,
extendedErrorInfo,
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
propagateSuccessful,
elementCoder,
toTableRow,
toFailsafeTableRow,
deterministicRecordIdFn);
}
StreamingWriteTables<ElementT> withToFailsafeTableRow(
SerializableFunction<ElementT, TableRow> toFailsafeTableRow) {
return new StreamingWriteTables<>(
bigQueryServices,
retryPolicy,
extendedErrorInfo,
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
propagateSuccessful,
elementCoder,
toTableRow,
toFailsafeTableRow,
deterministicRecordIdFn);
}
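  /**
   * Uses the given function to derive insert ids deterministically from elements. When set, the
   * fixed-sharding path is taken and the id-checkpointing {@link Reshuffle} is skipped, since
   * deterministic ids are stable across retries by construction.
   */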
StreamingWriteTables<ElementT> withDeterministicRecordIdFn(
@Nullable SerializableFunction<ElementT, String> deterministicRecordIdFn) {
return new StreamingWriteTables<>(
bigQueryServices,
retryPolicy,
extendedErrorInfo,
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
autoSharding,
propagateSuccessful,
elementCoder,
toTableRow,
toFailsafeTableRow,
deterministicRecordIdFn);
}
@Override
public WriteResult expand(PCollection<KV<TableDestination, ElementT>> input) {
Preconditions.checkStateNotNull(elementCoder);
if (extendedErrorInfo) {
TupleTag<BigQueryInsertError> failedInsertsTag = new TupleTag<>(FAILED_INSERTS_TAG_ID);
PCollectionTuple result =
writeAndGetErrors(
input,
failedInsertsTag,
BigQueryInsertErrorCoder.of(),
ErrorContainer.BIG_QUERY_INSERT_ERROR_ERROR_CONTAINER);
PCollection<BigQueryInsertError> failedInserts = result.get(failedInsertsTag);
return WriteResult.withExtendedErrors(
input.getPipeline(),
failedInsertsTag,
failedInserts,
propagateSuccessful ? result.get(BatchedStreamingWrite.SUCCESSFUL_ROWS_TAG) : null);
} else {
TupleTag<TableRow> failedInsertsTag = new TupleTag<>(FAILED_INSERTS_TAG_ID);
PCollectionTuple result =
writeAndGetErrors(
input,
failedInsertsTag,
TableRowJsonCoder.of(),
ErrorContainer.TABLE_ROW_ERROR_CONTAINER);
PCollection<TableRow> failedInserts = result.get(failedInsertsTag);
return WriteResult.in(
input.getPipeline(),
failedInsertsTag,
failedInserts,
propagateSuccessful ? result.get(BatchedStreamingWrite.SUCCESSFUL_ROWS_TAG) : null,
null,
null,
null,
null);
}
}
@RequiresNonNull({"elementCoder"})
private <T> PCollectionTuple writeAndGetErrors(
PCollection<KV<TableDestination, ElementT>> input,
TupleTag<T> failedInsertsTag,
AtomicCoder<T> coder,
ErrorContainer<T> errorContainer) {
BigQueryOptions options = input.getPipeline().getOptions().as(BigQueryOptions.class);
    // A naive implementation would simply stream data directly to BigQuery. However, that could
    // occasionally lead to duplicated data, e.g., when a VM running this code is restarted and
    // the code is re-run. This implementation mitigates that risk by relying on BigQuery's
    // built-in, best-effort de-duplication mechanism: each input TableRow is tagged with a
    // generated unique id, which is passed to BigQuery and used to ignore duplicates. To prevent
    // the same TableRow from being processed more than once with different regenerated unique
    // ids, this implementation relies on "checkpointing", achieved as a side effect of having
    // the BigQuery insertion immediately follow a GBK.
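    // As an illustrative sketch of that mechanism (using the BigQuery insertAll API shape), each
    // row is ultimately sent as a TableDataInsertAllRequest.Rows entry along the lines of
    //   new TableDataInsertAllRequest.Rows().setInsertId(uniqueId).setJson(row)
    // so that BigQuery can drop re-sent rows whose insertId it has recently seen.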
if (autoSharding && deterministicRecordIdFn == null) {
      // If runner-determined dynamic sharding is enabled, group TableRows by table destination,
      // with groups that may be further sharded at runtime. Otherwise, we choose a fixed number
      // of shards per table destination, following the logic in the other branch below.
PCollection<KV<String, TableRowInfo<ElementT>>> unshardedTagged =
input
.apply(
"MapToTableSpec",
MapElements.via(
new SimpleFunction<KV<TableDestination, ElementT>, KV<String, ElementT>>() {
@Override
public KV<String, ElementT> apply(KV<TableDestination, ElementT> input) {
return KV.of(input.getKey().getTableSpec(), input.getValue());
}
}))
.setCoder(KvCoder.of(StringUtf8Coder.of(), elementCoder))
.apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds<>()))
.setCoder(KvCoder.of(StringUtf8Coder.of(), TableRowInfoCoder.of(elementCoder)));
      // Auto-sharding is achieved via the GroupIntoBatches.WithShardedKey transform, which both
      // groups and batches the TableRows to be inserted into BigQuery.
return unshardedTagged.apply(
"StreamingWrite",
new BatchedStreamingWrite<>(
bigQueryServices,
retryPolicy,
failedInsertsTag,
coder,
errorContainer,
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
propagateSuccessful,
toTableRow,
toFailsafeTableRow)
.viaStateful());
} else {
      // We create a fixed number of keys per BigQuery table to generate output on (50 by default,
      // via BigQueryOptions#getNumStreamingKeys). This is few enough that we get good batching
      // into BigQuery's insert calls, and enough that we can max out the streaming insert quota.
int numShards = options.getNumStreamingKeys();
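      // Note: like other pipeline options, this should be settable via the getter-derived
      // command-line flag (e.g. --numStreamingKeys=100); the value 100 here is illustrative.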
PCollection<KV<ShardedKey<String>, TableRowInfo<ElementT>>> shardedTagged =
input
.apply("ShardTableWrites", ParDo.of(new GenerateShardedTable<>(numShards)))
.setCoder(KvCoder.of(ShardedKeyCoder.of(StringUtf8Coder.of()), elementCoder))
.apply("TagWithUniqueIds", ParDo.of(new TagWithUniqueIds<>(deterministicRecordIdFn)))
.setCoder(
KvCoder.of(
ShardedKeyCoder.of(StringUtf8Coder.of()),
TableRowInfoCoder.of(elementCoder)));
if (deterministicRecordIdFn == null) {
        // If we are not using a deterministic function for record ids, we must apply a Reshuffle
        // so that the generated ids are checkpointed and stay stable across retries.
shardedTagged = shardedTagged.apply(Reshuffle.of());
}
return shardedTagged
          // Put in the global window to ensure that DynamicDestinations side inputs are accessed
          // correctly.
.apply(
"GlobalWindow",
Window.<KV<ShardedKey<String>, TableRowInfo<ElementT>>>into(new GlobalWindows())
.triggering(DefaultTrigger.of())
.discardingFiredPanes())
.apply(
"StripShardId",
MapElements.via(
new SimpleFunction<
KV<ShardedKey<String>, TableRowInfo<ElementT>>,
KV<String, TableRowInfo<ElementT>>>() {
@Override
public KV<String, TableRowInfo<ElementT>> apply(
KV<ShardedKey<String>, TableRowInfo<ElementT>> input) {
return KV.of(input.getKey().getKey(), input.getValue());
}
}))
.setCoder(KvCoder.of(StringUtf8Coder.of(), TableRowInfoCoder.of(elementCoder)))
          // Also batch the TableRows in a best-effort manner via bundle finalization before
          // inserting them into BigQuery.
.apply(
"StreamingWrite",
new BatchedStreamingWrite<>(
bigQueryServices,
retryPolicy,
failedInsertsTag,
coder,
errorContainer,
skipInvalidRows,
ignoreUnknownValues,
ignoreInsertIds,
propagateSuccessful,
toTableRow,
toFailsafeTableRow)
.viaDoFnFinalization());
}
}
}