| /* |
| * Copyright (C) 2015 Google Inc. |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); you may not |
| * use this file except in compliance with the License. You may obtain a copy of |
| * the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| * License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package com.google.cloud.dataflow.sdk.util; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static com.google.common.base.Preconditions.checkState; |
| |
| import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; |
| import com.google.api.client.util.BackOff; |
| import com.google.api.client.util.BackOffUtils; |
| import com.google.api.client.util.ClassInfo; |
| import com.google.api.client.util.Data; |
| import com.google.api.client.util.Sleeper; |
| import com.google.api.services.bigquery.Bigquery; |
| import com.google.api.services.bigquery.model.Dataset; |
| import com.google.api.services.bigquery.model.DatasetReference; |
| import com.google.api.services.bigquery.model.ErrorProto; |
| import com.google.api.services.bigquery.model.Job; |
| import com.google.api.services.bigquery.model.JobConfiguration; |
| import com.google.api.services.bigquery.model.JobConfigurationQuery; |
| import com.google.api.services.bigquery.model.JobReference; |
| import com.google.api.services.bigquery.model.JobStatistics; |
| import com.google.api.services.bigquery.model.JobStatus; |
| import com.google.api.services.bigquery.model.Table; |
| import com.google.api.services.bigquery.model.TableCell; |
| import com.google.api.services.bigquery.model.TableDataList; |
| import com.google.api.services.bigquery.model.TableFieldSchema; |
| import com.google.api.services.bigquery.model.TableReference; |
| import com.google.api.services.bigquery.model.TableRow; |
| import com.google.api.services.bigquery.model.TableSchema; |
| import com.google.common.base.MoreObjects; |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.util.concurrent.Uninterruptibles; |
| |
| import org.joda.time.Duration; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import java.io.IOException; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.Objects; |
| import java.util.Random; |
| import java.util.concurrent.TimeUnit; |
| |
| import javax.annotation.Nullable; |
| |
| /** |
| * Iterates over all rows in a table. |
| */ |
| public class BigQueryTableRowIterator implements AutoCloseable { |
| private static final Logger LOG = LoggerFactory.getLogger(BigQueryTableRowIterator.class); |
| |
| @Nullable private TableReference ref; |
| @Nullable private final String projectId; |
| @Nullable private TableSchema schema; |
| private final Bigquery client; |
| private String pageToken; |
| private Iterator<TableRow> iteratorOverCurrentBatch; |
| private TableRow current; |
| // Set true when the final page is seen from the service. |
| private boolean lastPage = false; |
| |
| // The maximum number of times a BigQuery request will be retried |
| private static final int MAX_RETRIES = 3; |
| // Initial wait time for the backoff implementation |
| private static final Duration INITIAL_BACKOFF_TIME = Duration.standardSeconds(1); |
| |
| // After sending a query to BQ service we will be polling the BQ service to check the status with |
| // following interval to check the status of query execution job |
| private static final Duration QUERY_COMPLETION_POLL_TIME = Duration.standardSeconds(1); |
| |
| private final String query; |
| // Whether to flatten query results. |
| private final boolean flattenResults; |
| // Whether to use the BigQuery legacy SQL dialect.. |
| private final boolean useLegacySql; |
| // Temporary dataset used to store query results. |
| private String temporaryDatasetId = null; |
| // Temporary table used to store query results. |
| private String temporaryTableId = null; |
| |
| private BigQueryTableRowIterator( |
| @Nullable TableReference ref, @Nullable String query, @Nullable String projectId, |
| Bigquery client, boolean flattenResults, boolean useLegacySql) { |
| this.ref = ref; |
| this.query = query; |
| this.projectId = projectId; |
| this.client = checkNotNull(client, "client"); |
| this.flattenResults = flattenResults; |
| this.useLegacySql = useLegacySql; |
| } |
| |
| /** |
| * Constructs a {@code BigQueryTableRowIterator} that reads from the specified table. |
| */ |
| public static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) { |
| checkNotNull(ref, "ref"); |
| checkNotNull(client, "client"); |
| return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, true, true); |
| } |
| |
| /** |
| * Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the |
| * specified query in the specified project with useLegacySql set to True. |
| * |
| * @deprecated use {@link #fromQuery(String, String, Bigquery, Boolean, Boolean)}. |
| */ |
| @Deprecated |
| public static BigQueryTableRowIterator fromQuery( |
| String query, String projectId, Bigquery client, @Nullable Boolean flattenResults) { |
| return fromQuery(query, projectId, client, flattenResults, null /* useLegacySql */); |
| } |
| |
| /** |
| * Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the |
| * specified query in the specified project. |
| */ |
| public static BigQueryTableRowIterator fromQuery( |
| String query, String projectId, Bigquery client, @Nullable Boolean flattenResults, |
| @Nullable Boolean useLegacySql) { |
| checkNotNull(query, "query"); |
| checkNotNull(projectId, "projectId"); |
| checkNotNull(client, "client"); |
| return new BigQueryTableRowIterator(null, query, projectId, client, |
| MoreObjects.firstNonNull(flattenResults, Boolean.TRUE), |
| MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE)); |
| } |
| |
| /** |
| * Opens the table for read. |
| * @throws IOException on failure |
| */ |
| public void open() throws IOException, InterruptedException { |
| if (query != null) { |
| ref = executeQueryAndWaitForCompletion(); |
| } |
| // Get table schema. |
| schema = getTable(ref).getSchema(); |
| } |
| |
| public boolean advance() throws IOException, InterruptedException { |
| while (true) { |
| if (iteratorOverCurrentBatch != null && iteratorOverCurrentBatch.hasNext()) { |
| // Embed schema information into the raw row, so that values have an |
| // associated key. This matches how rows are read when using the |
| // DataflowPipelineRunner. |
| current = getTypedTableRow(schema.getFields(), iteratorOverCurrentBatch.next()); |
| return true; |
| } |
| if (lastPage) { |
| return false; |
| } |
| |
| Bigquery.Tabledata.List list = |
| client.tabledata().list(ref.getProjectId(), ref.getDatasetId(), ref.getTableId()); |
| if (pageToken != null) { |
| list.setPageToken(pageToken); |
| } |
| |
| TableDataList result = |
| executeWithBackOff( |
| list, |
| String.format( |
| "Error reading from BigQuery table %s of dataset %s.", |
| ref.getTableId(), |
| ref.getDatasetId())); |
| |
| pageToken = result.getPageToken(); |
| iteratorOverCurrentBatch = |
| result.getRows() != null |
| ? result.getRows().iterator() |
| : Collections.<TableRow>emptyIterator(); |
| |
| // The server may return a page token indefinitely on a zero-length table. |
| if (pageToken == null || result.getTotalRows() != null && result.getTotalRows() == 0) { |
| lastPage = true; |
| } |
| } |
| } |
| |
| public TableRow getCurrent() { |
| if (current == null) { |
| throw new NoSuchElementException(); |
| } |
| return current; |
| } |
| |
| /** |
| * Adjusts a field returned from the BigQuery API to match what we will receive when running |
| * BigQuery's export-to-GCS and parallel read, which is the efficient parallel implementation |
| * used for batch jobs executed on the Cloud Dataflow service. |
| * |
| * <p>The following is the relationship between BigQuery schema and Java types: |
| * |
| * <ul> |
| * <li>Nulls are {@code null}. |
| * <li>Repeated fields are {@code List} of objects. |
| * <li>Record columns are {@link TableRow} objects. |
| * <li>{@code BOOLEAN} columns are JSON booleans, hence Java {@code Boolean} objects. |
| * <li>{@code FLOAT} columns are JSON floats, hence Java {@code Double} objects. |
| * <li>{@code TIMESTAMP} columns are {@code String} objects that are of the format |
| * {@code yyyy-MM-dd HH:mm:ss[.SSSSSS] UTC}, where the {@code .SSSSSS} has no trailing |
| * zeros and can be 1 to 6 digits long. |
| * <li>Every other atomic type is a {@code String}. |
| * </ul> |
| * |
| * <p>Note that integers are encoded as strings to match BigQuery's exported JSON format. |
| * |
| * <p>Finally, values are stored in the {@link TableRow} as {"field name": value} pairs |
| * and are not accessible through the {@link TableRow#getF} function. |
| */ |
| @Nullable private Object getTypedCellValue(TableFieldSchema fieldSchema, Object v) { |
| if (Data.isNull(v)) { |
| return null; |
| } |
| |
| if (Objects.equals(fieldSchema.getMode(), "REPEATED")) { |
| TableFieldSchema elementSchema = fieldSchema.clone().setMode("REQUIRED"); |
| @SuppressWarnings("unchecked") |
| List<Map<String, Object>> rawCells = (List<Map<String, Object>>) v; |
| ImmutableList.Builder<Object> values = ImmutableList.builder(); |
| for (Map<String, Object> element : rawCells) { |
| values.add(getTypedCellValue(elementSchema, element.get("v"))); |
| } |
| return values.build(); |
| } |
| |
| if (fieldSchema.getType().equals("RECORD")) { |
| @SuppressWarnings("unchecked") |
| Map<String, Object> typedV = (Map<String, Object>) v; |
| return getTypedTableRow(fieldSchema.getFields(), typedV); |
| } |
| |
| if (fieldSchema.getType().equals("FLOAT")) { |
| return Double.parseDouble((String) v); |
| } |
| |
| if (fieldSchema.getType().equals("BOOLEAN")) { |
| return Boolean.parseBoolean((String) v); |
| } |
| |
| if (fieldSchema.getType().equals("TIMESTAMP")) { |
| return AvroUtils.formatTimestamp((String) v); |
| } |
| |
| // Returns the original value for: |
| // 1. String, 2. base64 encoded BYTES, 3. DATE, DATETIME, TIME strings. |
| return v; |
| } |
| |
| /** |
| * A list of the field names that cannot be used in BigQuery tables processed by Dataflow, |
| * because they are reserved keywords in {@link TableRow}. |
| */ |
| // TODO: This limitation is unfortunate. We need to give users a way to use BigQueryIO that does |
| // not indirect through our broken use of {@link TableRow}. |
| // See discussion: https://github.com/GoogleCloudPlatform/DataflowJavaSDK/pull/41 |
| private static final Collection<String> RESERVED_FIELD_NAMES = |
| ClassInfo.of(TableRow.class).getNames(); |
| |
| /** |
| * Converts a row returned from the BigQuery JSON API as a {@code Map<String, Object>} into a |
| * Java {@link TableRow} with nested {@link TableCell TableCells}. The {@code Object} values in |
| * the cells are converted to Java types according to the provided field schemas. |
| * |
| * <p>See {@link #getTypedCellValue(TableFieldSchema, Object)} for details on how BigQuery |
| * types are mapped to Java types. |
| */ |
| private TableRow getTypedTableRow(List<TableFieldSchema> fields, Map<String, Object> rawRow) { |
| // If rawRow is a TableRow, use it. If not, create a new one. |
| TableRow row; |
| List<? extends Map<String, Object>> cells; |
| if (rawRow instanceof TableRow) { |
| // Since rawRow is a TableRow it already has TableCell objects in setF. We do not need to do |
| // any type conversion, but extract the cells for cell-wise processing below. |
| row = (TableRow) rawRow; |
| cells = row.getF(); |
| // Clear the cells from the row, so that row.getF() will return null. This matches the |
| // behavior of rows produced by the BigQuery export API used on the service. |
| row.setF(null); |
| } else { |
| row = new TableRow(); |
| |
| // Since rawRow is a Map<String, Object> we use Map.get("f") instead of TableRow.getF() to |
| // get its cells. Similarly, when rawCell is a Map<String, Object> instead of a TableCell, |
| // we will use Map.get("v") instead of TableCell.getV() get its value. |
| @SuppressWarnings("unchecked") |
| List<? extends Map<String, Object>> rawCells = |
| (List<? extends Map<String, Object>>) rawRow.get("f"); |
| cells = rawCells; |
| } |
| |
| checkState(cells.size() == fields.size(), |
| "Expected that the row has the same number of cells %s as fields in the schema %s", |
| cells.size(), fields.size()); |
| |
| // Loop through all the fields in the row, normalizing their types with the TableFieldSchema |
| // and storing the normalized values by field name in the Map<String, Object> that |
| // underlies the TableRow. |
| Iterator<? extends Map<String, Object>> cellIt = cells.iterator(); |
| Iterator<TableFieldSchema> fieldIt = fields.iterator(); |
| while (cellIt.hasNext()) { |
| Map<String, Object> cell = cellIt.next(); |
| TableFieldSchema fieldSchema = fieldIt.next(); |
| |
| // Convert the object in this cell to the Java type corresponding to its type in the schema. |
| Object convertedValue = getTypedCellValue(fieldSchema, cell.get("v")); |
| |
| String fieldName = fieldSchema.getName(); |
| checkArgument(!RESERVED_FIELD_NAMES.contains(fieldName), |
| "BigQueryIO does not support records with columns named %s", fieldName); |
| |
| if (convertedValue == null) { |
| // BigQuery does not include null values when the export operation (to JSON) is used. |
| // To match that behavior, BigQueryTableRowiterator, and the DirectPipelineRunner, |
| // intentionally omits columns with null values. |
| continue; |
| } |
| |
| row.set(fieldName, convertedValue); |
| } |
| return row; |
| } |
| |
| // Get the BiqQuery table. |
| private Table getTable(TableReference ref) throws IOException, InterruptedException { |
| Bigquery.Tables.Get get = |
| client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId()); |
| |
| return executeWithBackOff( |
| get, |
| String.format( |
| "Error opening BigQuery table %s of dataset %s.", |
| ref.getTableId(), |
| ref.getDatasetId())); |
| } |
| |
| // Create a new BigQuery dataset |
| private void createDataset(String datasetId, @Nullable String location) |
| throws IOException, InterruptedException { |
| Dataset dataset = new Dataset(); |
| DatasetReference reference = new DatasetReference(); |
| reference.setProjectId(projectId); |
| reference.setDatasetId(datasetId); |
| dataset.setDatasetReference(reference); |
| if (location != null) { |
| dataset.setLocation(location); |
| } |
| |
| executeWithBackOff( |
| client.datasets().insert(projectId, dataset), |
| String.format( |
| "Error when trying to create the temporary dataset %s in project %s", |
| datasetId, projectId)); |
| } |
| |
| // Delete the given table that is available in the given dataset. |
| private void deleteTable(String datasetId, String tableId) |
| throws IOException, InterruptedException { |
| executeWithBackOff( |
| client.tables().delete(projectId, datasetId, tableId), |
| String.format( |
| "Error when trying to delete the temporary table %s in dataset %s of project %s. " |
| + "Manual deletion may be required.", |
| tableId, datasetId, projectId)); |
| } |
| |
| // Delete the given dataset. This will fail if the given dataset has any tables. |
| private void deleteDataset(String datasetId) throws IOException, InterruptedException { |
| executeWithBackOff( |
| client.datasets().delete(projectId, datasetId), |
| String.format( |
| "Error when trying to delete the temporary dataset %s in project %s. " |
| + "Manual deletion may be required.", |
| datasetId, projectId)); |
| } |
| |
| /** |
| * Executes the specified query and returns a reference to the temporary BigQuery table created |
| * to hold the results. |
| * |
| * @throws IOException if the query fails. |
| */ |
| private TableReference executeQueryAndWaitForCompletion() |
| throws IOException, InterruptedException { |
| // Dry run query to get source table location |
| Job dryRunJob = new Job() |
| .setConfiguration(new JobConfiguration() |
| .setQuery(new JobConfigurationQuery() |
| .setQuery(query)) |
| .setDryRun(true)); |
| JobStatistics jobStats = executeWithBackOff( |
| client.jobs().insert(projectId, dryRunJob), |
| String.format("Error when trying to dry run query %s.", query)).getStatistics(); |
| |
| // Let BigQuery to pick default location if the query does not read any tables. |
| String location = null; |
| @Nullable List<TableReference> tables = jobStats.getQuery().getReferencedTables(); |
| if (tables != null && !tables.isEmpty()) { |
| Table table = getTable(tables.get(0)); |
| location = table.getLocation(); |
| } |
| |
| // Create a temporary dataset to store results. |
| // Starting dataset name with an "_" so that it is hidden. |
| Random rnd = new Random(System.currentTimeMillis()); |
| temporaryDatasetId = "_dataflow_temporary_dataset_" + rnd.nextInt(1000000); |
| temporaryTableId = "dataflow_temporary_table_" + rnd.nextInt(1000000); |
| |
| createDataset(temporaryDatasetId, location); |
| Job job = new Job(); |
| JobConfiguration config = new JobConfiguration(); |
| JobConfigurationQuery queryConfig = new JobConfigurationQuery(); |
| config.setQuery(queryConfig); |
| job.setConfiguration(config); |
| queryConfig.setQuery(query); |
| queryConfig.setAllowLargeResults(true); |
| queryConfig.setFlattenResults(flattenResults); |
| queryConfig.setUseLegacySql(useLegacySql); |
| |
| |
| TableReference destinationTable = new TableReference(); |
| destinationTable.setProjectId(projectId); |
| destinationTable.setDatasetId(temporaryDatasetId); |
| destinationTable.setTableId(temporaryTableId); |
| queryConfig.setDestinationTable(destinationTable); |
| |
| Job queryJob = executeWithBackOff( |
| client.jobs().insert(projectId, job), |
| String.format("Error when trying to execute the job for query %s.", query)); |
| JobReference jobId = queryJob.getJobReference(); |
| |
| while (true) { |
| Job pollJob = executeWithBackOff( |
| client.jobs().get(projectId, jobId.getJobId()), |
| String.format("Error when trying to get status of the job for query %s.", query)); |
| JobStatus status = pollJob.getStatus(); |
| if (status.getState().equals("DONE")) { |
| // Job is DONE, but did not necessarily succeed. |
| ErrorProto error = status.getErrorResult(); |
| if (error == null) { |
| return pollJob.getConfiguration().getQuery().getDestinationTable(); |
| } else { |
| // There will be no temporary table to delete, so null out the reference. |
| temporaryTableId = null; |
| throw new IOException("Executing query " + query + " failed: " + error.getMessage()); |
| } |
| } |
| Uninterruptibles.sleepUninterruptibly( |
| QUERY_COMPLETION_POLL_TIME.getMillis(), TimeUnit.MILLISECONDS); |
| } |
| } |
| |
| /** |
| * Execute a BQ request with exponential backoff and return the result. |
| * |
| * @deprecated use {@link #executeWithBackOff(AbstractGoogleClientRequest, String)}. |
| */ |
| @Deprecated |
| public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> client, String error, |
| Object... errorArgs) throws IOException, InterruptedException { |
| return executeWithBackOff(client, String.format(error, errorArgs)); |
| } |
| |
| // Execute a BQ request with exponential backoff and return the result. |
| // client - BQ request to be executed |
| // error - Formatted message to log if when a request fails. Takes exception message as a |
| // formatter parameter. |
| public static <T> T executeWithBackOff(AbstractGoogleClientRequest<T> client, String error) |
| throws IOException, InterruptedException { |
| Sleeper sleeper = Sleeper.DEFAULT; |
| BackOff backOff = |
| FluentBackoff.DEFAULT |
| .withMaxRetries(MAX_RETRIES).withInitialBackoff(INITIAL_BACKOFF_TIME).backoff(); |
| |
| T result = null; |
| while (true) { |
| try { |
| result = client.execute(); |
| break; |
| } catch (IOException e) { |
| LOG.error("{}", error, e); |
| if (!BackOffUtils.next(sleeper, backOff)) { |
| String errorMessage = String.format( |
| "%s Failing to execute job after %d attempts.", error, MAX_RETRIES + 1); |
| LOG.error("{}", errorMessage, e); |
| throw new IOException(errorMessage, e); |
| } |
| } |
| } |
| |
| return result; |
| } |
| |
| @Override |
| public void close() { |
| // Prevent any further requests. |
| lastPage = true; |
| |
| try { |
| // Deleting temporary table and dataset that gets generated when executing a query. |
| if (temporaryDatasetId != null) { |
| if (temporaryTableId != null) { |
| deleteTable(temporaryDatasetId, temporaryTableId); |
| } |
| deleteDataset(temporaryDatasetId); |
| } |
| } catch (IOException | InterruptedException e) { |
| if (e instanceof InterruptedException) { |
| Thread.currentThread().interrupt(); |
| } |
| throw new RuntimeException(e); |
| } |
| |
| } |
| } |