/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.beam.sdk.io.gcp.bigquery;

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.fromJsonString;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;

import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.FailedPreconditionException;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadStream;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.BigQueryServerStream;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** A {@link org.apache.beam.sdk.io.Source} representing a single stream in a read session. */
@SuppressWarnings({
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
class BigQueryStorageStreamSource<T> extends BoundedSource<T> {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryStorageStreamSource.class);
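/**
 * Creates a stream source for a single {@link ReadStream} within the given {@link ReadSession}.
 * The table schema is stored as a JSON string so that the source remains serializable.
 */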
public static <T> BigQueryStorageStreamSource<T> create(
ReadSession readSession,
ReadStream readStream,
TableSchema tableSchema,
SerializableFunction<SchemaAndRecord, T> parseFn,
Coder<T> outputCoder,
BigQueryServices bqServices) {
return new BigQueryStorageStreamSource<>(
readSession,
readStream,
toJsonString(checkNotNull(tableSchema, "tableSchema")),
parseFn,
outputCoder,
bqServices);
}
/**
* Creates a new source with the same properties as this one, except with a different {@link
* ReadStream}.
*/
public BigQueryStorageStreamSource<T> fromExisting(ReadStream newReadStream) {
return new BigQueryStorageStreamSource<>(
readSession, newReadStream, jsonTableSchema, parseFn, outputCoder, bqServices);
}
private final ReadSession readSession;
private final ReadStream readStream;
private final String jsonTableSchema;
private final SerializableFunction<SchemaAndRecord, T> parseFn;
private final Coder<T> outputCoder;
private final BigQueryServices bqServices;
private BigQueryStorageStreamSource(
ReadSession readSession,
ReadStream readStream,
String jsonTableSchema,
SerializableFunction<SchemaAndRecord, T> parseFn,
Coder<T> outputCoder,
BigQueryServices bqServices) {
this.readSession = checkNotNull(readSession, "readSession");
this.readStream = checkNotNull(readStream, "stream");
this.jsonTableSchema = checkNotNull(jsonTableSchema, "jsonTableSchema");
this.parseFn = checkNotNull(parseFn, "parseFn");
this.outputCoder = checkNotNull(outputCoder, "outputCoder");
this.bqServices = checkNotNull(bqServices, "bqServices");
}
@Override
public Coder<T> getOutputCoder() {
return outputCoder;
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
.add(DisplayData.item("table", readSession.getTable()).withLabel("Table"))
.add(DisplayData.item("readSession", readSession.getName()).withLabel("Read session"))
.add(DisplayData.item("stream", readStream.getName()).withLabel("Stream"));
}
@Override
public long getEstimatedSizeBytes(PipelineOptions options) {
// The size of a stream source can't be estimated due to server-side liquid sharding.
// TODO: Implement progress reporting.
return 0L;
}
@Override
public List<? extends BoundedSource<T>> split(
long desiredBundleSizeBytes, PipelineOptions options) {
// A stream source can't be split without reading from it due to server-side liquid sharding.
// TODO: Implement dynamic work rebalancing.
return ImmutableList.of(this);
}
@Override
public BigQueryStorageStreamReader<T> createReader(PipelineOptions options) throws IOException {
return new BigQueryStorageStreamReader<>(this, options.as(BigQueryOptions.class));
}
@Override
public String toString() {
return readStream.toString();
}
/** A {@link org.apache.beam.sdk.io.Source.Reader} which reads records from a stream. */
public static class BigQueryStorageStreamReader<T> extends BoundedSource.BoundedReader<T> {
private final BigQueryStorageReader reader;
private final SerializableFunction<SchemaAndRecord, T> parseFn;
private final StorageClient storageClient;
private final TableSchema tableSchema;
private BigQueryStorageStreamSource<T> source;
private BigQueryServerStream<ReadRowsResponse> responseStream;
private Iterator<ReadRowsResponse> responseIterator;
private T current;
private long currentOffset;
// Whether a future split attempt may still succeed; cleared once the service indicates the
// stream cannot be split further.
private boolean splitPossible = true;
// Values used for progress reporting.
private double fractionConsumed;
private double progressAtResponseStart;
private double progressAtResponseEnd;
private long rowsConsumedFromCurrentResponse;
private long totalRowsInCurrentResponse;
private TableReference tableReference;
private ServiceCallMetric serviceCallMetric;
private BigQueryStorageStreamReader(
BigQueryStorageStreamSource<T> source, BigQueryOptions options) throws IOException {
this.source = source;
this.reader = BigQueryStorageReaderFactory.getReader(source.readSession);
this.parseFn = source.parseFn;
this.storageClient = source.bqServices.getStorageClient(options);
this.tableSchema = fromJsonString(source.jsonTableSchema, TableSchema.class);
this.fractionConsumed = 0d;
this.progressAtResponseStart = 0d;
this.progressAtResponseEnd = 0d;
this.rowsConsumedFromCurrentResponse = 0L;
this.totalRowsInCurrentResponse = 0L;
}
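// Opens the ReadRows stream at the current offset (zero on the first call) and advances to
// the first record, if any.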
@Override
public synchronized boolean start() throws IOException {
BigQueryStorageStreamSource<T> source = getCurrentSource();
ReadRowsRequest request =
ReadRowsRequest.newBuilder()
.setReadStream(source.readStream.getName())
.setOffset(currentOffset)
.build();
tableReference = BigQueryUtils.toTableReference(source.readSession.getTable());
serviceCallMetric = BigQueryUtils.readCallMetric(tableReference);
responseStream = storageClient.readRows(request, source.readSession.getTable());
responseIterator = responseStream.iterator();
LOG.info("Started BigQuery Storage API read from stream {}.", source.readStream.getName());
return readNextRecord();
}
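// currentOffset is the offset of the current record within the stream; after a successful
// split it is used to resume reading on the primary child stream.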
@Override
public synchronized boolean advance() throws IOException {
currentOffset++;
return readNextRecord();
}
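// Pulls ReadRowsResponse messages from the server until the underlying reader has a buffered
// row to decode, updating progress-reporting state for each new response. Returns false once
// the response stream is exhausted.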
private synchronized boolean readNextRecord() throws IOException {
while (reader.readyForNextReadResponse()) {
if (!responseIterator.hasNext()) {
fractionConsumed = 1d;
return false;
}
ReadRowsResponse response;
try {
response = responseIterator.next();
// Since we don't have a direct hook into the underlying
// API call, record success every time we read a record successfully.
if (serviceCallMetric != null) {
serviceCallMetric.call("ok");
}
} catch (ApiException e) {
// Occasionally the iterator will fail and raise an exception.
// Capture it here and record the error in the metric.
if (serviceCallMetric != null) {
serviceCallMetric.call(e.getStatusCode().getCode().name());
}
throw e;
}
progressAtResponseStart = response.getStats().getProgress().getAtResponseStart();
progressAtResponseEnd = response.getStats().getProgress().getAtResponseEnd();
totalRowsInCurrentResponse = response.getRowCount();
rowsConsumedFromCurrentResponse = 0L;
Preconditions.checkArgument(
totalRowsInCurrentResponse >= 0,
"Row count from current response (%s) must be non-negative.",
totalRowsInCurrentResponse);
Preconditions.checkArgument(
0f <= progressAtResponseStart && progressAtResponseStart <= 1f,
"Progress at response start (%s) is not in the range [0.0, 1.0].",
progressAtResponseStart);
Preconditions.checkArgument(
0f <= progressAtResponseEnd && progressAtResponseEnd <= 1f,
"Progress at response end (%s) is not in the range [0.0, 1.0].",
progressAtResponseEnd);
reader.processReadRowsResponse(response);
}
SchemaAndRecord schemaAndRecord = new SchemaAndRecord(reader.readSingleRecord(), tableSchema);
current = parseFn.apply(schemaAndRecord);
// Updates the fraction consumed value. This value is calculated by interpolating between
// the fraction consumed value from the previous server response (or zero if we're consuming
// the first response) and the fractional value in the current response based on how many of
// the rows in the current response have been consumed.
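// For example, if this response reports progress 0.2 at its start and 0.4 at its end and
// contains 10 rows, then after consuming 5 of those rows the fraction consumed is
// 0.2 + (0.4 - 0.2) * 5 / 10 = 0.3.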
rowsConsumedFromCurrentResponse++;
fractionConsumed =
progressAtResponseStart
+ (progressAtResponseEnd - progressAtResponseStart)
* rowsConsumedFromCurrentResponse
* 1.0
/ totalRowsInCurrentResponse;
return true;
}
@Override
public T getCurrent() throws NoSuchElementException {
return current;
}
@Override
public synchronized void close() {
storageClient.close();
reader.close();
}
@Override
public synchronized BigQueryStorageStreamSource<T> getCurrentSource() {
return source;
}
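/**
 * Attempts a server-side split of the current stream at the given fraction. On success, this
 * reader continues on the primary child stream and a source for the remainder child stream is
 * returned; on failure, {@code null} is returned and the reader continues unchanged.
 */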
@Override
public BoundedSource<T> splitAtFraction(double fraction) {
Metrics.counter(BigQueryStorageStreamReader.class, "split-at-fraction-calls").inc();
LOG.debug(
"Received BigQuery Storage API split request for stream {} at fraction {}.",
source.readStream.getName(),
fraction);
if (fraction <= 0.0 || fraction >= 1.0) {
LOG.info("BigQuery Storage API does not support splitting at fraction {}", fraction);
return null;
}
if (!splitPossible) {
return null;
}
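// Ask the service to split the stream: on success, the primary child stream covers the rows
// before the split point and the remainder child stream covers the rest.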
SplitReadStreamRequest splitRequest =
SplitReadStreamRequest.newBuilder()
.setName(source.readStream.getName())
.setFraction((float) fraction)
.build();
SplitReadStreamResponse splitResponse = storageClient.splitReadStream(splitRequest);
if (!splitResponse.hasPrimaryStream() || !splitResponse.hasRemainderStream()) {
// No more splits are possible!
Metrics.counter(
BigQueryStorageStreamReader.class,
"split-at-fraction-calls-failed-due-to-impossible-split-point")
.inc();
LOG.info(
"BigQuery Storage API stream {} cannot be split at {}.",
source.readStream.getName(),
fraction);
splitPossible = false;
return null;
}
// We may be able to split this source. Before continuing, we pause the reader thread and
// replace its current source with the primary stream iff the reader has not moved past
// the split point.
synchronized (this) {
BigQueryServerStream<ReadRowsResponse> newResponseStream;
Iterator<ReadRowsResponse> newResponseIterator;
try {
newResponseStream =
storageClient.readRows(
ReadRowsRequest.newBuilder()
.setReadStream(splitResponse.getPrimaryStream().getName())
.setOffset(currentOffset + 1)
.build(),
source.readSession.getTable());
newResponseIterator = newResponseStream.iterator();
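// Calling hasNext() forces the first server response, so a split point the reader has
// already passed surfaces here as a FailedPreconditionException before the parent stream
// is replaced.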
newResponseIterator.hasNext();
} catch (FailedPreconditionException e) {
// The current source has already moved past the split point, so this split attempt
// is unsuccessful.
Metrics.counter(
BigQueryStorageStreamReader.class,
"split-at-fraction-calls-failed-due-to-bad-split-point")
.inc();
LOG.info(
"BigQuery Storage API split of stream {} abandoned because the primary stream is to "
+ "the left of the split fraction {}.",
source.readStream.getName(),
fraction);
return null;
} catch (Exception e) {
Metrics.counter(
BigQueryStorageStreamReader.class,
"split-at-fraction-calls-failed-due-to-other-reasons")
.inc();
LOG.error("BigQuery Storage API stream split failed.", e);
return null;
}
// Cancels the parent stream before replacing it with the primary stream.
responseStream.cancel();
source = source.fromExisting(splitResponse.getPrimaryStream());
responseStream = newResponseStream;
responseIterator = newResponseIterator;
reader.resetBuffer();
}
Metrics.counter(BigQueryStorageStreamReader.class, "split-at-fraction-calls-successful")
.inc();
LOG.info(
"Successfully split BigQuery Storage API stream at {}. Split response: {}",
fraction,
splitResponse);
return source.fromExisting(splitResponse.getRemainderStream());
}
@Override
public synchronized Double getFractionConsumed() {
return fractionConsumed;
}
}
}