/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.testing;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.ClassInfo;
import com.google.api.client.util.Data;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.DatasetReference;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.QueryResponse;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest.Rows;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableList;
import com.google.api.services.bigquery.model.TableList.Tables;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.auth.Credentials;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Random;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A wrapper class for making BigQuery API calls.
*
* <p>Example:
*
* <p>Get a new Bigquery client:
*
* <pre>{@code
* BigqueryClient client = BigqueryClient.getNewBigquerryClient(applicationName);
* }</pre>
*
* <p>Execute a query with retries:
*
* <pre>{@code
* QueryResponse response = client.queryWithRetries(queryString, projectId);
* }</pre>
*
* <p>Create a new dataset in one project:
*
* <pre>{@code
* client.createNewDataset(projectId, datasetId);
* }</pre>
*
* <p>Delete a dataset in one project, including all of its tables:
*
* <pre>{@code
* client.deleteDataset(projectId, datasetId);
* }</pre>
*
* <p>Create a new table:
*
* <pre>{@code
* client.createNewTable(projectId, datasetId, newTable);
* }</pre>
*
* <p>Insert data into a table:
*
* <pre>{@code
* client.insertDataToTable(projectId, datasetId, tableName, rows);
* }</pre>
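*
* <p>Run a query without flattening nested or repeated results and get typed rows back:
*
* <pre>{@code
* List<TableRow> rows = client.queryUnflattened(queryString, projectId, true);
* }</pre>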
*/
public class BigqueryClient {
private static final Logger LOG = LoggerFactory.getLogger(BigqueryClient.class);
// The maximum number of retries to execute a BigQuery RPC
static final int MAX_QUERY_RETRIES = 4;
// How long the BQ jobs.query call should wait for query completion, in milliseconds
// (the service default is 10 seconds).
static final Long QUERY_TIMEOUT_MS = 20_000L;
// The initial backoff for executing a BigQuery RPC
private static final Duration INITIAL_BACKOFF = Duration.standardSeconds(1L);
// The backoff factory with initial configs
static final FluentBackoff BACKOFF_FACTORY =
FluentBackoff.DEFAULT.withMaxRetries(MAX_QUERY_RETRIES).withInitialBackoff(INITIAL_BACKOFF);
private static final Collection<String> RESERVED_FIELD_NAMES =
ClassInfo.of(TableRow.class).getNames();
private Bigquery bqClient;
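/** Returns Application Default Credentials, scoped for BigQuery when scoping is required. */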
private static Credentials getDefaultCredential() {
GoogleCredentials credential;
try {
credential = GoogleCredentials.getApplicationDefault();
} catch (IOException e) {
throw new RuntimeException("Failed to get application default credential.", e);
}
if (credential.createScopedRequired()) {
Collection<String> bigqueryScope = Lists.newArrayList(BigqueryScopes.all());
credential = credential.createScoped(bigqueryScope);
}
return credential;
}
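/** Builds a low-level {@link Bigquery} client authenticated with Application Default Credentials. */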
public static Bigquery getNewBigquerryClient(String applicationName) {
HttpTransport transport = Transport.getTransport();
JsonFactory jsonFactory = Transport.getJsonFactory();
Credentials credential = getDefaultCredential();
return new Bigquery.Builder(transport, jsonFactory, new HttpCredentialsAdapter(credential))
.setApplicationName(applicationName)
.build();
}
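/** Returns a new {@link BigqueryClient} wrapper for the given application name. */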
public static BigqueryClient getClient(String applicationName) {
return new BigqueryClient(applicationName);
}
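/** Creates a wrapper backed by a newly built {@link Bigquery} client. */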
public BigqueryClient(String applicationName) {
bqClient = BigqueryClient.getNewBigquerryClient(applicationName);
}
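/**
* Executes a query with retries and returns the raw {@link QueryResponse} without converting
* cell values to typed Java objects.
*/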
@Nonnull
public QueryResponse queryWithRetries(String query, String projectId)
throws IOException, InterruptedException {
return queryWithRetries(query, projectId, false);
}
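/**
* Converts a single cell value to the Java type implied by its {@link TableFieldSchema},
* recursing into REPEATED and RECORD fields; returns null for BigQuery NULL values.
*/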
@Nullable
private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) {
if (Data.isNull(v)) {
return null;
}
if (Objects.equals(fieldSchema.getMode(), "REPEATED")) {
TableFieldSchema elementSchema = fieldSchema.clone().setMode("REQUIRED");
@SuppressWarnings("unchecked")
List<Map<String, Object>> rawCells = (List<Map<String, Object>>) v;
ImmutableList.Builder<Object> values = ImmutableList.builder();
for (Map<String, Object> element : rawCells) {
values.add(getTypedCellValue(elementSchema, element.get("v")));
}
return values.build();
}
if ("RECORD".equals(fieldSchema.getType())) {
@SuppressWarnings("unchecked")
Map<String, Object> typedV = (Map<String, Object>) v;
return getTypedTableRow(fieldSchema.getFields(), typedV);
}
if ("FLOAT".equals(fieldSchema.getType())) {
return Double.parseDouble((String) v);
}
if ("BOOLEAN".equals(fieldSchema.getType())) {
return Boolean.parseBoolean((String) v);
}
if ("TIMESTAMP".equals(fieldSchema.getType())) {
return (String) v;
}
// Returns the original value for:
// 1. String, 2. base64 encoded BYTES, 3. DATE, DATETIME, TIME strings.
return v;
}
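/**
* Converts a raw row, either a {@link TableRow} with "f" cells or a {@code Map<String, Object>},
* into a {@link TableRow} keyed by field name with values normalized against the schema.
*/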
private TableRow getTypedTableRow(List<TableFieldSchema> fields, Map<String, Object> rawRow) {
TableRow row;
List<? extends Map<String, Object>> cells;
if (rawRow instanceof TableRow) {
// Since rawRow is a TableRow it already has TableCell objects in setF. We do not need to do
// any type conversion, but extract the cells for cell-wise processing below.
row = (TableRow) rawRow;
cells = row.getF();
// Clear the cells from the row, so that row.getF() will return null. This matches the
// behavior of rows produced by the BigQuery export API used on the service.
row.setF(null);
} else {
row = new TableRow();
// Since rawRow is a Map<String, Object> we use Map.get("f") instead of TableRow.getF() to
// get its cells. Similarly, when rawCell is a Map<String, Object> instead of a TableCell,
// we will use Map.get("v") instead of TableCell.getV() to get its value.
@SuppressWarnings("unchecked")
List<? extends Map<String, Object>> rawCells =
(List<? extends Map<String, Object>>) rawRow.get("f");
cells = rawCells;
}
checkState(
cells.size() == fields.size(),
"Expected that the row has the same number of cells %s as fields in the schema %s",
cells.size(),
fields.size());
// Loop through all the fields in the row, normalizing their types with the TableFieldSchema
// and storing the normalized values by field name in the Map<String, Object> that
// underlies the TableRow.
Iterator<? extends Map<String, Object>> cellIt = cells.iterator();
Iterator<TableFieldSchema> fieldIt = fields.iterator();
while (cellIt.hasNext()) {
Map<String, Object> cell = cellIt.next();
TableFieldSchema fieldSchema = fieldIt.next();
// Convert the object in this cell to the Java type corresponding to its type in the schema.
Object convertedValue = getTypedCellValue(fieldSchema, cell.get("v"));
String fieldName = fieldSchema.getName();
checkArgument(
!RESERVED_FIELD_NAMES.contains(fieldName),
"BigQueryIO does not support records with columns named %s",
fieldName);
if (convertedValue == null) {
// BigQuery does not include null values when the export operation (to JSON) is used.
// To match that behavior, BigQueryTableRowIterator and the DirectPipelineRunner
// intentionally omit columns with null values.
continue;
}
row.set(fieldName, convertedValue);
}
return row;
}
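/** Replaces the rows of a {@link QueryResponse} with schema-typed versions of those rows. */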
private QueryResponse getTypedTableRows(QueryResponse response) {
List<TableRow> rows = response.getRows();
TableSchema schema = response.getSchema();
response.setRows(
rows.stream()
.map(r -> getTypedTableRow(schema.getFields(), r))
.collect(Collectors.toList()));
return response;
}
/**
* Performs a query without flattening results. The query writes to a temporary dataset and
* table, which are created before the query runs and deleted after the results are read.
*/
@Nonnull
public List<TableRow> queryUnflattened(String query, String projectId, boolean typed)
throws IOException, InterruptedException {
Random rnd = new Random(System.currentTimeMillis());
String temporaryDatasetId = "_dataflow_temporary_dataset_" + rnd.nextInt(1000000);
String temporaryTableId = "dataflow_temporary_table_" + rnd.nextInt(1000000);
TableReference tempTableReference =
new TableReference()
.setProjectId(projectId)
.setDatasetId(temporaryDatasetId)
.setTableId(temporaryTableId);
createNewDataset(projectId, temporaryDatasetId);
createNewTable(
projectId, temporaryDatasetId, new Table().setTableReference(tempTableReference));
JobConfigurationQuery jcQuery =
new JobConfigurationQuery()
.setFlattenResults(false)
.setAllowLargeResults(true)
.setDestinationTable(tempTableReference)
.setQuery(query);
JobConfiguration jc = new JobConfiguration().setQuery(jcQuery);
Job job = new Job().setConfiguration(jc);
Job insertedJob = bqClient.jobs().insert(projectId, job).execute();
GetQueryResultsResponse qResponse;
do {
qResponse =
bqClient
.jobs()
.getQueryResults(projectId, insertedJob.getJobReference().getJobId())
.execute();
} while (!qResponse.getJobComplete());
final TableSchema schema = qResponse.getSchema();
final List<TableRow> rows = qResponse.getRows();
deleteDataset(projectId, temporaryDatasetId);
return !typed
? rows
: rows.stream()
.map(r -> getTypedTableRow(schema.getFields(), r))
.collect(Collectors.toList());
}
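/**
* Executes a query through jobs.query, retrying with backoff on errors or a null response; when
* {@code typed} is true, cell values in the response are converted to typed Java objects.
*/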
@Nonnull
public QueryResponse queryWithRetries(String query, String projectId, boolean typed)
throws IOException, InterruptedException {
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
IOException lastException = null;
QueryRequest bqQueryRequest = new QueryRequest().setQuery(query).setTimeoutMs(QUERY_TIMEOUT_MS);
do {
if (lastException != null) {
LOG.warn("Retrying query ({}) after exception", bqQueryRequest.getQuery(), lastException);
}
try {
QueryResponse response = bqClient.jobs().query(projectId, bqQueryRequest).execute();
if (response != null) {
return typed ? getTypedTableRows(response) : response;
} else {
lastException =
new IOException("Expected valid response from query job, but received null.");
}
} catch (IOException e) {
// ignore and retry
lastException = e;
}
} while (BackOffUtils.next(sleeper, backoff));
throw new RuntimeException(
String.format(
"Unable to get BigQuery response after retrying %d times using query (%s)",
MAX_QUERY_RETRIES, bqQueryRequest.getQuery()),
lastException);
}
/** Creates a new dataset. */
public void createNewDataset(String projectId, String datasetId)
throws IOException, InterruptedException {
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
IOException lastException = null;
do {
if (lastException != null) {
LOG.warn("Retrying insert dataset ({}) after exception", datasetId, lastException);
}
try {
Dataset response =
bqClient
.datasets()
.insert(
projectId,
new Dataset()
.setDatasetReference(new DatasetReference().setDatasetId(datasetId)))
.execute();
if (response != null) {
LOG.info("Successfully created new dataset: " + response.getId());
return;
} else {
lastException =
new IOException(
"Expected valid response from insert dataset job, but received null.");
}
} catch (IOException e) {
// ignore and retry
lastException = e;
}
} while (BackOffUtils.next(sleeper, backoff));
throw new RuntimeException(
String.format(
"Unable to get BigQuery response after retrying %d times for dataset (%s)",
MAX_QUERY_RETRIES, datasetId),
lastException);
}
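/** Deletes a table, logging and otherwise ignoring any failure. */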
public void deleteTable(String projectId, String datasetId, String tableName) {
try {
bqClient.tables().delete(projectId, datasetId, tableName).execute();
LOG.info("Successfully deleted table: " + tableName);
} catch (Exception e) {
LOG.debug("Exception caught when deleting table: " + e.getMessage());
}
}
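/** Deletes every table in a dataset and then the dataset itself, logging and ignoring failures. */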
public void deleteDataset(String projectId, String datasetId) {
try {
TableList tables = bqClient.tables().list(projectId, datasetId).execute();
for (Tables table : tables.getTables()) {
this.deleteTable(projectId, datasetId, table.getTableReference().getTableId());
}
} catch (Exception e) {
LOG.debug("Exception caught when listing all tables: " + e.getMessage());
}
try {
bqClient.datasets().delete(projectId, datasetId).execute();
LOG.info("Successfully deleted dataset: " + datasetId);
} catch (Exception e) {
LOG.debug("Exception caught when deleting dataset: " + e.getMessage());
}
}
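/** Creates a new table, retrying with backoff until the insert succeeds or retries are exhausted. */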
public void createNewTable(String projectId, String datasetId, Table newTable)
throws IOException, InterruptedException {
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
IOException lastException = null;
do {
if (lastException != null) {
LOG.warn("Retrying create table ({}) after exception", newTable.getId(), lastException);
}
try {
Table response = this.bqClient.tables().insert(projectId, datasetId, newTable).execute();
if (response != null) {
LOG.info("Successfully created new table: " + response.getId());
return;
} else {
lastException =
new IOException("Expected valid response from create table job, but received null.");
}
} catch (IOException e) {
// ignore and retry
lastException = e;
}
} while (BackOffUtils.next(sleeper, backoff));
throw new RuntimeException(
String.format(
"Unable to get BigQuery response after retrying %d times for table (%s)",
MAX_QUERY_RETRIES, newTable.getId()),
lastException);
}
/** Inserts rows into a table using a BigQuery streaming write. */
public void insertDataToTable(
String projectId, String datasetId, String tableName, List<Map<String, Object>> rows)
throws IOException, InterruptedException {
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
IOException lastException = null;
do {
if (lastException != null) {
LOG.warn("Retrying insert table ({}) after exception", tableName, lastException);
}
try {
List<Rows> dataRows =
rows.stream().map(row -> new Rows().setJson(row)).collect(Collectors.toList());
TableDataInsertAllResponse response =
this.bqClient
.tabledata()
.insertAll(
projectId,
datasetId,
tableName,
new TableDataInsertAllRequest().setRows(dataRows))
.execute();
if (response != null
&& (response.getInsertErrors() == null || response.getInsertErrors().isEmpty())) {
LOG.info("Successfully inserted data into table: " + tableName);
return;
} else {
if (response == null || response.getInsertErrors() == null) {
lastException =
new IOException("Expected valid response from insert data job, but received null.");
} else {
lastException =
new IOException(
String.format(
"Got insertion error (%s)", response.getInsertErrors().toString()));
}
}
} catch (IOException e) {
// ignore and retry
lastException = e;
}
} while (BackOffUtils.next(sleeper, backoff));
throw new RuntimeException(
String.format(
"Unable to get BigQuery response after retrying %d times for table (%s)",
MAX_QUERY_RETRIES, tableName),
lastException);
}
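/** Fetches a {@link Table} resource through tables.get, retrying with backoff on failure. */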
public Table getTableResource(String projectId, String datasetId, String tableId)
throws IOException, InterruptedException {
Sleeper sleeper = Sleeper.DEFAULT;
BackOff backoff = BackOffAdapter.toGcpBackOff(BACKOFF_FACTORY.backoff());
IOException lastException = null;
do {
if (lastException != null) {
LOG.warn("Retrying tables.get ({}) after exception", tableId, lastException);
}
try {
Table response = this.bqClient.tables().get(projectId, datasetId, tableId).execute();
if (response != null) {
return response;
} else {
lastException =
new IOException("Expected valid response from tables.get, but received null.");
}
} catch (IOException e) {
// ignore and retry
lastException = e;
}
} while (BackOffUtils.next(sleeper, backoff));
throw new RuntimeException(
String.format(
"Unable to get BigQuery response after retrying %d times for tables.get (%s)",
MAX_QUERY_RETRIES, tableId),
lastException);
}
}