| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io.gcp.bigquery; |
| |
| import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString; |
| import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString; |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; |
| |
| import com.google.api.gax.rpc.FailedPreconditionException; |
| import com.google.api.services.bigquery.model.TableSchema; |
| import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest; |
| import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse; |
| import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession; |
| import com.google.cloud.bigquery.storage.v1beta1.Storage.SplitReadStreamRequest; |
| import com.google.cloud.bigquery.storage.v1beta1.Storage.SplitReadStreamResponse; |
| import com.google.cloud.bigquery.storage.v1beta1.Storage.Stream; |
| import com.google.cloud.bigquery.storage.v1beta1.Storage.StreamPosition; |
| import com.google.protobuf.UnknownFieldSet; |
| import java.io.IOException; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.NoSuchElementException; |
| import org.apache.avro.Schema; |
| import org.apache.avro.generic.GenericDatumReader; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.avro.io.BinaryDecoder; |
| import org.apache.avro.io.DatumReader; |
| import org.apache.avro.io.DecoderFactory; |
| import org.apache.beam.sdk.annotations.Experimental; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.io.BoundedSource; |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.BigQueryServerStream; |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient; |
| import org.apache.beam.sdk.metrics.Metrics; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.transforms.SerializableFunction; |
| import org.apache.beam.sdk.transforms.display.DisplayData; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** A {@link org.apache.beam.sdk.io.Source} representing a single stream in a read session. */ |
| @Experimental(Experimental.Kind.SOURCE_SINK) |
| public class BigQueryStorageStreamSource<T> extends BoundedSource<T> { |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryStorageStreamSource.class); |
| |
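| /** |
| * Creates a {@link BigQueryStorageStreamSource} for a single stream within the given read |
| * session, using the supplied parse function and output coder. |
| */ |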
| public static <T> BigQueryStorageStreamSource<T> create( |
| ReadSession readSession, |
| Stream stream, |
| TableSchema tableSchema, |
| SerializableFunction<SchemaAndRecord, T> parseFn, |
| Coder<T> outputCoder, |
| BigQueryServices bqServices) { |
| return new BigQueryStorageStreamSource<>( |
| readSession, |
| stream, |
| toJsonString(checkNotNull(tableSchema, "tableSchema")), |
| parseFn, |
| outputCoder, |
| bqServices); |
| } |
| |
| /** |
| * Creates a new source with the same properties as this one, except with a different {@link |
| * Stream}. |
| */ |
| public BigQueryStorageStreamSource<T> fromExisting(Stream newStream) { |
| return new BigQueryStorageStreamSource<>( |
| readSession, newStream, jsonTableSchema, parseFn, outputCoder, bqServices); |
| } |
| |
| private final ReadSession readSession; |
| private final Stream stream; |
| private final String jsonTableSchema; |
| private final SerializableFunction<SchemaAndRecord, T> parseFn; |
| private final Coder<T> outputCoder; |
| private final BigQueryServices bqServices; |
| |
| private BigQueryStorageStreamSource( |
| ReadSession readSession, |
| Stream stream, |
| String jsonTableSchema, |
| SerializableFunction<SchemaAndRecord, T> parseFn, |
| Coder<T> outputCoder, |
| BigQueryServices bqServices) { |
| this.readSession = checkNotNull(readSession, "readSession"); |
| this.stream = checkNotNull(stream, "stream"); |
| this.jsonTableSchema = checkNotNull(jsonTableSchema, "jsonTableSchema"); |
| this.parseFn = checkNotNull(parseFn, "parseFn"); |
| this.outputCoder = checkNotNull(outputCoder, "outputCoder"); |
| this.bqServices = checkNotNull(bqServices, "bqServices"); |
| } |
| |
| @Override |
| public Coder<T> getOutputCoder() { |
| return outputCoder; |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| super.populateDisplayData(builder); |
| builder |
| .addIfNotNull( |
| DisplayData.item("table", BigQueryHelpers.toTableSpec(readSession.getTableReference())) |
| .withLabel("Table")) |
| .add(DisplayData.item("readSession", readSession.getName()).withLabel("Read session")) |
| .add(DisplayData.item("stream", stream.getName()).withLabel("Stream")); |
| } |
| |
| @Override |
| public long getEstimatedSizeBytes(PipelineOptions options) { |
| // The size of a stream source can't be estimated due to server-side liquid sharding. |
| // TODO: Implement progress reporting. |
| return 0L; |
| } |
| |
| @Override |
| public List<? extends BoundedSource<T>> split( |
| long desiredBundleSizeBytes, PipelineOptions options) { |
| // A stream source can't be split without reading from it due to server-side liquid sharding. |
| // TODO: Implement dynamic work rebalancing. |
| return ImmutableList.of(this); |
| } |
| |
| @Override |
| public BigQueryStorageStreamReader<T> createReader(PipelineOptions options) throws IOException { |
| return new BigQueryStorageStreamReader<>(this, options.as(BigQueryOptions.class)); |
| } |
| |
| @Override |
| public String toString() { |
| return stream.toString(); |
| } |
| |
| /** A {@link org.apache.beam.sdk.io.Source.Reader} which reads records from a stream. */ |
| @Experimental(Experimental.Kind.SOURCE_SINK) |
| public static class BigQueryStorageStreamReader<T> extends BoundedSource.BoundedReader<T> { |
| |
| private final DatumReader<GenericRecord> datumReader; |
| private final SerializableFunction<SchemaAndRecord, T> parseFn; |
| private final StorageClient storageClient; |
| private final TableSchema tableSchema; |
| |
| private BigQueryStorageStreamSource<T> source; |
| private BigQueryServerStream<ReadRowsResponse> responseStream; |
| private Iterator<ReadRowsResponse> responseIterator; |
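| // Reused across responses; a null or exhausted decoder signals that the next |
| // ReadRowsResponse must be fetched (see readNextRecord). |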
| private BinaryDecoder decoder; |
| private GenericRecord record; |
| private T current; |
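| // Offset of the current record within the stream; a post-split read resumes from |
| // currentOffset + 1 on the primary stream. |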
| private long currentOffset; |
| |
| // Values used for progress reporting. |
| private double fractionConsumed; |
| private double fractionConsumedFromPreviousResponse; |
| private double fractionConsumedFromCurrentResponse; |
| private long rowsReadFromCurrentResponse; |
| private long totalRowCountFromCurrentResponse; |
| |
| private BigQueryStorageStreamReader( |
| BigQueryStorageStreamSource<T> source, BigQueryOptions options) throws IOException { |
| this.source = source; |
| this.datumReader = |
| new GenericDatumReader<>( |
| new Schema.Parser().parse(source.readSession.getAvroSchema().getSchema())); |
| this.parseFn = source.parseFn; |
| this.storageClient = source.bqServices.getStorageClient(options); |
| this.tableSchema = fromJsonString(source.jsonTableSchema, TableSchema.class); |
| this.fractionConsumed = 0d; |
| this.fractionConsumedFromPreviousResponse = 0d; |
| this.fractionConsumedFromCurrentResponse = 0d; |
| this.rowsReadFromCurrentResponse = 0L; |
| this.totalRowCountFromCurrentResponse = 0L; |
| } |
| |
| @Override |
| public synchronized boolean start() throws IOException { |
| BigQueryStorageStreamSource<T> source = getCurrentSource(); |
| |
| ReadRowsRequest request = |
| ReadRowsRequest.newBuilder() |
| .setReadPosition( |
| StreamPosition.newBuilder().setStream(source.stream).setOffset(currentOffset)) |
| .build(); |
| |
| responseStream = storageClient.readRows(request); |
| responseIterator = responseStream.iterator(); |
| LOGGER.info("Started BigQuery Storage API read from stream {}.", source.stream.getName()); |
| return readNextRecord(); |
| } |
| |
| @Override |
| public synchronized boolean advance() throws IOException { |
| currentOffset++; |
| return readNextRecord(); |
| } |
| |
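| /** |
| * Reads the next record from the current response block, pulling additional responses from |
| * the server as needed, and updates the progress-reporting state. |
| */ |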
| private synchronized boolean readNextRecord() throws IOException { |
| while (decoder == null || decoder.isEnd()) { |
| if (!responseIterator.hasNext()) { |
| fractionConsumed = 1d; |
| return false; |
| } |
| |
| fractionConsumedFromPreviousResponse = fractionConsumedFromCurrentResponse; |
| ReadRowsResponse currentResponse = responseIterator.next(); |
| decoder = |
| DecoderFactory.get() |
| .binaryDecoder( |
| currentResponse.getAvroRows().getSerializedBinaryRows().toByteArray(), decoder); |
| |
| // Since we now have a new response, reset the row counter for the current response. |
| rowsReadFromCurrentResponse = 0L; |
| |
| totalRowCountFromCurrentResponse = currentResponse.getAvroRows().getRowCount(); |
| fractionConsumedFromCurrentResponse = getFractionConsumed(currentResponse); |
| |
| Preconditions.checkArgument( |
| totalRowCountFromCurrentResponse > 0L, |
| "Row count from current response (%s) must be greater than zero.", |
| totalRowCountFromCurrentResponse); |
| Preconditions.checkArgument( |
| 0.0 <= fractionConsumedFromCurrentResponse && fractionConsumedFromCurrentResponse <= 1.0, |
| "Fraction consumed from current response (%s) is not in the range [0.0, 1.0].", |
| fractionConsumedFromCurrentResponse); |
| Preconditions.checkArgument( |
| fractionConsumedFromPreviousResponse < fractionConsumedFromCurrentResponse, |
| "Fraction consumed from previous response (%s) is not less than fraction consumed " |
| + "from current response (%s).", |
| fractionConsumedFromPreviousResponse, |
| fractionConsumedFromCurrentResponse); |
| } |
| |
| record = datumReader.read(record, decoder); |
| current = parseFn.apply(new SchemaAndRecord(record, tableSchema)); |
| |
| // Updates the fraction consumed value. This value is calculated by interpolating between |
| // the fraction consumed value from the previous server response (or zero if we're consuming |
| // the first response) and the fractional value in the current response based on how many of |
| // the rows in the current response have been consumed. |
| rowsReadFromCurrentResponse++; |
| fractionConsumed = |
| fractionConsumedFromPreviousResponse |
| + (fractionConsumedFromCurrentResponse - fractionConsumedFromPreviousResponse) |
| * rowsReadFromCurrentResponse |
| * 1.0 |
| / totalRowCountFromCurrentResponse; |
| |
| return true; |
| } |
| |
| @Override |
| public T getCurrent() throws NoSuchElementException { |
| return current; |
| } |
| |
| @Override |
| public synchronized void close() { |
| storageClient.close(); |
| } |
| |
| @Override |
| public synchronized BigQueryStorageStreamSource<T> getCurrentSource() { |
| return source; |
| } |
| |
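| // Attempts to split the stream at the given fraction by asking the server for a primary and |
| // remainder stream pair. On success, this reader continues reading from the primary stream |
| // and the remainder stream is returned as a new source. |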
| @Override |
| public BoundedSource<T> splitAtFraction(double fraction) { |
| Metrics.counter(BigQueryStorageStreamReader.class, "split-at-fraction-calls").inc(); |
| LOGGER.debug( |
| "Received BigQuery Storage API split request for stream {} at fraction {}.", |
| source.stream.getName(), |
| fraction); |
| |
| SplitReadStreamRequest splitRequest = |
| SplitReadStreamRequest.newBuilder() |
| .setOriginalStream(source.stream) |
| // TODO(aryann): Once we rebuild the generated client code, we should change this to |
| // use setFraction(). |
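| // Field number 2 of SplitReadStreamRequest is presumed to be the fraction field, so the |
| // value is encoded here as the raw fixed32 bits of the float. |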
| .setUnknownFields( |
| UnknownFieldSet.newBuilder() |
| .addField( |
| 2, |
| UnknownFieldSet.Field.newBuilder() |
| .addFixed32(Float.floatToIntBits((float) fraction)) |
| .build()) |
| .build()) |
| .build(); |
| SplitReadStreamResponse splitResponse = storageClient.splitReadStream(splitRequest); |
| |
| if (!splitResponse.hasPrimaryStream() || !splitResponse.hasRemainderStream()) { |
| // No more splits are possible! |
| Metrics.counter( |
| BigQueryStorageStreamReader.class, |
| "split-at-fraction-calls-failed-due-to-impossible-split-point") |
| .inc(); |
| LOGGER.info( |
| "BigQuery Storage API stream {} cannot be split at {}.", |
| source.stream.getName(), |
| fraction); |
| return null; |
| } |
| |
| // We may be able to split this source. Before continuing, we block the reader thread by |
| // taking its lock and replace its current source with the primary stream iff the reader |
| // has not already moved past the split point. |
| synchronized (this) { |
| BigQueryServerStream<ReadRowsResponse> newResponseStream; |
| Iterator<ReadRowsResponse> newResponseIterator; |
| try { |
| newResponseStream = |
| storageClient.readRows( |
| ReadRowsRequest.newBuilder() |
| .setReadPosition( |
| StreamPosition.newBuilder() |
| .setStream(splitResponse.getPrimaryStream()) |
| .setOffset(currentOffset + 1)) |
| .build()); |
| newResponseIterator = newResponseStream.iterator(); |
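| // Calling hasNext() forces the first ReadRows call on the primary stream; if the reader |
| // has already advanced past the requested offset, the server responds with |
| // FAILED_PRECONDITION, which is handled below. |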
| newResponseIterator.hasNext(); |
| } catch (FailedPreconditionException e) { |
| // The current source has already moved past the split point, so this split attempt |
| // is unsuccessful. |
| Metrics.counter( |
| BigQueryStorageStreamReader.class, |
| "split-at-fraction-calls-failed-due-to-bad-split-point") |
| .inc(); |
| LOGGER.info( |
| "BigQuery Storage API split of stream {} abandoned because the primary stream is to " |
| + "the left of the split fraction {}.", |
| source.stream.getName(), |
| fraction); |
| return null; |
| } catch (Exception e) { |
| Metrics.counter( |
| BigQueryStorageStreamReader.class, |
| "split-at-fraction-calls-failed-due-to-other-reasons") |
| .inc(); |
| LOGGER.error("BigQuery Storage API stream split failed.", e); |
| return null; |
| } |
| |
| // Cancels the parent stream before replacing it with the primary stream. |
| responseStream.cancel(); |
| source = source.fromExisting(splitResponse.getPrimaryStream()); |
| responseStream = newResponseStream; |
| responseIterator = newResponseIterator; |
| |
| // N.B.: Once #readNextRecord is called, this line has the effect of using the fraction |
| // consumed value at split time as the fraction consumed value of the previous response, |
| // which gives a reasonable start for the interpolation window. It is only an |
| // approximation, however: the reported fraction consumed may advance faster than actual |
| // progress while the first post-split response is being processed. If, in the future, the |
| // server returns the start and end fraction consumed values in each response, these |
| // interpolations will be easier to perform, as state from the previous response will not |
| // need to be maintained. |
| fractionConsumedFromCurrentResponse = fractionConsumed; |
| |
| decoder = null; |
| } |
| |
| Metrics.counter(BigQueryStorageStreamReader.class, "split-at-fraction-calls-successful") |
| .inc(); |
| LOGGER.info( |
| "Successfully split BigQuery Storage API stream at {}. Split response: {}", |
| fraction, |
| splitResponse); |
| return source.fromExisting(splitResponse.getRemainderStream()); |
| } |
| |
| @Override |
| public synchronized Double getFractionConsumed() { |
| return fractionConsumed; |
| } |
| |
| private static float getFractionConsumed(ReadRowsResponse response) { |
| // TODO(aryann): Once we rebuild the generated client code, we should change this to |
| // use getFractionConsumed(). |
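| // Field number 2 of StreamStatus is presumed to carry the fraction consumed as the |
| // fixed32 bits of a float, mirroring the encoding used in splitAtFraction above. |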
| List<Integer> fractionConsumedField = |
| response.getStatus().getUnknownFields().getField(2).getFixed32List(); |
| if (fractionConsumedField.isEmpty()) { |
| Metrics.counter(BigQueryStorageStreamReader.class, "fraction-consumed-not-set").inc(); |
| return 0f; |
| } |
| |
| return Float.intBitsToFloat(Iterables.getOnlyElement(fractionConsumedField)); |
| } |
| } |
| } |