| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io.gcp.bigquery; |
| |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument; |
| |
| import com.google.api.core.ApiFuture; |
| import com.google.api.services.bigquery.model.TableSchema; |
| import com.google.cloud.bigquery.storage.v1beta2.AppendRowsResponse; |
| import com.google.cloud.bigquery.storage.v1beta2.ProtoRows; |
| import com.google.cloud.bigquery.storage.v1beta2.WriteStream.Type; |
| import com.google.protobuf.ByteString; |
| import com.google.protobuf.Descriptors.Descriptor; |
| import com.google.protobuf.Message; |
| import io.grpc.Status; |
| import io.grpc.Status.Code; |
| import java.io.IOException; |
| import java.time.Instant; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.TimeUnit; |
| import java.util.function.BiConsumer; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.coders.KvCoder; |
| import org.apache.beam.sdk.coders.StringUtf8Coder; |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService; |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StreamAppendClient; |
| import org.apache.beam.sdk.io.gcp.bigquery.RetryManager.RetryType; |
| import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations.MessageConverter; |
| import org.apache.beam.sdk.io.gcp.bigquery.StorageApiFlushAndFinalizeDoFn.Operation; |
| import org.apache.beam.sdk.metrics.Counter; |
| import org.apache.beam.sdk.metrics.Distribution; |
| import org.apache.beam.sdk.metrics.Metrics; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.schemas.NoSuchSchemaException; |
| import org.apache.beam.sdk.schemas.SchemaCoder; |
| import org.apache.beam.sdk.schemas.SchemaRegistry; |
| import org.apache.beam.sdk.state.StateSpec; |
| import org.apache.beam.sdk.state.StateSpecs; |
| import org.apache.beam.sdk.state.ValueState; |
| import org.apache.beam.sdk.transforms.Combine; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.Max; |
| import org.apache.beam.sdk.transforms.PTransform; |
| import org.apache.beam.sdk.transforms.ParDo; |
| import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime; |
| import org.apache.beam.sdk.transforms.windowing.Repeatedly; |
| import org.apache.beam.sdk.transforms.windowing.Window; |
| import org.apache.beam.sdk.util.ShardedKey; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.TypeDescriptor; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.RemovalNotification; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps; |
| import org.joda.time.Duration; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** A transform to write sharded records to BigQuery using the Storage API. */ |
| @SuppressWarnings("FutureReturnValueIgnored") |
| public class StorageApiWritesShardedRecords<DestinationT, ElementT> |
| extends PTransform< |
| PCollection<KV<ShardedKey<DestinationT>, Iterable<ElementT>>>, PCollection<Void>> { |
| private static final Logger LOG = LoggerFactory.getLogger(StorageApiWritesShardedRecords.class); |
| |
| private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations; |
| private final CreateDisposition createDisposition; |
| private final String kmsKey; |
| private final BigQueryServices bqServices; |
| private final Coder<DestinationT> destinationCoder; |
| @Nullable private DatasetService datasetServiceInternal = null; |
| private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool(); |
| |
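  // Cache of StreamAppendClients keyed by write-stream name, shared across DoFn instances on this
  // worker. Clients are pinned while an append is in flight (see pin()/unpin() below) so that
  // cache eviction does not close a client that is still in use.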
| private static final Cache<String, StreamAppendClient> APPEND_CLIENTS = |
| CacheBuilder.newBuilder() |
| .expireAfterAccess(15, TimeUnit.MINUTES) |
| .removalListener( |
| (RemovalNotification<String, StreamAppendClient> removal) -> { |
                @Nullable final StreamAppendClient streamAppendClient = removal.getValue();
                if (streamAppendClient != null) {
                  // Close the writer in a different thread so as not to block the main one.
                  runAsyncIgnoreFailure(closeWriterExecutor, streamAppendClient::close);
                }
              })
| .build(); |
| |
| private DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException { |
| if (datasetServiceInternal == null) { |
| datasetServiceInternal = |
| bqServices.getDatasetService(pipelineOptions.as(BigQueryOptions.class)); |
| } |
| return datasetServiceInternal; |
| } |
| |
  /** A runnable that may throw a checked exception. */
  private interface ThrowingRunnable {
    void run() throws Exception;
  }

  // Run a closure asynchronously, ignoring failures.
| private static void runAsyncIgnoreFailure(ExecutorService executor, ThrowingRunnable task) { |
| executor.submit( |
| () -> { |
| try { |
| task.run(); |
| } catch (Exception e) { |
| // |
| } |
| }); |
| } |
| |
| public StorageApiWritesShardedRecords( |
| StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations, |
| CreateDisposition createDisposition, |
| String kmsKey, |
| BigQueryServices bqServices, |
| Coder<DestinationT> destinationCoder) { |
| this.dynamicDestinations = dynamicDestinations; |
| this.createDisposition = createDisposition; |
| this.kmsKey = kmsKey; |
| this.bqServices = bqServices; |
| this.destinationCoder = destinationCoder; |
| } |
| |
| @Override |
| public PCollection<Void> expand( |
| PCollection<KV<ShardedKey<DestinationT>, Iterable<ElementT>>> input) { |
| String operationName = input.getName() + "/" + getName(); |
| // Append records to the Storage API streams. |
| PCollection<KV<String, Operation>> written = |
| input.apply( |
| "Write Records", |
| ParDo.of(new WriteRecordsDoFn(operationName)) |
| .withSideInputs(dynamicDestinations.getSideInputs())); |
| |
| SchemaCoder<Operation> operationCoder; |
| try { |
| SchemaRegistry schemaRegistry = input.getPipeline().getSchemaRegistry(); |
| operationCoder = |
| SchemaCoder.of( |
| schemaRegistry.getSchema(Operation.class), |
| TypeDescriptor.of(Operation.class), |
| schemaRegistry.getToRowFunction(Operation.class), |
| schemaRegistry.getFromRowFunction(Operation.class)); |
| } catch (NoSuchSchemaException e) { |
| throw new RuntimeException(e); |
| } |
| |
| // Send all successful writes to be flushed. |
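    // The one-second processing-time trigger batches flush positions so that the downstream DoFn
    // issues roughly one flush call per stream per trigger firing, using the maximum offset seen
    // for each stream.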
| return written |
| .setCoder(KvCoder.of(StringUtf8Coder.of(), operationCoder)) |
| .apply( |
| Window.<KV<String, Operation>>configure() |
| .triggering( |
| Repeatedly.forever( |
| AfterProcessingTime.pastFirstElementInPane() |
| .plusDelayOf(Duration.standardSeconds(1)))) |
| .discardingFiredPanes()) |
| .apply("maxFlushPosition", Combine.perKey(Max.naturalOrder(new Operation(-1, false)))) |
| .apply( |
| "Flush and finalize writes", ParDo.of(new StorageApiFlushAndFinalizeDoFn(bqServices))); |
| } |
| |
| /** |
   * Takes in an iterable and batches the results into multiple ProtoRows objects. The splitSize
   * parameter controls the approximate serialized size, in bytes, of each ProtoRows object: rows
   * are accumulated until their total size exceeds splitSize, at which point a new batch is
   * started.
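   *
   * <p>A minimal usage sketch (the row messages here are hypothetical, pre-built protos):
   *
   * <pre>{@code
   * Iterable<MyRowProto> rows = ...;
   * // Batch the serialized rows into ProtoRows chunks of roughly 1 MB each.
   * for (ProtoRows chunk : new SplittingIterable<>(rows, 1024 * 1024)) {
   *   // Each chunk is sent as a single AppendRows request.
   * }
   * }</pre>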
| */ |
| static class SplittingIterable<T extends Message> implements Iterable<ProtoRows> { |
| private final Iterable<T> underlying; |
| private final long splitSize; |
| |
| public SplittingIterable(Iterable<T> underlying, long splitSize) { |
| this.underlying = underlying; |
| this.splitSize = splitSize; |
| } |
| |
| @Override |
| public Iterator<ProtoRows> iterator() { |
| return new Iterator<ProtoRows>() { |
| final Iterator<T> underlyingIterator = underlying.iterator(); |
| |
| @Override |
| public boolean hasNext() { |
| return underlyingIterator.hasNext(); |
| } |
| |
| @Override |
| public ProtoRows next() { |
| if (!hasNext()) { |
| throw new NoSuchElementException(); |
| } |
| |
| ProtoRows.Builder inserts = ProtoRows.newBuilder(); |
| long bytesSize = 0; |
| while (underlyingIterator.hasNext()) { |
| ByteString byteString = underlyingIterator.next().toByteString(); |
| inserts.addSerializedRows(byteString); |
| bytesSize += byteString.size(); |
| if (bytesSize > splitSize) { |
| break; |
| } |
| } |
| return inserts.build(); |
| } |
| }; |
| } |
| } |
| |
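  /**
   * Appends each incoming batch of records to a BUFFERED Storage API write stream for its
   * destination, tracking the stream name and offset in persistent state, and outputs the offset
   * up to which each stream can be flushed.
   */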
| class WriteRecordsDoFn |
| extends DoFn<KV<ShardedKey<DestinationT>, Iterable<ElementT>>, KV<String, Operation>> { |
| private final Counter recordsAppended = |
| Metrics.counter(WriteRecordsDoFn.class, "recordsAppended"); |
| private final Counter streamsCreated = |
| Metrics.counter(WriteRecordsDoFn.class, "streamsCreated"); |
| private final Counter appendFailures = |
| Metrics.counter(WriteRecordsDoFn.class, "appendFailures"); |
| private final Counter appendOffsetFailures = |
| Metrics.counter(WriteRecordsDoFn.class, "appendOffsetFailures"); |
| private final Counter flushesScheduled = |
| Metrics.counter(WriteRecordsDoFn.class, "flushesScheduled"); |
| private final Distribution appendLatencyDistribution = |
| Metrics.distribution(WriteRecordsDoFn.class, "appendLatencyDistributionMs"); |
| private final Distribution appendSizeDistribution = |
| Metrics.distribution(WriteRecordsDoFn.class, "appendSizeDistribution"); |
| private final Distribution appendSplitDistribution = |
| Metrics.distribution(WriteRecordsDoFn.class, "appendSplitDistribution"); |
| |
| private Map<DestinationT, TableDestination> destinations = Maps.newHashMap(); |
| |
| private TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters; |
| |
| // Stores the current stream for this key. |
| @StateId("streamName") |
| private final StateSpec<ValueState<String>> streamNameSpec = StateSpecs.value(); |
| |
| // Stores the current stream offset. |
| @StateId("streamOffset") |
| private final StateSpec<ValueState<Long>> streamOffsetSpec = StateSpecs.value(); |
| |
| public WriteRecordsDoFn(String operationName) { |
| this.messageConverters = new TwoLevelMessageConverterCache<>(operationName); |
| } |
| |
| @StartBundle |
| public void startBundle() throws IOException { |
| destinations = Maps.newHashMap(); |
| } |
| |
    // Get the current stream for this key. If there is no current stream, create one and store
    // the stream name in persistent state.
| @SuppressWarnings({"nullness"}) |
| String getOrCreateStream( |
| String tableId, |
| ValueState<String> streamName, |
| ValueState<Long> streamOffset, |
| DatasetService datasetService) |
| throws IOException, InterruptedException { |
| String stream = streamName.read(); |
| if (Strings.isNullOrEmpty(stream)) { |
| // In a buffered stream, data is only visible up to the offset to which it was flushed. |
| stream = datasetService.createWriteStream(tableId, Type.BUFFERED).getName(); |
| streamName.write(stream); |
| streamOffset.write(0L); |
| streamsCreated.inc(); |
| } |
| return stream; |
| } |
| |
| @SuppressWarnings({"nullness"}) |
| @ProcessElement |
| public void process( |
| ProcessContext c, |
| final PipelineOptions pipelineOptions, |
| @Element KV<ShardedKey<DestinationT>, Iterable<ElementT>> element, |
| final @AlwaysFetched @StateId("streamName") ValueState<String> streamName, |
| final @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset, |
| final OutputReceiver<KV<String, Operation>> o) |
| throws Exception { |
| dynamicDestinations.setSideInputAccessorFromProcessContext(c); |
| TableDestination tableDestination = |
| destinations.computeIfAbsent( |
| element.getKey().getKey(), |
| dest -> { |
| TableDestination tableDestination1 = dynamicDestinations.getTable(dest); |
| checkArgument( |
| tableDestination1 != null, |
| "DynamicDestinations.getTable() may not return null, " |
| + "but %s returned null for destination %s", |
| dynamicDestinations, |
| dest); |
| Supplier<TableSchema> schemaSupplier = () -> dynamicDestinations.getSchema(dest); |
| return CreateTableHelpers.possiblyCreateTable( |
| c, |
| tableDestination1, |
| schemaSupplier, |
| createDisposition, |
| destinationCoder, |
| kmsKey, |
| bqServices); |
| }); |
| final String tableId = tableDestination.getTableUrn(); |
| final DatasetService datasetService = getDatasetService(pipelineOptions); |
| MessageConverter<ElementT> messageConverter = |
| messageConverters.get(element.getKey().getKey(), dynamicDestinations); |
| Descriptor descriptor = messageConverter.getSchemaDescriptor(); |
      // Each ProtoRows object contains roughly 1MB of serialized rows.
      // TODO: Push messageFromTableRow up to the top level so that we can skip TableRow entirely
      // if the input is already a proto or already has a schema.
| final long oneMb = 1024 * 1024; |
| Iterable<ProtoRows> messages = |
| new SplittingIterable<>( |
| Iterables.transform(element.getValue(), e -> messageConverter.toMessage(e)), oneMb); |
| |
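      // Tracks one AppendRows call: the target stream, the offset at which it appends, and the
      // number of rows it carries. The RetryManager uses this context to retry failed appends and
      // to roll over to a new stream when offsets get out of sync.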
| class AppendRowsContext extends RetryManager.Operation.Context<AppendRowsResponse> { |
| final ShardedKey<DestinationT> key; |
| String streamName = ""; |
| StreamAppendClient client = null; |
| long offset = -1; |
| long numRows = 0; |
| long tryIteration = 0; |
| |
| AppendRowsContext(ShardedKey<DestinationT> key) { |
| this.key = key; |
| } |
| |
| @Override |
| public String toString() { |
| return "Context: key=" |
| + key |
| + " streamName=" |
| + streamName |
| + " offset=" |
| + offset |
| + " numRows=" |
| + numRows |
| + " tryIteration: " |
| + tryIteration; |
| } |
| }; |
| |
      // Initialize stream names and offsets for all contexts. This is called once up front, and
      // again if we roll over to a new stream on a retry.
| BiConsumer<Iterable<AppendRowsContext>, Boolean> initializeContexts = |
| (contexts, isFailure) -> { |
| try { |
| if (isFailure) { |
| // Clear the stream name, forcing a new one to be created. |
| streamName.write(""); |
| } |
| String stream = getOrCreateStream(tableId, streamName, streamOffset, datasetService); |
| StreamAppendClient appendClient = |
| APPEND_CLIENTS.get(stream, () -> datasetService.getStreamAppendClient(stream)); |
| for (AppendRowsContext context : contexts) { |
| context.streamName = stream; |
| appendClient.pin(); |
| context.client = appendClient; |
| context.offset = streamOffset.read(); |
| ++context.tryIteration; |
| streamOffset.write(context.offset + context.numRows); |
| } |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| }; |
| |
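      // Invalidate the cached append client for the current stream and unpin it from every
      // context, so that a fresh client (and possibly a fresh stream) is created on retry.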
| Consumer<Iterable<AppendRowsContext>> clearClients = |
| contexts -> { |
| APPEND_CLIENTS.invalidate(streamName.read()); |
| for (AppendRowsContext context : contexts) { |
| if (context.client != null) { |
| // Unpin in a different thread, as it may execute a blocking close. |
| runAsyncIgnoreFailure(closeWriterExecutor, context.client::unpin); |
| context.client = null; |
| } |
| } |
| }; |
| |
| Instant now = Instant.now(); |
| List<AppendRowsContext> contexts = Lists.newArrayList(); |
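      // Backoff for failed appends starts at one second and is capped at one minute.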
| RetryManager<AppendRowsResponse, AppendRowsContext> retryManager = |
| new RetryManager<>(Duration.standardSeconds(1), Duration.standardMinutes(1), 1000); |
| int numSplits = 0; |
| for (ProtoRows protoRows : messages) { |
| ++numSplits; |
| Function<AppendRowsContext, ApiFuture<AppendRowsResponse>> run = |
| context -> { |
| try { |
| StreamAppendClient appendClient = |
| APPEND_CLIENTS.get( |
| context.streamName, |
| () -> datasetService.getStreamAppendClient(context.streamName)); |
| return appendClient.appendRows(context.offset, protoRows, descriptor); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| }; |
| |
        // Called by the RetryManager when appends fail; decides whether and how to retry.
| Function<Iterable<AppendRowsContext>, RetryType> onError = |
| failedContexts -> { |
              // The first context is always the one that failed.
| AppendRowsContext failedContext = |
| Preconditions.checkNotNull(Iterables.getFirst(failedContexts, null)); |
| Status.Code statusCode = Status.fromThrowable(failedContext.getError()).getCode(); |
              // Invalidate the cached append client and force a new one to be created.
| LOG.error( |
| "Got error " + failedContext.getError() + " closing " + failedContext.streamName); |
| clearClients.accept(contexts); |
| appendFailures.inc(); |
| if (statusCode.equals(Code.OUT_OF_RANGE) || statusCode.equals(Code.ALREADY_EXISTS)) { |
| appendOffsetFailures.inc(); |
| LOG.warn( |
| "Append to " |
| + failedContext |
| + " failed with " |
| + failedContext.getError() |
| + " Will retry with a new stream"); |
                // This means that the offset we have stored does not match the current end of
                // the stream in the Storage API. Usually this happens because a crash or a
                // bundle failure occurred after an append but before the worker could checkpoint
                // its state. The records that were appended in a failed bundle will be retried,
                // meaning that the unflushed tail of the stream must be discarded to prevent
                // duplicates.
| |
| // Finalize the stream and clear streamName so a new stream will be created. |
| o.output( |
| KV.of(failedContext.streamName, new Operation(failedContext.offset - 1, true))); |
| // Reinitialize all contexts with the new stream and new offsets. |
| initializeContexts.accept(failedContexts, true); |
| |
| // Offset failures imply that all subsequent parallel appends will also fail. |
| // Retry them all. |
| return RetryType.RETRY_ALL_OPERATIONS; |
| } |
| |
| return RetryType.RETRY_ALL_OPERATIONS; |
| }; |
| |
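        // On success, emit the offset up to which this stream can now be flushed; downstream, the
        // maximum offset per stream is taken before flushing.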
| Consumer<AppendRowsContext> onSuccess = |
| context -> { |
| o.output( |
| KV.of( |
| context.streamName, |
| new Operation(context.offset + context.numRows - 1, false))); |
| flushesScheduled.inc(protoRows.getSerializedRowsCount()); |
| }; |
| |
| AppendRowsContext context = new AppendRowsContext(element.getKey()); |
| context.numRows = protoRows.getSerializedRowsCount(); |
| contexts.add(context); |
| retryManager.addOperation(run, onError, onSuccess, context); |
| recordsAppended.inc(protoRows.getSerializedRowsCount()); |
| appendSizeDistribution.update(context.numRows); |
| } |
| initializeContexts.accept(contexts, false); |
| |
| try { |
| retryManager.run(true); |
| } finally { |
| // Make sure that all pins are removed. |
| for (AppendRowsContext context : contexts) { |
| if (context.client != null) { |
| runAsyncIgnoreFailure(closeWriterExecutor, context.client::unpin); |
| } |
| } |
| } |
| appendSplitDistribution.update(numSplits); |
| |
| java.time.Duration timeElapsed = java.time.Duration.between(now, Instant.now()); |
| appendLatencyDistribution.update(timeElapsed.toMillis()); |
| } |
| |
| @OnWindowExpiration |
| public void onWindowExpiration( |
| @AlwaysFetched @StateId("streamName") ValueState<String> streamName, |
| @AlwaysFetched @StateId("streamOffset") ValueState<Long> streamOffset, |
| OutputReceiver<KV<String, Operation>> o) { |
| // Window is done - usually because the pipeline has been drained. Make sure to clean up |
| // streams so that they are not leaked. |
      String stream = MoreObjects.firstNonNull(streamName.read(), "");
| |
| if (!Strings.isNullOrEmpty(stream)) { |
| // Finalize the stream |
| long nextOffset = MoreObjects.firstNonNull(streamOffset.read(), 0L); |
| o.output(KV.of(stream, new Operation(nextOffset - 1, true))); |
| // Make sure that the stream object is closed. |
| APPEND_CLIENTS.invalidate(stream); |
| } |
| } |
| } |
| } |