/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createTempTableReference;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.api.services.bigquery.model.EncryptionConfiguration;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationQuery;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Helper object for executing query jobs in BigQuery.
*
* <p>This object is not serializable, and its state can be safely discarded across serialization
* boundaries for any associated source objects.
*/
class BigQueryQueryHelper {
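/** Effectively unbounded retry limit used when polling the query job until it completes. */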
private static final Integer JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
private static final Logger LOG = LoggerFactory.getLogger(BigQueryQueryHelper.class);
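
/**
 * Performs a dry run of the query at most once, caching the resulting {@link JobStatistics} in
 * {@code dryRunJobStats} so that repeated calls reuse the first result.
 */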
public static JobStatistics dryRunQueryIfNeeded(
BigQueryServices bqServices,
BigQueryOptions options,
AtomicReference<JobStatistics> dryRunJobStats,
String query,
Boolean flattenResults,
Boolean useLegacySql,
@Nullable String location)
throws InterruptedException, IOException {
if (dryRunJobStats.get() == null) {
JobStatistics jobStatistics =
bqServices
.getJobService(options)
.dryRunQuery(
options.getProject(),
createBasicQueryConfig(query, flattenResults, useLegacySql),
location);
dryRunJobStats.compareAndSet(null, jobStatistics);
}
return dryRunJobStats.get();
}
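
/**
 * Executes the query and writes its results to a temporary table, returning a reference to that
 * table. The query location is resolved via a dry run when not given, a temporary dataset is
 * created unless the caller supplied one, and a fresh random job ID is used so this step can be
 * retried safely.
 */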
public static TableReference executeQuery(
BigQueryServices bqServices,
BigQueryOptions options,
AtomicReference<JobStatistics> dryRunJobStats,
String stepUuid,
String query,
Boolean flattenResults,
Boolean useLegacySql,
QueryPriority priority,
@Nullable String location,
@Nullable String queryTempDatasetId,
@Nullable String kmsKey)
throws InterruptedException, IOException {
// Step 1: Find the effective location of the query.
String effectiveLocation = location;
DatasetService tableService = bqServices.getDatasetService(options);
if (effectiveLocation == null) {
List<TableReference> referencedTables =
dryRunQueryIfNeeded(
bqServices,
options,
dryRunJobStats,
query,
flattenResults,
useLegacySql,
location)
.getQuery()
.getReferencedTables();
if (referencedTables != null && !referencedTables.isEmpty()) {
TableReference referencedTable = referencedTables.get(0);
effectiveLocation = tableService.getTable(referencedTable).getLocation();
}
}
// Step 2: Create a temporary dataset in the query location only if the user has not specified a
// temp dataset.
String jobIdToken = createJobIdToken(options.getJobName(), stepUuid);
Optional<String> queryTempDatasetOpt = Optional.ofNullable(queryTempDatasetId);
TableReference queryResultTable =
createTempTableReference(options.getProject(), jobIdToken, queryTempDatasetOpt);
boolean beamToCreateTempDataset = !queryTempDatasetOpt.isPresent();
// Create dataset only if it has not been set by the user
if (beamToCreateTempDataset) {
LOG.info("Creating temporary dataset {} for query results", queryResultTable.getDatasetId());
tableService.createDataset(
queryResultTable.getProjectId(),
queryResultTable.getDatasetId(),
effectiveLocation,
"Temporary tables for query results of job " + options.getJobName(),
TimeUnit.DAYS.toMillis(1));
} else {
// If the user specified a temp dataset, check that the destination table does not exist.
Table destTable = tableService.getTable(queryResultTable);
checkArgument(
destTable == null,
"Refusing to write on existing table %s in the specified temp dataset %s",
queryResultTable.getTableId(),
queryResultTable.getDatasetId());
}
// Step 3: Execute the query. Generate a transient (random) query job ID, because this code may
// be retried after the temporary dataset and table have been deleted by a previous attempt --
// in that case, we want to regenerate the temporary dataset and table, and we'll need a fresh
// query ID to do that.
String queryJobId = jobIdToken + "-query-" + BigQueryHelpers.randomUUIDString();
LOG.info(
"Exporting query results into temporary table {} using job {}",
queryResultTable,
queryJobId);
JobReference jobReference =
new JobReference()
.setProjectId(options.getProject())
.setLocation(effectiveLocation)
.setJobId(queryJobId);
JobConfigurationQuery queryConfiguration =
createBasicQueryConfig(query, flattenResults, useLegacySql)
.setAllowLargeResults(true)
.setDestinationTable(queryResultTable)
.setCreateDisposition("CREATE_IF_NEEDED")
.setWriteDisposition("WRITE_TRUNCATE")
.setPriority(priority.name());
if (kmsKey != null) {
queryConfiguration.setDestinationEncryptionConfiguration(
new EncryptionConfiguration().setKmsKeyName(kmsKey));
}
JobService jobService = bqServices.getJobService(options);
jobService.startQueryJob(jobReference, queryConfiguration);
Job job = jobService.pollJob(jobReference, JOB_POLL_MAX_RETRIES);
if (BigQueryHelpers.parseStatus(job) != Status.SUCCEEDED) {
throw new IOException(
String.format(
"Query job %s failed, status: %s",
queryJobId, BigQueryHelpers.statusToPrettyString(job.getStatus())));
}
LOG.info("Query job {} completed", queryJobId);
return queryResultTable;
}
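
/** Builds the query configuration shared by the dry run and the actual query job. */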
private static JobConfigurationQuery createBasicQueryConfig(
String query, Boolean flattenResults, Boolean useLegacySql) {
return new JobConfigurationQuery()
.setQuery(query)
.setFlattenResults(flattenResults)
.setUseLegacySql(useLegacySql);
}
}