/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import com.google.api.core.ApiFuture;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.ProtoRows;
import com.google.cloud.bigquery.storage.v1beta2.WriteStream.Type;
import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Message;
import io.grpc.Status;
import io.grpc.Status.Code;
import java.io.IOException;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StreamAppendClient;
import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.MessageConverter;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiFlushAndFinalizeDoFn.Operation;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.RemovalNotification;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A transform to write sharded records to BigQuery using the Storage API. Each sharded key is
* appended to its own BUFFERED write stream, and the resulting flush positions are handed to
* {@link StorageApiFlushAndFinalizeDoFn} to be flushed and finalized.
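*
* <p>Illustrative wiring (a sketch; {@code shardedGroups} and the constructor arguments are
* assumed to be provided by the surrounding pipeline):
*
* <pre>{@code
* PCollection<Void> result =
*     shardedGroups.apply(
*         new StorageApiWritesShardedRecords<>(
*             dynamicDestinations, CreateDisposition.CREATE_IF_NEEDED, kmsKey, bqServices,
*             destinationCoder));
* }</pre>
*/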
@SuppressWarnings("FutureReturnValueIgnored")
public class StorageApiWritesShardedRecords<DestinationT, ElementT>
extends PTransform<
PCollection<KV<ShardedKey<DestinationT>, Iterable<ElementT>>>, PCollection<Void>> {
private static final Logger LOG = LoggerFactory.getLogger(StorageApiWritesShardedRecords.class);
private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
private final CreateDisposition createDisposition;
private final String kmsKey;
private final BigQueryServices bqServices;
private final Coder<DestinationT> destinationCoder;
@Nullable private DatasetService datasetServiceInternal = null;
private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();
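// Cache of StreamAppendClients, keyed by write-stream name. Clients are shared across bundles on
// the same worker and are closed asynchronously when they expire from the cache.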
private static final Cache<String, StreamAppendClient> APPEND_CLIENTS =
CacheBuilder.newBuilder()
.expireAfterAccess(15, TimeUnit.MINUTES)
.removalListener(
(RemovalNotification<String, StreamAppendClient> removal) -> {
@Nullable final StreamAppendClient streamAppendClient = removal.getValue();
// Close the writer in a different thread so as not to block the main one.
runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::close);
})
.build();
private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
if (datasetServiceInternal == null) {
datasetServiceInternal =
bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class));
}
return datasetServiceInternal;
}
// Run a closure asynchronously, ignoring failures.
private interface ThrowingRunnable {
void run() throws Exception;
}
private static void runAsyncIgnoreFailure(ExecutorService executor, ThrowingRunnable task) {
executor.submit(
() -> {
try {
task.run();
} catch (Exception e) {
// Ignore failures: closing or unpinning clients here is best-effort cleanup.
}
});
}
public StorageApiWritesShardedRecords(
StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations,
CreateDisposition createDisposition,
String kmsKey,
BigQueryServices bqServices,
Coder<DestinationT> destinationCoder) {
this.dynamicDestinations = dynamicDestinations;
this.createDisposition = createDisposition;
this.kmsKey = kmsKey;
this.bqServices = bqServices;
this.destinationCoder = destinationCoder;
}
@Override
public PCollection<Void> expand(
PCollection<KV<ShardedKey<DestinationT>, Iterable<ElementT>>> input) {
String operationName = input.getName() + "/" + getName();
// Append records to the Storage API streams.
PCollection<KV<String, Operation>> written =
input.apply(
"Write Records",
ParDo.of(new WriteRecordsDoFn(operationName))
.withSideInputs(dynamicDestinations.getSideInputs()));
SchemaCoder<Operation> operationCoder;
try {
SchemaRegistry schemaRegistry = input.getPipeline().getSchemaRegistry();
operationCoder =
SchemaCoder.of(
schemaRegistry.getSchema(Operation.class),
TypeDescriptor.of(Operation.class),
schemaRegistry.getToRowFunction(Operation.class),
schemaRegistry.getFromRowFunction(Operation.class));
} catch (NoSuchSchemaException e) {
throw new RuntimeException(e);
}
// Send all successful writes to be flushed.
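// A processing-time trigger emits flush requests roughly once per second, and the Max combine
// collapses them to the largest flushable offset seen so far for each stream.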
return written
.setCoder(KvCoder.of(StringUtf8Coder.of(), operationCoder))
.apply(
Window.<KV<String, Operation>>configure()
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1))))
.discardingFiredPanes())
.apply("maxFlushPosition", Combine.perKey(Max.naturalOrder(new Operation(-1, false))))
.apply(
"Flush and finalize writes", ParDo.of(new StorageApiFlushAndFinalizeDoFn(bqServices)));
}
/**
* Takes in an iterable and batches the results into multiple ProtoRows objects. The splitSize
* parameter controls approximately how many serialized bytes are batched into a single ProtoRows
* object before we move on to the next one.
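*
* <p>Illustrative usage (a sketch; {@code protoMessages} is assumed to be an iterable of
* already-converted proto messages):
*
* <pre>{@code
* Iterable<ProtoRows> batches = new SplittingIterable<>(protoMessages, 1024 * 1024);
* for (ProtoRows batch : batches) {
*   // Each batch holds roughly one megabyte of serialized rows.
* }
* }</pre>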
*/
static class SplittingIterable<T extends Message> implements Iterable<ProtoRows> {
private final Iterable<T> underlying;
private final long splitSize;
public SplittingIterable(Iterable<T> underlying, long splitSize) {
this.underlying = underlying;
this.splitSize = splitSize;
}
@Override
public Iterator<ProtoRows> iterator() {
return new Iterator<ProtoRows>() {
final Iterator<T> underlyingIterator = underlying.iterator();
@Override
public boolean hasNext() {
return underlyingIterator.hasNext();
}
@Override
public ProtoRows next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
ProtoRows.Builder inserts = ProtoRows.newBuilder();
long bytesSize = 0;
while (underlyingIterator.hasNext()) {
ByteString byteString = underlyingIterator.next().toByteString();
inserts.addSerializedRows(byteString);
bytesSize += byteString.size();
if (bytesSize > splitSize) {
break;
}
}
return inserts.build();
}
};
}
}
class WriteRecordsDoFn
extends DoFn<KV<ShardedKey<DestinationT>, Iterable<ElementT>>, KV<String, Operation>> {
private final Counter recordsAppended =
Metrics.counter(WriteRecordsDoFn.class, "recordsAppended");
private final Counter streamsCreated =
Metrics.counter(WriteRecordsDoFn.class, "streamsCreated");
private final Counter appendFailures =
Metrics.counter(WriteRecordsDoFn.class, "appendFailures");
private final Counter appendOffsetFailures =
Metrics.counter(WriteRecordsDoFn.class, "appendOffsetFailures");
private final Counter flushesScheduled =
Metrics.counter(WriteRecordsDoFn.class, "flushesScheduled");
private final Distribution appendLatencyDistribution =
Metrics.distribution(WriteRecordsDoFn.class, "appendLatencyDistributionMs");
private final Distribution appendSizeDistribution =
Metrics.distribution(WriteRecordsDoFn.class, "appendSizeDistribution");
private final Distribution appendSplitDistribution =
Metrics.distribution(WriteRecordsDoFn.class, "appendSplitDistribution");
private Map<DestinationT, TableDestination> destinations = Maps.newHashMap();
private TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters;
// Stores the current stream for this key.
@StateId("streamName")
private final StateSpec<ValueState<String>> streamNameSpec = StateSpecs.value();
// Stores the current stream offset.
@StateId("streamOffset")
private final StateSpec<ValueState<Long>> streamOffsetSpec = StateSpecs.value();
public WriteRecordsDoFn(String operationName) {
this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
}
@StartBundle
public void startBundle() throws IOException {
destinations = Maps.newHashMap();
}
// Get the current stream for this key. If there is no current stream, create one and store the
// stream name in persistent state.
@SuppressWarnings({"nullness"})
String getOrCreateStream(
String tableId,
ValueState<String> streamName,
ValueState<Long> streamOffset,
DatasetService datasetService)
throws IOException, InterruptedException {
String stream = streamName.read();
if (Strings.isNullOrEmpty(stream)) {
// In a buffered stream, data is only visible up to the offset to which it was flushed.
stream = datasetService.createWriteStream(tableId, Type.BUFFERED).getName();
streamName.write(stream);
streamOffset.write(0L);
streamsCreated.inc();
}
return stream;
}
@SuppressWarnings({"nullness"})
@ProcessElement
public void process(
ProcessContext c,
final PipelineOptions pipelineOptions,
@Element KV<ShardedKey<DestinationT>, Iterable<ElementT>> element,
final @AlwaysFetched @StateId("streamName") ValueState<String> streamName,
final @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
final OutputReceiver<KV<String, Operation>> o)
throws Exception {
dynamicDestinations.setSideInputAccessorFromProcessContext(c);
TableDestination tableDestination =
destinations.computeIfAbsent(
element.getKey().getKey(),
dest -> {
TableDestination tableDestination1 = dynamicDestinations.getTable(dest);
checkArgument(
tableDestination1 != null,
"DynamicDestinations.getTable() may not return null, "
+ "but %s returned null for destination %s",
dynamicDestinations,
dest);
Supplier<TableSchema> schemaSupplier = () -> dynamicDestinations.getSchema(dest);
return CreateTableHelpers.possiblyCreateTable(
c,
tableDestination1,
schemaSupplier,
createDisposition,
destinationCoder,
kmsKey,
bqServices);
});
final String tableId = tableDestination.getTableUrn();
final DatasetService datasetService = getDatasetService(pipelineOptions);
MessageConverter<ElementT> messageConverter =
messageConverters.get(element.getKey().getKey(), dynamicDestinations);
Descriptor descriptor = messageConverter.getSchemaDescriptor();
// Each ProtoRows object contains approximately 1 MB of serialized rows.
// TODO: Push messageFromTableRow up to the top level so that we can skip TableRow entirely if
// the input is already a proto or already has a schema.
final long oneMb = 1024 * 1024;
Iterable<ProtoRows> messages =
new SplittingIterable<>(
Iterables.transform(element.getValue(), e -> messageConverter.toMessage(e)), oneMb);
class AppendRowsContext extends RetryManager.Operation.Context<AppendRowsResponse> {
final ShardedKey<DestinationT> key;
String streamName = "";
StreamAppendClient client = null;
long offset = -1;
long numRows = 0;
long tryIteration = 0;
AppendRowsContext(ShardedKey<DestinationT> key) {
this.key = key;
}
@Override
public String toString() {
return "Context: key="
+ key
+ " streamName="
+ streamName
+ " offset="
+ offset
+ " numRows="
+ numRows
+ " tryIteration: "
+ tryIteration;
}
};
// Initialize stream names and offsets for all contexts. This will be called initially, but will
// also be called if we roll over to a new stream on a retry.
BiConsumer<Iterable<AppendRowsContext>, Boolean> initializeContexts =
(contexts, isFailure) -> {
try {
if (isFailure) {
// Clear the stream name, forcing a new one to be created.
streamName.write("");
}
String stream = getOrCreateStream(tableId, streamName, streamOffset, datasetService);
StreamAppendClient appendClient =
APPEND_CLIENTS.get(stream, () -> datasetService.getStreamAppendClient(stream));
for (AppendRowsContext context : contexts) {
context.streamName = stream;
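// Pin the client so the cache's removal listener cannot close it while this context still
// references it; it is unpinned when the clients are cleared or after the bundle's appends finish.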
appendClient.pin();
context.client = appendClient;
context.offset = streamOffset.read();
++context.tryIteration;
streamOffset.write(context.offset + context.numRows);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
};
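// Invalidate the cached client for the current stream and release each context's pin on it.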
Consumer<Iterable<AppendRowsContext>> clearClients =
contexts -> {
APPEND_CLIENTS.invalidate(streamName.read());
for (AppendRowsContext context : contexts) {
if (context.client != null) {
// Unpin in a different thread, as it may execute a blocking close.
runAsyncIgnoreFailure(closeWriterExecutor, context.client::unpin);
context.client = null;
}
}
};
Instant now = Instant.now();
List<AppendRowsContext> contexts = Lists.newArrayList();
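// Failed appends are retried with backoff on the order of seconds, bounded at roughly a minute.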
RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
new RetryManager<>(Duration.standardSeconds(1), Duration.standardMinutes(1), 1000);
int numSplits = 0;
for (ProtoRows protoRows : messages) {
++numSplits;
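// Perform a single append of this batch using the cached client for the context's stream.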
Function<AppendRowsContext, ApiFuture<AppendRowsResponse>> run =
context -> {
try {
StreamAppendClient appendClient =
APPEND_CLIENTS.get(
context.streamName,
() -> datasetService.getStreamAppendClient(context.streamName));
return appendClient.appendRows(context.offset, protoRows, descriptor);
} catch (Exception e) {
throw new RuntimeException(e);
}
};
// RetryManager
Function<Iterable<AppendRowsContext>, RetryType> onError =
failedContexts -> {
// The first context is always the one that fails.
AppendRowsContext failedContext =
Preconditions.checkNotNull(Iterables.getFirst(failedContexts, null));
Status.Code statusCode = Status.fromThrowable(failedContext.getError()).getCode();
// Invalidate the StreamWriter and force a new one to be created.
LOG.error(
"Got error " + failedContext.getError() + " closing " + failedContext.streamName);
clearClients.accept(contexts);
appendFailures.inc();
if (statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS)) {
appendOffsetFailures.inc();
LOG.warn(
"Append to "
+ failedContext
+ " failed with "
+ failedContext.getError()
+ " Will retry with a new stream");
// This means that the offset we have stored does not match the current end of
// the stream in the Storage API. Usually this happens because a crash or a bundle failure
// happened after an append but before the worker could checkpoint its state. The records
// that were appended in a failed bundle will be retried, meaning that the unflushed tail of
// the stream must be discarded to prevent duplicates.
// Finalize the stream and clear streamName so a new stream will be created.
o.output(
KV.of(failedContext.streamName, new Operation(failedContext.offset - 1, true)));
// Reinitialize all contexts with the new stream and new offsets.
initializeContexts.accept(failedContexts, true);
// Offset failures imply that all subsequent parallel appends will also fail.
// Retry them all.
return RetryType.RETRY_ALL_OPERATIONS;
}
return RetryType.RETRY_ALL_OPERATIONS;
};
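// On success, request a flush of the stream up to the last offset written by this append.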
Consumer<AppendRowsContext> onSuccess =
context -> {
o.output(
KV.of(
context.streamName,
new Operation(context.offset + context.numRows - 1, false)));
flushesScheduled.inc(protoRows.getSerializedRowsCount());
};
AppendRowsContext context = new AppendRowsContext(element.getKey());
context.numRows = protoRows.getSerializedRowsCount();
contexts.add(context);
retryManager.addOperation(run, onError, onSuccess, context);
recordsAppended.inc(protoRows.getSerializedRowsCount());
appendSizeDistribution.update(context.numRows);
}
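// Assign a stream and starting offset to every pending context, then run the appends, invoking
// the handlers above on failure or success.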
initializeContexts.accept(contexts, false);
try {
retryManager.run(true);
} finally {
// Make sure that all pins are removed.
for (AppendRowsContext context : contexts) {
if (context.client != null) {
runAsyncIgnoreFailure(closeWriterExecutor, context.client::unpin);
}
}
}
appendSplitDistribution.update(numSplits);
java.time.Duration timeElapsed = java.time.Duration.between(now, Instant.now());
appendLatencyDistribution.update(timeElapsed.toMillis());
}
@OnWindowExpiration
public void onWindowExpiration(
@AlwaysFetched @StateId("streamName") ValueState<String> streamName,
@AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset,
OutputReceiver<KV<String, Operation>> o) {
// Window is done - usually because the pipeline has been drained. Make sure to clean up
// streams so that they are not leaked.
String stream = MoreObjects.firstNonNull(streamName.read(), "");
if (!Strings.isNullOrEmpty(stream)) {
// Finalize the stream
long nextOffset = MoreObjects.firstNonNull(streamOffset.read(), 0L);
o.output(KV.of(stream, new Operation(nextOffset - 1, true)));
// Make sure that the stream object is closed.
APPEND_CLIENTS.invalidate(stream);
}
}
}
}