Merge pull request #15117 from ajamato/bq_java_read_metrics
[BEAM-11994] Update BigQueryStorageStreamSource and BigQueryServicesImpl to capture API_REQUEST_COUNT metrics/errors for Storage API reads
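At a glance: each Storage API read call (createReadSession, readRows, splitReadStream) is now wrapped so that a per-table ServiceCallMetric records "ok" on success and the canonical status-code name on ApiException. A minimal, self-contained sketch of that status-labeled counting pattern — StatusCounter here is a hypothetical stand-in for Beam's ServiceCallMetric, not part of this PR:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// StatusCounter is a hypothetical stand-in for Beam's ServiceCallMetric:
// one monotonic counter per response-status string, safe to bump from any thread.
class StatusCounter {
  private final Map<String, AtomicLong> counts = new ConcurrentHashMap<>();

  void call(String status) {
    counts.computeIfAbsent(status, s -> new AtomicLong()).incrementAndGet();
  }

  long get(String status) {
    AtomicLong c = counts.get(status);
    return c == null ? 0L : c.get();
  }
}
```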
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
index f7b95e7..29d5c00 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServices.java
@@ -231,14 +231,23 @@
/** An interface representing a client object for making calls to the BigQuery Storage API. */
interface StorageClient extends AutoCloseable {
- /** Create a new read session against an existing table. */
+ /**
+ * Create a new read session against an existing table. This method variant also collects the
+ * request count metric, labeled with the table id from the request.
+ */
ReadSession createReadSession(CreateReadSessionRequest request);
/** Read rows in the context of a specific read stream. */
BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request);
+ /** This method variant collects the request count metric, using the fullTableId metadata. */
+ BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request, String fullTableId);
+
SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request);
+ /** This method variant collects the request count metric, using the fullTableId metadata. */
+ SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request, String fullTableId);
+
/**
* Close the client object.
*
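The new two-argument overloads let call sites that know the table id opt into request counting without breaking existing callers. A rough, illustrative sketch of the overload shape, assuming the metric-aware variant only adds accounting around the plain call (RowService and COUNTS are invented names, not Beam API):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Illustrative only: the two-argument overload delegates to the one-argument
// call and adds per-table status accounting, so existing call sites compile unchanged.
interface RowService {
  ConcurrentMap<String, AtomicLong> COUNTS = new ConcurrentHashMap<>();

  String readRows(String request);

  default String readRows(String request, String fullTableId) {
    try {
      String response = readRows(request);
      COUNTS.computeIfAbsent(fullTableId + "/ok", k -> new AtomicLong()).incrementAndGet();
      return response;
    } catch (RuntimeException e) {
      COUNTS.computeIfAbsent(fullTableId + "/error", k -> new AtomicLong()).incrementAndGet();
      throw e;
    }
  }
}
```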
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
index d788bfe..951f1fd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java
@@ -29,6 +29,7 @@
import com.google.api.client.util.Sleeper;
import com.google.api.core.ApiFuture;
import com.google.api.gax.core.FixedCredentialsProvider;
+import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.ServerStream;
@@ -100,7 +101,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer;
@@ -843,22 +843,7 @@
idsToPublish = insertIdList;
}
- HashMap<String, String> baseLabels = new HashMap<String, String>();
- // TODO(ajamato): Add Ptransform label. Populate it as empty for now to prevent the
- // SpecMonitoringInfoValidator from dropping the MonitoringInfo.
- baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
- baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery");
- baseLabels.put(MonitoringInfoConstants.Labels.METHOD, "BigQueryBatchWrite");
- baseLabels.put(
- MonitoringInfoConstants.Labels.RESOURCE,
- GcpResourceIdentifiers.bigQueryTable(
- ref.getProjectId(), ref.getDatasetId(), ref.getTableId()));
- baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_PROJECT_ID, ref.getProjectId());
- baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_DATASET, ref.getDatasetId());
- baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_TABLE, ref.getTableId());
-
- ServiceCallMetric serviceCallMetric =
- new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+ ServiceCallMetric serviceCallMetric = BigQueryUtils.writeCallMetric(ref);
while (true) {
List<FailsafeValueInSingleWindow<TableRow, TableRow>> retryRows = new ArrayList<>();
@@ -1363,9 +1348,31 @@
this.client = BigQueryReadClient.create(settingsBuilder.build());
}
+ // Since BigQueryReadClient's methods are final, they cannot be mocked with Mockito
+ // for testing, so this wrapper method can be mocked in tests instead.
+ ReadSession callCreateReadSession(CreateReadSessionRequest request) {
+ return client.createReadSession(request);
+ }
+
@Override
public ReadSession createReadSession(CreateReadSessionRequest request) {
- return client.createReadSession(request);
+ TableReference tableReference =
+ BigQueryUtils.toTableReference(request.getReadSession().getTable());
+ ServiceCallMetric serviceCallMetric = BigQueryUtils.readCallMetric(tableReference);
+ try {
+ ReadSession session = callCreateReadSession(request);
+ if (serviceCallMetric != null) {
+ serviceCallMetric.call("ok");
+ }
+ return session;
+ } catch (ApiException e) {
+ if (serviceCallMetric != null) {
+ serviceCallMetric.call(e.getStatusCode().getCode().name());
+ }
+ throw e;
+ }
}
@Override
@@ -1374,11 +1381,48 @@
}
@Override
+ public BigQueryServerStream<ReadRowsResponse> readRows(
+ ReadRowsRequest request, String fullTableId) {
+ TableReference tableReference = BigQueryUtils.toTableReference(fullTableId);
+ ServiceCallMetric serviceCallMetric = BigQueryUtils.readCallMetric(tableReference);
+ try {
+ BigQueryServerStream<ReadRowsResponse> response = readRows(request);
+ if (serviceCallMetric != null) {
+ serviceCallMetric.call("ok");
+ }
+ return response;
+ } catch (ApiException e) {
+ if (serviceCallMetric != null) {
+ serviceCallMetric.call(e.getStatusCode().getCode().name());
+ }
+ throw e;
+ }
+ }
+
+ @Override
public SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request) {
return client.splitReadStream(request);
}
@Override
+ public SplitReadStreamResponse splitReadStream(
+ SplitReadStreamRequest request, String fullTableId) {
+ TableReference tableReference = BigQueryUtils.toTableReference(fullTableId);
+ ServiceCallMetric serviceCallMetric = BigQueryUtils.readCallMetric(tableReference);
+ try {
+ SplitReadStreamResponse response = splitReadStream(request);
+ if (serviceCallMetric != null) {
+ serviceCallMetric.call("ok");
+ }
+ return response;
+ } catch (ApiException e) {
+ if (serviceCallMetric != null) {
+ serviceCallMetric.call(e.getStatusCode().getCode().name());
+ }
+ throw e;
+ }
+ }
+
+ @Override
public void close() {
client.close();
}
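The callCreateReadSession wrapper above exists purely as a test seam: BigQueryReadClient's methods are final, so Mockito cannot stub them directly. A self-contained sketch of the seam pattern, with illustrative names (FinalClient, Wrapper):

```java
// Sketch of the test seam used in StorageClientImpl above: route a final
// dependency method through a package-private wrapper that Mockito can stub.
class FinalClient {
  public final String call(String request) { // final: not stubbable by plain Mockito
    return "real:" + request;
  }
}

class Wrapper {
  private final FinalClient client = new FinalClient();

  // Package-private, non-final seam; tests mock Wrapper and stub callInternal.
  String callInternal(String request) {
    return client.call(request);
  }

  public String call(String request) {
    return callInternal(request); // production path, instrumented in the real code
  }
}
```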
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
index 712456a..120adc1 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageQuerySource.java
@@ -175,4 +175,9 @@
kmsKey);
return bqServices.getDatasetService(options).getTable(queryResultTable);
}
+
+ @Override
+ protected @Nullable String getTargetTableId(BigQueryOptions options) throws Exception {
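+ // Query sources read from a temporary result table that is not known until
+ // execution time, so no table id can be constructed here.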
+ return null;
+ }
}
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
index ac8f2ae..504ec9e 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageSourceBase.java
@@ -94,6 +94,8 @@
*/
protected abstract Table getTargetTable(BigQueryOptions options) throws Exception;
+ protected abstract @Nullable String getTargetTableId(BigQueryOptions options) throws Exception;
+
@Override
public Coder<T> getOutputCoder() {
return outputCoder;
@@ -105,9 +107,16 @@
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
Table targetTable = getTargetTable(bqOptions);
- ReadSession.Builder readSessionBuilder =
- ReadSession.newBuilder()
- .setTable(BigQueryHelpers.toTableResourceName(targetTable.getTableReference()));
+ String tableReferenceId = "";
+ if (targetTable != null) {
+ tableReferenceId = BigQueryHelpers.toTableResourceName(targetTable.getTableReference());
+ } else {
+ // If the table does not exist, targetTable will be null.
+ // Construct the table id ourselves when possible, for error recording/logging.
+ tableReferenceId = getTargetTableId(bqOptions);
+ }
+
+ ReadSession.Builder readSessionBuilder = ReadSession.newBuilder().setTable(tableReferenceId);
if (selectedFieldsProvider != null || rowRestrictionProvider != null) {
ReadSession.TableReadOptions.Builder tableReadOptionsBuilder =
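For reference, the fallback id built by getTargetTableId matches the resource-name shape assumed here for BigQueryHelpers.toTableResourceName and parsed by TABLE_RESOURCE_PATTERN in BigQueryUtils below; a small sketch under that assumption:

```java
// Assumed output shape of BigQueryHelpers.toTableResourceName, mirrored by the
// getTargetTableId fallback and parsed by TABLE_RESOURCE_PATTERN in BigQueryUtils.
class TableNames {
  static String tableResourceName(String project, String dataset, String table) {
    return String.format("projects/%s/datasets/%s/tables/%s", project, dataset, table);
  }
  // tableResourceName("myproject", "mydataset", "mytable")
  //   -> "projects/myproject/datasets/mydataset/tables/mytable"
}
```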
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
index 1a7bc59..462c720 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
@@ -21,7 +21,9 @@
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.toJsonString;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
+import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.FailedPreconditionException;
+import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
@@ -33,6 +35,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.BigQueryServerStream;
@@ -162,6 +165,9 @@
private long rowsConsumedFromCurrentResponse;
private long totalRowsInCurrentResponse;
+ private TableReference tableReference;
+ private ServiceCallMetric serviceCallMetric;
+
private BigQueryStorageStreamReader(
BigQueryStorageStreamSource<T> source, BigQueryOptions options) throws IOException {
this.source = source;
@@ -186,7 +192,9 @@
.setOffset(currentOffset)
.build();
- responseStream = storageClient.readRows(request);
+ tableReference = BigQueryUtils.toTableReference(source.readSession.getTable());
+ serviceCallMetric = BigQueryUtils.readCallMetric(tableReference);
+ responseStream = storageClient.readRows(request, source.readSession.getTable());
responseIterator = responseStream.iterator();
LOG.info("Started BigQuery Storage API read from stream {}.", source.readStream.getName());
return readNextRecord();
@@ -205,7 +213,23 @@
return false;
}
- ReadRowsResponse response = responseIterator.next();
+ ReadRowsResponse response;
+ try {
+ response = responseIterator.next();
+ // Since we don't have a direct hook into the underlying
+ // API call, record success every time we read a record successfully.
+ if (serviceCallMetric != null) {
+ serviceCallMetric.call("ok");
+ }
+ } catch (ApiException e) {
+ // Occasionally the iterator will fail and raise an exception.
+ // Capture it here and record the error in the metric.
+ if (serviceCallMetric != null) {
+ serviceCallMetric.call(e.getStatusCode().getCode().name());
+ }
+ throw e;
+ }
+
progressAtResponseStart = response.getStats().getProgress().getAtResponseStart();
progressAtResponseEnd = response.getStats().getProgress().getAtResponseEnd();
totalRowsInCurrentResponse = response.getRowCount();
@@ -315,7 +339,8 @@
ReadRowsRequest.newBuilder()
.setReadStream(splitResponse.getPrimaryStream().getName())
.setOffset(currentOffset + 1)
- .build());
+ .build(),
+ source.readSession.getTable());
newResponseIterator = newResponseStream.iterator();
newResponseIterator.hasNext();
} catch (FailedPreconditionException e) {
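Because the response stream's iterator hides the underlying RPCs, the reader records one "ok" per response consumed and maps iterator failures to their status names. A self-contained sketch of that iterator-wrapping idea (CountingIterator is illustrative, not part of the PR):

```java
import java.util.Iterator;
import java.util.function.Consumer;

// Illustrative mirror of the readNextRecord logic: record a status for each
// next() outcome on an iterator whose underlying API calls are not observable.
class CountingIterator<T> implements Iterator<T> {
  private final Iterator<T> delegate;
  private final Consumer<String> metric;

  CountingIterator(Iterator<T> delegate, Consumer<String> metric) {
    this.delegate = delegate;
    this.metric = metric;
  }

  @Override
  public boolean hasNext() {
    return delegate.hasNext();
  }

  @Override
  public T next() {
    try {
      T value = delegate.next();
      metric.accept("ok"); // success recorded per element, as in the reader above
      return value;
    } catch (RuntimeException e) {
      metric.accept(e.getClass().getSimpleName()); // the real code uses the gRPC status name
      throw e;
    }
  }
}
```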
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
index 39ee5fb..2a14cd5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageTableSource.java
@@ -102,11 +102,39 @@
@Override
public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
- return getTargetTable(options.as(BigQueryOptions.class)).getNumBytes();
+ Table table = getTargetTable(options.as(BigQueryOptions.class));
+ if (table != null) {
+ return table.getNumBytes();
+ }
+ // If the table does not exist, it will be null.
+ // Avoid a NullPointerException here and allow a more meaningful table "not_found"
+ // error to be shown to the user when the table is read.
+ return 0;
}
@Override
- protected Table getTargetTable(BigQueryOptions options) throws Exception {
+ protected String getTargetTableId(BigQueryOptions options) throws Exception {
+ TableReference tableReference = tableReferenceProvider.get();
+ if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
+ checkState(
+ !Strings.isNullOrEmpty(options.getProject()),
+ "No project ID set in %s or %s, cannot construct a complete %s",
+ TableReference.class.getSimpleName(),
+ BigQueryOptions.class.getSimpleName(),
+ TableReference.class.getSimpleName());
+ LOG.info(
+ "Project ID not set in {}. Using default project from {}.",
+ TableReference.class.getSimpleName(),
+ BigQueryOptions.class.getSimpleName());
+ tableReference.setProjectId(options.getProject());
+ }
+ return String.format(
+ "projects/%s/datasets/%s/tables/%s",
+ tableReference.getProjectId(), tableReference.getDatasetId(), tableReference.getTableId());
+ }
+
+ @Override
+ protected @Nullable Table getTargetTable(BigQueryOptions options) throws Exception {
if (cachedTable.get() == null) {
TableReference tableReference = tableReferenceProvider.get();
if (Strings.isNullOrEmpty(tableReference.getProjectId())) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
index 9166228..6025b22 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtils.java
@@ -24,6 +24,7 @@
import static org.apache.beam.sdk.values.Row.toRow;
import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auto.value.AutoValue;
@@ -34,17 +35,23 @@
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import java.util.stream.IntStream;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
+import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
+import org.apache.beam.runners.core.metrics.MonitoringInfoConstants;
+import org.apache.beam.runners.core.metrics.ServiceCallMetric;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.schemas.Schema;
@@ -75,6 +82,20 @@
"nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
})
public class BigQueryUtils {
+
+ // For parsing the format returned by the API proto:
+ // google.cloud.bigquery.storage.v1.ReadSession.getTable()
+ // "projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"
+ private static final Pattern TABLE_RESOURCE_PATTERN =
+ Pattern.compile(
+ "^projects/(?<PROJECT>[^/]+)/datasets/(?<DATASET>[^/]+)/tables/(?<TABLE>[^/]+)$");
+
+ // For parsing the format used to refer to table parameters in BigQueryIO:
+ // "{project_id}:{dataset_id}.{table_id}" or
+ // "{project_id}.{dataset_id}.{table_id}"
+ private static final Pattern SIMPLE_TABLE_PATTERN =
+ Pattern.compile("^(?<PROJECT>[^\\.:]+)[\\.:](?<DATASET>[^\\.:]+)[\\.](?<TABLE>[^\\.:]+)$");
+
/** Options for how to convert BigQuery data to Beam data. */
@AutoValue
public abstract static class ConversionOptions implements Serializable {
@@ -909,4 +930,85 @@
"Does not support converting avro format: " + value.getClass().getName());
}
}
+
+ /**
+ * @param fullTableId - one of the forms commonly used to refer to BigQuery tables in the Beam
+ * codebase:
+ * <ul>
+ * <li>projects/{project_id}/datasets/{dataset_id}/tables/{table_id}
+ * <li>myproject:mydataset.mytable
+ * <li>myproject.mydataset.mytable
+ * </ul>
+ *
+ * @return a TableReference parsed from the fullTableId, or null if it cannot be parsed
+ * properly.
+ */
+ public static @Nullable TableReference toTableReference(String fullTableId) {
+ // Try parsing the format:
+ // "projects/{project_id}/datasets/{dataset_id}/tables/{table_id}"
+ Matcher m = TABLE_RESOURCE_PATTERN.matcher(fullTableId);
+ if (m.matches()) {
+ return new TableReference()
+ .setProjectId(m.group("PROJECT"))
+ .setDatasetId(m.group("DATASET"))
+ .setTableId(m.group("TABLE"));
+ }
+
+ // If that failed, try the format:
+ // "{project_id}:{dataset_id}.{table_id}" or
+ // "{project_id}.{dataset_id}.{table_id}"
+ m = SIMPLE_TABLE_PATTERN.matcher(fullTableId);
+ if (m.matches()) {
+ return new TableReference()
+ .setProjectId(m.group("PROJECT"))
+ .setDatasetId(m.group("DATASET"))
+ .setTableId(m.group("TABLE"));
+ }
+ return null;
+ }
+
+ private static ServiceCallMetric callMetricForMethod(
+ TableReference tableReference, String method) {
+ if (tableReference != null) {
+ // TODO(ajamato): Add Ptransform label. Populate it as empty for now to prevent the
+ // SpecMonitoringInfoValidator from dropping the MonitoringInfo.
+ HashMap<String, String> baseLabels = new HashMap<String, String>();
+ baseLabels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
+ baseLabels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery");
+ baseLabels.put(MonitoringInfoConstants.Labels.METHOD, method);
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.RESOURCE,
+ GcpResourceIdentifiers.bigQueryTable(
+ tableReference.getProjectId(),
+ tableReference.getDatasetId(),
+ tableReference.getTableId()));
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.BIGQUERY_PROJECT_ID, tableReference.getProjectId());
+ baseLabels.put(
+ MonitoringInfoConstants.Labels.BIGQUERY_DATASET, tableReference.getDatasetId());
+ baseLabels.put(MonitoringInfoConstants.Labels.BIGQUERY_TABLE, tableReference.getTableId());
+ return new ServiceCallMetric(MonitoringInfoConstants.Urns.API_REQUEST_COUNT, baseLabels);
+ }
+ return null;
+ }
+
+ /**
+ * @param tableReference - The table being read from. This can be a temporary BQ table created
+ * to hold the results of a SQL query.
+ * @return a ServiceCallMetric for recording statuses of all BQ API responses related to reading
+ * elements directly from BigQuery in a process-wide metric, such as calls to readRows,
+ * splitReadStream, and createReadSession.
+ */
+ public static ServiceCallMetric readCallMetric(TableReference tableReference) {
+ return callMetricForMethod(tableReference, "BigQueryBatchRead");
+ }
+
+ /**
+ * @param tableReference - The table being written to.
+ * @return a ServiceCallMetric for recording statuses of all BQ responses related to writing
+ * elements directly to BigQuery in a process-wide metric, such as insertAll.
+ */
+ public static ServiceCallMetric writeCallMetric(TableReference tableReference) {
+ return callMetricForMethod(tableReference, "BigQueryBatchWrite");
+ }
}
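The two patterns can be exercised standalone; this runnable demo (inputs chosen here for illustration) shows what matches and what is rejected:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone demo of the two table-id patterns defined above.
public class TablePatternDemo {
  static final Pattern RESOURCE =
      Pattern.compile(
          "^projects/(?<PROJECT>[^/]+)/datasets/(?<DATASET>[^/]+)/tables/(?<TABLE>[^/]+)$");
  static final Pattern SIMPLE =
      Pattern.compile("^(?<PROJECT>[^\\.:]+)[\\.:](?<DATASET>[^\\.:]+)[\\.](?<TABLE>[^\\.:]+)$");

  public static void main(String[] args) {
    Matcher m = RESOURCE.matcher("projects/myproject/datasets/mydataset/tables/mytable");
    System.out.println(m.matches() + " " + m.group("PROJECT")); // true myproject

    m = SIMPLE.matcher("myproject:mydataset.mytable");
    System.out.println(m.matches() + " " + m.group("DATASET")); // true mydataset

    // Four dotted segments are ambiguous and rejected, matching the tests below.
    System.out.println(SIMPLE.matcher("myproject.mydataset.mytable.extra").matches()); // false
  }
}
```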
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
index 81e1027..86f538a 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageQueryTest.java
@@ -821,7 +821,7 @@
StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
when(fakeStorageClient.createReadSession(any())).thenReturn(readSession);
- when(fakeStorageClient.readRows(expectedReadRowsRequest))
+ when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
BigQueryIO.TypedRead<KV<String, Long>> typedRead =
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
index c39cd11..135a536 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOStorageReadTest.java
@@ -774,7 +774,7 @@
createResponse(AVRO_SCHEMA, records.subList(2, 3), 0.5, 0.75));
StorageClient fakeStorageClient = mock(StorageClient.class);
- when(fakeStorageClient.readRows(expectedRequest))
+ when(fakeStorageClient.readRows(expectedRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(responses));
BigQueryStorageStreamSource<TableRow> streamSource =
@@ -830,7 +830,7 @@
createResponse(AVRO_SCHEMA, records.subList(4, 7), 0.7, 1.0));
StorageClient fakeStorageClient = mock(StorageClient.class);
- when(fakeStorageClient.readRows(expectedRequest))
+ when(fakeStorageClient.readRows(expectedRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(responses));
BigQueryStorageStreamSource<TableRow> streamSource =
@@ -899,7 +899,7 @@
createResponse(AVRO_SCHEMA, records.subList(4, 7), 0.800, 0.875));
StorageClient fakeStorageClient = mock(StorageClient.class);
- when(fakeStorageClient.readRows(expectedRequest))
+ when(fakeStorageClient.readRows(expectedRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses));
when(fakeStorageClient.splitReadStream(
@@ -916,7 +916,7 @@
createResponse(AVRO_SCHEMA, records.subList(3, 4), 0.8, 1.0));
when(fakeStorageClient.readRows(
- ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(1).build()))
+ ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(1).build(), ""))
.thenReturn(new FakeBigQueryServerStream<>(primaryResponses));
BigQueryStorageStreamSource<TableRow> streamSource =
@@ -974,7 +974,7 @@
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.readRows(
- ReadRowsRequest.newBuilder().setReadStream("parentStream").build()))
+ ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses));
// Mocks the split call.
@@ -993,11 +993,12 @@
// This test will read rows 0 and 1 from the parent before calling split,
// so we expect the primary read to start at offset 2.
.setOffset(2)
- .build()))
+ .build(),
+ ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses.subList(1, 2)));
when(fakeStorageClient.readRows(
- ReadRowsRequest.newBuilder().setReadStream("remainderStream").build()))
+ ReadRowsRequest.newBuilder().setReadStream("remainderStream").build(), ""))
.thenReturn(
new FakeBigQueryServerStream<>(parentResponses.subList(2, parentResponses.size())));
@@ -1051,7 +1052,7 @@
// Mock the initial ReadRows call.
when(fakeStorageClient.readRows(
- ReadRowsRequest.newBuilder().setReadStream(readStreams.get(0).getName()).build()))
+ ReadRowsRequest.newBuilder().setReadStream(readStreams.get(0).getName()).build(), ""))
.thenReturn(
new FakeBigQueryServerStream<>(
Lists.newArrayList(
@@ -1091,7 +1092,8 @@
ReadRowsRequest.newBuilder()
.setReadStream(readStreams.get(1).getName())
.setOffset(1)
- .build()))
+ .build(),
+ ""))
.thenReturn(
new FakeBigQueryServerStream<>(
Lists.newArrayList(
@@ -1125,7 +1127,8 @@
ReadRowsRequest.newBuilder()
.setReadStream(readStreams.get(2).getName())
.setOffset(2)
- .build()))
+ .build(),
+ ""))
.thenReturn(
new FakeBigQueryServerStream<>(
Lists.newArrayList(
@@ -1191,7 +1194,7 @@
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.readRows(
- ReadRowsRequest.newBuilder().setReadStream("parentStream").build()))
+ ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses));
// Mocks the split call. A response without a primary_stream and remainder_stream means
@@ -1259,7 +1262,7 @@
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.readRows(
- ReadRowsRequest.newBuilder().setReadStream("parentStream").build()))
+ ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses));
// Mocks the split call. A response without a primary_stream and remainder_stream means
@@ -1280,7 +1283,8 @@
// This test will read rows 0 and 1 from the parent before calling split,
// so we expect the primary read to start at offset 2.
.setOffset(2)
- .build()))
+ .build(),
+ ""))
.thenThrow(
new FailedPreconditionException(
"Given row offset is invalid for stream.",
@@ -1373,7 +1377,7 @@
StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
.thenReturn(readSession);
- when(fakeStorageClient.readRows(expectedReadRowsRequest))
+ when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
PCollection<KV<String, Long>> output =
@@ -1439,7 +1443,7 @@
StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
.thenReturn(readSession);
- when(fakeStorageClient.readRows(expectedReadRowsRequest))
+ when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
PCollection<TableRow> output =
@@ -1507,7 +1511,7 @@
StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
.thenReturn(readSession);
- when(fakeStorageClient.readRows(expectedReadRowsRequest))
+ when(fakeStorageClient.readRows(expectedReadRowsRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
PCollection<KV<String, Long>> output =
@@ -1553,7 +1557,7 @@
ARROW_SCHEMA, names.subList(2, 3), values.subList(2, 3), 0.5, 0.75));
StorageClient fakeStorageClient = mock(StorageClient.class);
- when(fakeStorageClient.readRows(expectedRequest))
+ when(fakeStorageClient.readRows(expectedRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(responses));
BigQueryStorageStreamSource<TableRow> streamSource =
@@ -1602,7 +1606,7 @@
createResponseArrow(ARROW_SCHEMA, names.subList(4, 7), values.subList(4, 7), 0.7, 1.0));
StorageClient fakeStorageClient = mock(StorageClient.class);
- when(fakeStorageClient.readRows(expectedRequest))
+ when(fakeStorageClient.readRows(expectedRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(responses));
BigQueryStorageStreamSource<TableRow> streamSource =
@@ -1668,7 +1672,7 @@
ARROW_SCHEMA, names.subList(4, 7), values.subList(4, 7), 0.7, 0.875));
StorageClient fakeStorageClient = mock(StorageClient.class);
- when(fakeStorageClient.readRows(expectedRequest))
+ when(fakeStorageClient.readRows(expectedRequest, ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponse));
when(fakeStorageClient.splitReadStream(
@@ -1685,7 +1689,7 @@
createResponseArrow(ARROW_SCHEMA, names.subList(3, 4), values.subList(3, 4), 0.8, 1.0));
when(fakeStorageClient.readRows(
- ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(1).build()))
+ ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(1).build(), ""))
.thenReturn(new FakeBigQueryServerStream<>(primaryResponses));
BigQueryStorageStreamSource<TableRow> streamSource =
@@ -1734,7 +1738,7 @@
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.readRows(
- ReadRowsRequest.newBuilder().setReadStream("parentStream").build()))
+ ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses));
// Mocks the split call.
@@ -1753,11 +1757,12 @@
// This test will read rows 0 and 1 from the parent before calling split,
// so we expect the primary read to start at offset 2.
.setOffset(2)
- .build()))
+ .build(),
+ ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses.subList(1, 2)));
when(fakeStorageClient.readRows(
- ReadRowsRequest.newBuilder().setReadStream("remainderStream").build()))
+ ReadRowsRequest.newBuilder().setReadStream("remainderStream").build(), ""))
.thenReturn(
new FakeBigQueryServerStream<>(parentResponses.subList(2, parentResponses.size())));
@@ -1824,7 +1829,7 @@
// Mock the initial ReadRows call.
when(fakeStorageClient.readRows(
- ReadRowsRequest.newBuilder().setReadStream(readStreams.get(0).getName()).build()))
+ ReadRowsRequest.newBuilder().setReadStream(readStreams.get(0).getName()).build(), ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses));
// Mock the first SplitReadStream call.
@@ -1850,7 +1855,8 @@
ReadRowsRequest.newBuilder()
.setReadStream(readStreams.get(1).getName())
.setOffset(1)
- .build()))
+ .build(),
+ ""))
.thenReturn(new FakeBigQueryServerStream<>(otherResponses));
// Mock the second SplitReadStream call.
@@ -1875,7 +1881,8 @@
ReadRowsRequest.newBuilder()
.setReadStream(readStreams.get(2).getName())
.setOffset(2)
- .build()))
+ .build(),
+ ""))
.thenReturn(new FakeBigQueryServerStream<>(lastResponses));
BoundedSource<TableRow> source =
@@ -1929,7 +1936,7 @@
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.readRows(
- ReadRowsRequest.newBuilder().setReadStream("parentStream").build()))
+ ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses));
// Mocks the split call. A response without a primary_stream and remainder_stream means
@@ -1994,7 +2001,7 @@
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.readRows(
- ReadRowsRequest.newBuilder().setReadStream("parentStream").build()))
+ ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), ""))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses));
// Mocks the split call. A response without a primary_stream and remainder_stream means
@@ -2015,7 +2022,8 @@
// This test will read rows 0 and 1 from the parent before calling split,
// so we expect the primary read to start at offset 2.
.setOffset(2)
- .build()))
+ .build(),
+ ""))
.thenThrow(
new FailedPreconditionException(
"Given row offset is invalid for stream.",
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
index 61b94f6..53d8db0 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java
@@ -27,6 +27,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -48,6 +49,8 @@
import com.google.api.client.testing.util.MockSleeper;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
+import com.google.api.gax.rpc.ApiException;
+import com.google.api.gax.rpc.StatusCode;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.Job;
@@ -62,6 +65,12 @@
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
+import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
+import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+import com.google.cloud.bigquery.storage.v1.ReadSession;
+import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest;
+import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.cloud.hadoop.util.RetryBoundedBackOff;
import java.io.ByteArrayInputStream;
@@ -69,6 +78,7 @@
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
@@ -179,15 +189,15 @@
}
}
- private void verifyWriteMetricWasSet(
- String projectId, String dataset, String table, String status, long count) {
+ private void verifyRequestMetricWasSet(
+ String method, String projectId, String dataset, String table, String status, long count) {
// Verify the metric as reported.
HashMap<String, String> labels = new HashMap<String, String>();
// TODO(ajamato): Add Ptransform label. Populate it as empty for now to prevent the
// SpecMonitoringInfoValidator from dropping the MonitoringInfo.
labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
labels.put(MonitoringInfoConstants.Labels.SERVICE, "BigQuery");
- labels.put(MonitoringInfoConstants.Labels.METHOD, "BigQueryBatchWrite");
+ labels.put(MonitoringInfoConstants.Labels.METHOD, method);
labels.put(
MonitoringInfoConstants.Labels.RESOURCE,
GcpResourceIdentifiers.bigQueryTable(projectId, dataset, table));
@@ -203,6 +213,16 @@
assertEquals(count, (long) container.getCounter(name).getCumulative());
}
+ private void verifyWriteMetricWasSet(
+ String projectId, String dataset, String table, String status, long count) {
+ verifyRequestMetricWasSet("BigQueryBatchWrite", projectId, dataset, table, status, count);
+ }
+
+ private void verifyReadMetricWasSet(
+ String projectId, String dataset, String table, String status, long count) {
+ verifyRequestMetricWasSet("BigQueryBatchRead", projectId, dataset, table, status, count);
+ }
+
/** Tests that {@link BigQueryServicesImpl.JobServiceImpl#startLoadJob} succeeds. */
@Test
public void testStartLoadJobSucceeds() throws IOException, InterruptedException {
@@ -1399,4 +1419,145 @@
assertThat(failedInserts, is(expected));
}
+
+ @Test
+ public void testCreateReadSessionSetsRequestCountMetric()
+ throws InterruptedException, IOException {
+ BigQueryServicesImpl.StorageClientImpl client =
+ mock(BigQueryServicesImpl.StorageClientImpl.class);
+
+ CreateReadSessionRequest.Builder builder = CreateReadSessionRequest.newBuilder();
+ builder.getReadSessionBuilder().setTable("myproject:mydataset.mytable");
+ CreateReadSessionRequest request = builder.build();
+ when(client.callCreateReadSession(request))
+ .thenReturn(ReadSession.newBuilder().build()); // Mock implementation.
+ when(client.createReadSession(any())).thenCallRealMethod(); // Real implementation.
+
+ client.createReadSession(request);
+ verifyReadMetricWasSet("myproject", "mydataset", "mytable", "ok", 1);
+ }
+
+ @Test
+ public void testCreateReadSessionSetsRequestCountMetricOnError()
+ throws InterruptedException, IOException {
+ BigQueryServicesImpl.StorageClientImpl client =
+ mock(BigQueryServicesImpl.StorageClientImpl.class);
+
+ CreateReadSessionRequest.Builder builder = CreateReadSessionRequest.newBuilder();
+ builder.getReadSessionBuilder().setTable("myproject:mydataset.mytable");
+ CreateReadSessionRequest request = builder.build();
+ StatusCode statusCode =
+ new StatusCode() {
+ @Override
+ public Code getCode() {
+ return Code.NOT_FOUND;
+ }
+
+ @Override
+ public Object getTransportCode() {
+ return null;
+ }
+ };
+ when(client.callCreateReadSession(request))
+ .thenThrow(new ApiException("Not Found", null, statusCode, false)); // Mock implementation.
+ when(client.createReadSession(any())).thenCallRealMethod(); // Real implementation.
+
+ thrown.expect(ApiException.class);
+ thrown.expectMessage("Not Found");
+
+ client.createReadSession(request);
+ verifyReadMetricWasSet("myproject", "mydataset", "mytable", "not_found", 1);
+ }
+
+ @Test
+ public void testReadRowsSetsRequestCountMetric() throws InterruptedException, IOException {
+ BigQueryServices.StorageClient client = mock(BigQueryServicesImpl.StorageClientImpl.class);
+ ReadRowsRequest request = null;
+ BigQueryServices.BigQueryServerStream<ReadRowsResponse> response =
+ new BigQueryServices.BigQueryServerStream<ReadRowsResponse>() {
+ @Override
+ public Iterator<ReadRowsResponse> iterator() {
+ return null;
+ }
+
+ @Override
+ public void cancel() {}
+ };
+
+ when(client.readRows(request)).thenReturn(response); // Mock implementation.
+ when(client.readRows(any(), any())).thenCallRealMethod(); // Real implementation.
+
+ client.readRows(request, "myproject:mydataset.mytable");
+ verifyReadMetricWasSet("myproject", "mydataset", "mytable", "ok", 1);
+ }
+
+ @Test
+ public void testReadRowsSetsRequestCountMetricOnError() throws InterruptedException, IOException {
+ BigQueryServices.StorageClient client = mock(BigQueryServicesImpl.StorageClientImpl.class);
+ ReadRowsRequest request = null;
+ StatusCode statusCode =
+ new StatusCode() {
+ @Override
+ public Code getCode() {
+ return Code.INTERNAL;
+ }
+
+ @Override
+ public Object getTransportCode() {
+ return null;
+ }
+ };
+ when(client.readRows(request))
+ .thenThrow(new ApiException("Internal", null, statusCode, false)); // Mock implementation.
+ when(client.readRows(any(), any())).thenCallRealMethod(); // Real implementation.
+
+ thrown.expect(ApiException.class);
+ thrown.expectMessage("Internal");
+
+ client.readRows(request, "myproject:mydataset.mytable");
+ verifyReadMetricWasSet("myproject", "mydataset", "mytable", "internal", 1);
+ }
+
+ @Test
+ public void testSplitReadStreamSetsRequestCountMetric() throws InterruptedException, IOException {
+ BigQueryServices.StorageClient client = mock(BigQueryServicesImpl.StorageClientImpl.class);
+
+ SplitReadStreamRequest request = null;
+ when(client.splitReadStream(request))
+ .thenReturn(SplitReadStreamResponse.newBuilder().build()); // Mock implementation.
+ when(client.splitReadStream(any(), any())).thenCallRealMethod(); // Real implementation.
+
+ client.splitReadStream(request, "myproject:mydataset.mytable");
+ verifyReadMetricWasSet("myproject", "mydataset", "mytable", "ok", 1);
+ }
+
+ @Test
+ public void testSplitReadStreamSetsRequestCountMetricOnError()
+ throws InterruptedException, IOException {
+ BigQueryServices.StorageClient client = mock(BigQueryServicesImpl.StorageClientImpl.class);
+ SplitReadStreamRequest request = null;
+ StatusCode statusCode =
+ new StatusCode() {
+ @Override
+ public Code getCode() {
+ return Code.RESOURCE_EXHAUSTED;
+ }
+
+ @Override
+ public Object getTransportCode() {
+ return null;
+ }
+ };
+ when(client.splitReadStream(request))
+ .thenThrow(
+ new ApiException(
+ "Resource Exhausted", null, statusCode, false)); // Mock implementation.
+ when(client.splitReadStream(any(), any())).thenCallRealMethod(); // Real implementation.
+
+ thrown.expect(ApiException.class);
+ thrown.expectMessage("Resource Exhausted");
+
+ client.splitReadStream(request, "myproject:mydataset.mytable");
+ verifyReadMetricWasSet("myproject", "mydataset", "mytable", "resource_exhausted", 1);
+ }
}
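The three error-path tests each build the same anonymous StatusCode; a small factory would state the intent more directly. A hedged sketch — FakeStatusCodes is hypothetical, not part of this PR:

```java
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;

// Hypothetical helper consolidating the anonymous StatusCode classes above.
class FakeStatusCodes {
  static StatusCode of(StatusCode.Code code) {
    return new StatusCode() {
      @Override
      public Code getCode() {
        return code;
      }

      @Override
      public Object getTransportCode() {
        return null;
      }
    };
  }

  // Usage mirroring the tests above:
  static ApiException notFound() {
    return new ApiException("Not Found", null, of(StatusCode.Code.NOT_FOUND), false);
  }
}
```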
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
index 75c1315..0782622 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryUtilsTest.java
@@ -26,9 +26,11 @@
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.collection.IsMapContaining.hasEntry;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.math.BigDecimal;
@@ -882,4 +884,82 @@
record, AVRO_ARRAY_ARRAY_TYPE, BigQueryUtils.ConversionOptions.builder().build());
assertEquals(expected, beamRow);
}
+
+ @Test
+ public void testToTableReference() {
+ {
+ TableReference tr =
+ BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset/tables/mytable");
+ assertEquals("myproject", tr.getProjectId());
+ assertEquals("mydataset", tr.getDatasetId());
+ assertEquals("mytable", tr.getTableId());
+ }
+
+ {
+ // Test the colon (":") separator after the project id
+ TableReference tr = BigQueryUtils.toTableReference("myprojectwithcolon:mydataset.mytable");
+ assertEquals("myprojectwithcolon", tr.getProjectId());
+ assertEquals("mydataset", tr.getDatasetId());
+ assertEquals("mytable", tr.getTableId());
+ }
+
+ {
+ // Test the dot (".") separator after the project id
+ TableReference tr = BigQueryUtils.toTableReference("myprojectwithdot.mydataset.mytable");
+ assertEquals("myprojectwithdot", tr.getProjectId());
+ assertEquals("mydataset", tr.getDatasetId());
+ assertEquals("mytable", tr.getTableId());
+ }
+
+ // Invalid scenarios
+ assertNull(BigQueryUtils.toTableReference(""));
+ assertNull(BigQueryUtils.toTableReference(":."));
+ assertNull(BigQueryUtils.toTableReference(".."));
+ assertNull(BigQueryUtils.toTableReference("myproject"));
+ assertNull(BigQueryUtils.toTableReference("myproject:"));
+ assertNull(BigQueryUtils.toTableReference("myproject."));
+ assertNull(BigQueryUtils.toTableReference("myproject:mydataset"));
+ assertNull(BigQueryUtils.toTableReference("myproject:mydataset."));
+ assertNull(BigQueryUtils.toTableReference("myproject:mydataset.mytable."));
+ assertNull(BigQueryUtils.toTableReference("myproject:mydataset:mytable:"));
+ assertNull(BigQueryUtils.toTableReference(".invalidleadingdot.mydataset.mytable"));
+ assertNull(BigQueryUtils.toTableReference("invalidtrailingdot.mydataset.mytable."));
+ assertNull(BigQueryUtils.toTableReference(":invalidleadingcolon.mydataset.mytable"));
+ assertNull(BigQueryUtils.toTableReference("invalidtrailingcolon.mydataset.mytable:"));
+ assertNull(BigQueryUtils.toTableReference("myproject.mydataset.mytable.myinvalidpart"));
+ assertNull(BigQueryUtils.toTableReference("myproject:mydataset.mytable.myinvalidpart"));
+
+ assertNull(
+ BigQueryUtils.toTableReference("/projects/extraslash/datasets/mydataset/tables/mytable"));
+ assertNull(
+ BigQueryUtils.toTableReference("projects//extraslash/datasets/mydataset/tables/mytable"));
+ assertNull(
+ BigQueryUtils.toTableReference("projects/extraslash//datasets/mydataset/tables/mytable"));
+ assertNull(
+ BigQueryUtils.toTableReference("projects/extraslash/datasets//mydataset/tables/mytable"));
+ assertNull(
+ BigQueryUtils.toTableReference("projects/extraslash/datasets/mydataset//tables/mytable"));
+ assertNull(
+ BigQueryUtils.toTableReference("projects/extraslash/datasets/mydataset/tables//mytable"));
+ assertNull(
+ BigQueryUtils.toTableReference("projects/extraslash/datasets/mydataset/tables/mytable/"));
+
+ assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset/tables//"));
+ assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets//tables/mytable/"));
+ assertNull(BigQueryUtils.toTableReference("projects//datasets/mydataset/tables/mytable/"));
+ assertNull(BigQueryUtils.toTableReference("projects//datasets//tables//"));
+
+ assertNull(
+ BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset/tables/mytable/"));
+ assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset/tables/"));
+ assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset/tables"));
+ assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset/"));
+ assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets/mydataset"));
+ assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets/"));
+ assertNull(BigQueryUtils.toTableReference("projects/myproject/datasets"));
+ assertNull(BigQueryUtils.toTableReference("projects/myproject/"));
+ assertNull(BigQueryUtils.toTableReference("projects/myproject"));
+ assertNull(BigQueryUtils.toTableReference("projects/"));
+ assertNull(BigQueryUtils.toTableReference("projects"));
+ }
}