/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.model.Dataset;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Matcher;
import javax.annotation.Nullable;
import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.DatasetService;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.hash.Hashing;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** A set of helper functions and classes used by {@link BigQueryIO}. */
public class BigQueryHelpers {
private static final String RESOURCE_NOT_FOUND_ERROR =
"BigQuery %1$s not found for table \"%2$s\" . Please create the %1$s before pipeline"
+ " execution. If the %1$s is created by an earlier stage of the pipeline, this"
+ " validation can be disabled using #withoutValidation.";
private static final String UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR =
"Unable to confirm BigQuery %1$s presence for table \"%2$s\". If the %1$s is created by"
+ " an earlier stage of the pipeline, this validation can be disabled using"
+ " #withoutValidation.";
private static final Logger LOG = LoggerFactory.getLogger(BigQueryHelpers.class);
  // Given a potential failure and a current job id, return the next job id to be used on retry.
  // The algorithm is as follows (given an input of job_id_prefix-N):
  //   If BigQuery has no status for job_id_prefix-N, retry with the same id.
  //   If job_id_prefix-N is in the PENDING or successful states, no retry is needed.
  //   Otherwise (job_id_prefix-N completed with errors), try again with job_id_prefix-(N+1).
  //
  // We continue to loop through these job ids until we find one that has either succeeded or
  // has not been issued yet.
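  //
  // Illustrative example (the prefix here is hypothetical): with prefix "load_abc", the ids
  // probed are load_abc-0, load_abc-1, load_abc-2, ... stopping at the first one that has
  // succeeded, is PENDING/RUNNING, or was never issued.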
static class RetryJobIdResult {
public final RetryJobId jobId;
public final boolean shouldRetry;
public RetryJobIdResult(RetryJobId jobId, boolean shouldRetry) {
this.jobId = jobId;
this.shouldRetry = shouldRetry;
}
}
// A class that waits for pending jobs, retrying them according to policy if they fail.
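  //
  // A minimal usage sketch (the job and the no-op success callback are illustrative, not taken
  // from any caller in this file):
  //   new PendingJobManager()
  //       .addPendingJob(pendingJob, j -> null)
  //       .waitForDone();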
static class PendingJobManager {
private static class JobInfo {
private final PendingJob pendingJob;
@Nullable private final SerializableFunction<PendingJob, Exception> onSuccess;
public JobInfo(PendingJob pendingJob, SerializableFunction<PendingJob, Exception> onSuccess) {
this.pendingJob = pendingJob;
this.onSuccess = onSuccess;
}
}
private List<JobInfo> pendingJobs = Lists.newArrayList();
private final BackOff backOff;
PendingJobManager() {
this(
BackOffAdapter.toGcpBackOff(
FluentBackoff.DEFAULT
.withMaxRetries(Integer.MAX_VALUE)
.withInitialBackoff(Duration.standardSeconds(1))
.withMaxBackoff(Duration.standardMinutes(1))
.backoff()));
}
PendingJobManager(BackOff backOff) {
this.backOff = backOff;
}
// Add a pending job and a function to call when the job has completed successfully.
PendingJobManager addPendingJob(
PendingJob pendingJob, @Nullable SerializableFunction<PendingJob, Exception> onSuccess) {
this.pendingJobs.add(new JobInfo(pendingJob, onSuccess));
return this;
}
void waitForDone() throws Exception {
LOG.info("Waiting for jobs to complete.");
Sleeper sleeper = Sleeper.DEFAULT;
while (!pendingJobs.isEmpty()) {
List<JobInfo> retryJobs = Lists.newArrayList();
for (JobInfo jobInfo : pendingJobs) {
if (jobInfo.pendingJob.pollJob()) {
// Job has completed successfully.
LOG.info("Job {} completed successfully.", jobInfo.pendingJob.currentJobId);
            // onSuccess is @Nullable; only invoke the callback when one was supplied.
            Exception e =
                (jobInfo.onSuccess != null) ? jobInfo.onSuccess.apply(jobInfo.pendingJob) : null;
            if (e != null) {
              throw e;
            }
} else {
// Job failed, schedule it again.
LOG.info("Job {} failed. retrying.", jobInfo.pendingJob.currentJobId);
retryJobs.add(jobInfo);
}
}
pendingJobs = retryJobs;
if (!pendingJobs.isEmpty()) {
// Sleep before retrying.
nextBackOff(sleeper, backOff);
// Run the jobs to retry. If a job has hit the maximum number of retries then runJob
// will raise an exception.
for (JobInfo job : pendingJobs) {
job.pendingJob.runJob();
}
}
}
}
/** Identical to {@link BackOffUtils#next} but without checked IOException. */
private static boolean nextBackOff(Sleeper sleeper, BackOff backOff)
throws InterruptedException {
try {
return BackOffUtils.next(sleeper, backOff);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
static class PendingJob {
private final SerializableFunction<RetryJobId, Void> executeJob;
private final SerializableFunction<RetryJobId, Job> pollJob;
private final SerializableFunction<RetryJobId, Job> lookupJob;
private final int maxRetries;
private int currentAttempt;
RetryJobId currentJobId;
Job lastJobAttempted;
boolean started;
PendingJob(
SerializableFunction<RetryJobId, Void> executeJob,
SerializableFunction<RetryJobId, Job> pollJob,
SerializableFunction<RetryJobId, Job> lookupJob,
int maxRetries,
String jobIdPrefix) {
this.executeJob = executeJob;
this.pollJob = pollJob;
this.lookupJob = lookupJob;
this.maxRetries = maxRetries;
this.currentAttempt = 0;
currentJobId = new RetryJobId(jobIdPrefix, 0);
this.started = false;
}
// Run the job.
void runJob() throws IOException {
++currentAttempt;
if (!shouldRetry()) {
throw new RuntimeException(
String.format(
"Failed to create job with prefix %s, "
+ "reached max retries: %d, last failed job: %s.",
currentJobId.getJobIdPrefix(),
maxRetries,
BigQueryHelpers.jobToPrettyString(lastJobAttempted)));
}
try {
this.started = false;
executeJob.apply(currentJobId);
} catch (RuntimeException e) {
LOG.warn("Job {} failed with {}", currentJobId.getJobId(), e);
// It's possible that the job actually made it to BQ even though we got a failure here.
// For example, the response from BQ may have timed out returning. getRetryJobId will
// return the correct job id to use on retry, or a job id to continue polling (if it turns
// out that the job has not actually failed yet).
RetryJobIdResult result = getRetryJobId(currentJobId, lookupJob);
currentJobId = result.jobId;
if (result.shouldRetry) {
          // The job either never started, or started and then failed. Try the job again with
          // the job id returned by getRetryJobId.
LOG.info("Will retry with job id {}", currentJobId.getJobId());
return;
}
}
LOG.info("job {} started", currentJobId.getJobId());
// The job has reached BigQuery and is in either the PENDING state or has completed
// successfully.
this.started = true;
}
// Poll the status of the job. Returns true if the job has completed successfully and false
// otherwise.
boolean pollJob() throws IOException {
if (started) {
Job job = pollJob.apply(currentJobId);
this.lastJobAttempted = job;
Status jobStatus = parseStatus(job);
switch (jobStatus) {
case SUCCEEDED:
LOG.info("Load job {} succeeded. Statistics: {}", currentJobId, job.getStatistics());
return true;
case UNKNOWN:
// This might happen if BigQuery's job listing is slow. Retry with the same
// job id.
LOG.info(
"Load job {} finished in unknown state: {}: {}",
currentJobId,
job.getStatus(),
shouldRetry() ? "will retry" : "will not retry");
return false;
case FAILED:
String oldJobId = currentJobId.getJobId();
currentJobId = BigQueryHelpers.getRetryJobId(currentJobId, lookupJob).jobId;
LOG.info(
"Load job {} failed, {}: {}. Next job id {}",
oldJobId,
shouldRetry() ? "will retry" : "will not retry",
job.getStatus(),
currentJobId);
return false;
default:
throw new IllegalStateException(
String.format(
"Unexpected status [%s] of load job: %s.",
job.getStatus(), BigQueryHelpers.jobToPrettyString(job)));
}
}
return false;
}
boolean shouldRetry() {
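      // Allow the initial attempt plus maxRetries retries, hence the "+ 1".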
return currentAttempt < maxRetries + 1;
}
}
static class RetryJobId {
private final String jobIdPrefix;
private final int retryIndex;
RetryJobId(String jobIdPrefix, int retryIndex) {
this.jobIdPrefix = jobIdPrefix;
this.retryIndex = retryIndex;
}
String getJobIdPrefix() {
return jobIdPrefix;
}
int getRetryIndex() {
return retryIndex;
}
String getJobId() {
return jobIdPrefix + "-" + retryIndex;
}
@Override
public String toString() {
return getJobId();
}
}
static RetryJobIdResult getRetryJobId(
RetryJobId currentJobId, SerializableFunction<RetryJobId, Job> lookupJob) {
for (int retryIndex = currentJobId.getRetryIndex(); ; retryIndex++) {
RetryJobId jobId = new RetryJobId(currentJobId.getJobIdPrefix(), retryIndex);
try {
Job loadJob = lookupJob.apply(jobId);
if (loadJob == null) {
LOG.info("job id {} not found, so retrying with that id", jobId);
// This either means that the original job was never properly issued (on the first
// iteration of the loop) or that we've found a retry id that has not been used yet. Try
// again with this job id.
return new RetryJobIdResult(jobId, true);
}
JobStatus jobStatus = loadJob.getStatus();
if (jobStatus == null) {
LOG.info("job status for {} not found, so retrying with that job id", jobId);
return new RetryJobIdResult(jobId, true);
}
if ("PENDING".equals(jobStatus.getState()) || "RUNNING".equals(jobStatus.getState())) {
// The job id has been issued and is currently pending. This can happen after receiving
// an error from the load or copy job creation (e.g. that error might come because the
// job already exists). Return to the caller which job id is pending (it might not be the
// one passed in) so the caller can then wait for this job to finish.
LOG.info("job {} in pending or running state, so continuing with that job id", jobId);
return new RetryJobIdResult(jobId, false);
}
if (jobStatus.getErrorResult() == null
&& (jobStatus.getErrors() == null || jobStatus.getErrors().isEmpty())) {
// Import succeeded. No retry needed.
LOG.info("job {} succeeded, so not retrying ", jobId);
return new RetryJobIdResult(jobId, false);
}
        // This job has failed, so we assume the data cannot enter BigQuery. We will check the
        // next job in the sequence (with the same unique prefix) to see if it is either
        // pending/succeeded or can be used to generate a retry job.
        LOG.info("job {} failed. Checking the next job id", jobId);
} catch (RuntimeException e) {
LOG.info("caught exception while querying job {}", jobId);
return new RetryJobIdResult(jobId, true);
}
}
}
/** Status of a BigQuery job or request. */
enum Status {
SUCCEEDED,
FAILED,
UNKNOWN,
}
@VisibleForTesting
static TableReferenceProto.TableReference toTableRefProto(TableReference ref) {
TableReferenceProto.TableReference.Builder builder =
TableReferenceProto.TableReference.newBuilder();
if (ref.getProjectId() != null) {
builder.setProjectId(ref.getProjectId());
}
return builder.setDatasetId(ref.getDatasetId()).setTableId(ref.getTableId()).build();
}
@VisibleForTesting
static TableReference toTableRef(TableReferenceProto.TableReference ref) {
return new TableReference()
.setProjectId(ref.getProjectId())
.setDatasetId(ref.getDatasetId())
.setTableId(ref.getTableId());
}
/** Return a displayable string representation for a {@link TableReference}. */
@Nullable
static ValueProvider<String> displayTable(@Nullable ValueProvider<TableReference> table) {
if (table == null) {
return null;
}
return NestedValueProvider.of(table, new TableRefToTableSpec());
}
/** Returns a canonical string representation of the {@link TableReference}. */
public static String toTableSpec(TableReference ref) {
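    // e.g. project "p", dataset "d", table "t" renders as "p:d.t"; with no project set it
    // renders as "d.t" (illustrative values).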
StringBuilder sb = new StringBuilder();
if (ref.getProjectId() != null) {
sb.append(ref.getProjectId());
sb.append(":");
}
sb.append(ref.getDatasetId()).append('.').append(ref.getTableId());
return sb.toString();
}
@Nullable
static ValueProvider<String> displayTableRefProto(
@Nullable ValueProvider<TableReferenceProto.TableReference> table) {
if (table == null) {
return null;
}
return NestedValueProvider.of(table, new TableRefProtoToTableSpec());
}
/** Returns a canonical string representation of a {@link TableReferenceProto.TableReference}. */
public static String toTableSpec(TableReferenceProto.TableReference ref) {
StringBuilder sb = new StringBuilder();
if (ref.getProjectId() != null) {
sb.append(ref.getProjectId());
sb.append(":");
}
sb.append(ref.getDatasetId()).append('.').append(ref.getTableId());
return sb.toString();
}
static <K, V> List<V> getOrCreateMapListValue(Map<K, List<V>> map, K key) {
return map.computeIfAbsent(key, k -> new ArrayList<>());
}
/**
* Parse a table specification in the form {@code "[project_id]:[dataset_id].[table_id]"} or
* {@code "[dataset_id].[table_id]"}.
*
* <p>If the project id is omitted, the default project id is used.
*/
public static TableReference parseTableSpec(String tableSpec) {
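    // Accepts specs like "my-project:my_dataset.my_table" or "my_dataset.my_table"
    // (illustrative names).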
Matcher match = BigQueryIO.TABLE_SPEC.matcher(tableSpec);
if (!match.matches()) {
throw new IllegalArgumentException(
"Table reference is not in [project_id]:[dataset_id].[table_id] "
+ "format: "
+ tableSpec);
}
TableReference ref = new TableReference();
ref.setProjectId(match.group("PROJECT"));
return ref.setDatasetId(match.group("DATASET")).setTableId(match.group("TABLE"));
}
/** Strip off any partition decorator information from a tablespec. */
public static String stripPartitionDecorator(String tableSpec) {
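    // e.g. "project:dataset.table$20190101" becomes "project:dataset.table".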
int index = tableSpec.lastIndexOf('$');
return (index == -1) ? tableSpec : tableSpec.substring(0, index);
}
static String jobToPrettyString(@Nullable Job job) throws IOException {
if (job != null && job.getConfiguration().getLoad() != null) {
      // Remove the schema and sourceUris from error messages for load jobs, since these fields
      // can be quite long and the error message might not be displayed properly in
      // runner-specific logs.
job = job.clone();
job.getConfiguration().getLoad().setSchema(null);
job.getConfiguration().getLoad().setSourceUris(null);
}
return job == null ? "null" : job.toPrettyString();
}
static String statusToPrettyString(@Nullable JobStatus status) throws IOException {
return status == null ? "Unknown status: null." : status.toPrettyString();
}
static Status parseStatus(@Nullable Job job) {
if (job == null) {
return Status.UNKNOWN;
}
    JobStatus status = job.getStatus();
    if (status == null) {
      return Status.UNKNOWN;
    }
if (status.getErrorResult() != null) {
return Status.FAILED;
} else if (status.getErrors() != null && !status.getErrors().isEmpty()) {
return Status.FAILED;
} else {
return Status.SUCCEEDED;
}
}
@VisibleForTesting
static String toJsonString(Object item) {
if (item == null) {
return null;
}
try {
return BigQueryIO.JSON_FACTORY.toString(item);
} catch (IOException e) {
throw new RuntimeException(
String.format("Cannot serialize %s to a JSON string.", item.getClass().getSimpleName()),
e);
}
}
@VisibleForTesting
static <T> T fromJsonString(String json, Class<T> clazz) {
if (json == null) {
return null;
}
try {
return BigQueryIO.JSON_FACTORY.fromString(json, clazz);
} catch (IOException e) {
throw new RuntimeException(
String.format("Cannot deserialize %s from a JSON string: %s.", clazz, json), e);
}
}
/**
* Returns a randomUUID string.
*
   * <p>{@code '-'} is removed because BigQuery doesn't allow it in dataset ids.
*/
static String randomUUIDString() {
return UUID.randomUUID().toString().replaceAll("-", "");
}
static void verifyTableNotExistOrEmpty(DatasetService datasetService, TableReference tableRef) {
try {
if (datasetService.getTable(tableRef) != null) {
checkState(
datasetService.isTableEmpty(tableRef),
"BigQuery table is not empty: %s.",
toTableSpec(tableRef));
}
} catch (IOException | InterruptedException e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new RuntimeException(
"unable to confirm BigQuery table emptiness for table " + toTableSpec(tableRef), e);
}
}
static void verifyDatasetPresence(DatasetService datasetService, TableReference table) {
try {
datasetService.getDataset(table.getProjectId(), table.getDatasetId());
} catch (Exception e) {
ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
throw new IllegalArgumentException(
String.format(RESOURCE_NOT_FOUND_ERROR, "dataset", toTableSpec(table)), e);
} else if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else {
throw new RuntimeException(
String.format(
UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "dataset", toTableSpec(table)),
e);
}
}
}
static String getDatasetLocation(
DatasetService datasetService, String projectId, String datasetId) {
Dataset dataset;
try {
dataset = datasetService.getDataset(projectId, datasetId);
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw new RuntimeException(
String.format(
"unable to obtain dataset for dataset %s in project %s", datasetId, projectId),
e);
}
return dataset.getLocation();
}
static void verifyTablePresence(DatasetService datasetService, TableReference table) {
try {
datasetService.getTable(table);
} catch (Exception e) {
ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
if ((e instanceof IOException) && errorExtractor.itemNotFound((IOException) e)) {
throw new IllegalArgumentException(
String.format(RESOURCE_NOT_FOUND_ERROR, "table", toTableSpec(table)), e);
} else if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else {
throw new RuntimeException(
String.format(
UNABLE_TO_CONFIRM_PRESENCE_OF_RESOURCE_ERROR, "table", toTableSpec(table)),
e);
}
}
}
// Create a unique job id for a table load.
static String createJobId(
String prefix, TableDestination tableDestination, int partition, long index) {
// Job ID must be different for each partition of each table.
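    // e.g. with partition 2 and index 1 this yields something like
    // "<prefix>_<32-hex-char murmur3 hash>_00002_00001" (illustrative values).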
String destinationHash =
Hashing.murmur3_128().hashUnencodedChars(tableDestination.toString()).toString();
String jobId = String.format("%s_%s", prefix, destinationHash);
if (partition >= 0) {
jobId += String.format("_%05d", partition);
}
if (index >= 0) {
jobId += String.format("_%05d", index);
}
return jobId;
}
@VisibleForTesting
static class JsonSchemaToTableSchema implements SerializableFunction<String, TableSchema> {
@Override
public TableSchema apply(String from) {
return fromJsonString(from, TableSchema.class);
}
}
static class TableSchemaToJsonSchema implements SerializableFunction<TableSchema, String> {
@Override
public String apply(TableSchema from) {
return toJsonString(from);
}
}
static class JsonTableRefToTableRef implements SerializableFunction<String, TableReference> {
@Override
public TableReference apply(String from) {
return fromJsonString(from, TableReference.class);
}
}
static class JsonTableRefToTableSpec implements SerializableFunction<String, String> {
@Override
public String apply(String from) {
return toTableSpec(fromJsonString(from, TableReference.class));
}
}
static class TableRefToJson implements SerializableFunction<TableReference, String> {
@Override
public String apply(TableReference from) {
return toJsonString(from);
}
}
static class TableRefToTableRefProto
implements SerializableFunction<TableReference, TableReferenceProto.TableReference> {
@Override
public TableReferenceProto.TableReference apply(TableReference from) {
return toTableRefProto(from);
}
}
static class TableRefToTableSpec implements SerializableFunction<TableReference, String> {
@Override
public String apply(TableReference from) {
return toTableSpec(from);
}
}
static class TableRefProtoToTableSpec
implements SerializableFunction<TableReferenceProto.TableReference, String> {
@Override
public String apply(TableReferenceProto.TableReference from) {
return toTableSpec(from);
}
}
@VisibleForTesting
static class TableSpecToTableRef implements SerializableFunction<String, TableReference> {
@Override
public TableReference apply(String from) {
return parseTableSpec(from);
}
}
static class TimePartitioningToJson implements SerializableFunction<TimePartitioning, String> {
@Override
public String apply(TimePartitioning partitioning) {
return toJsonString(partitioning);
}
}
static String createJobIdToken(String jobName, String stepUuid) {
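    // e.g. jobName "my-pipeline" and stepUuid "abc123" (illustrative values) yield
    // "beam_job_abc123_mypipeline"; dashes are stripped from the job name.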
return String.format("beam_job_%s_%s", stepUuid, jobName.replaceAll("-", ""));
}
static String getExtractJobId(String jobIdToken) {
return String.format("%s-extract", jobIdToken);
}
static TableReference createTempTableReference(String projectId, String jobUuid) {
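    // e.g. jobUuid "abc123" (illustrative) yields the reference
    // "<projectId>:temp_dataset_abc123.temp_table_abc123".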
String queryTempDatasetId = "temp_dataset_" + jobUuid;
String queryTempTableId = "temp_table_" + jobUuid;
return new TableReference()
.setProjectId(projectId)
.setDatasetId(queryTempDatasetId)
.setTableId(queryTempTableId);
}
static String resolveTempLocation(
String tempLocationDir, String bigQueryOperationName, String stepUuid) {
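    // e.g. ("gs://bucket/tmp", "BigQueryWriteTemp", "abc123") resolves to roughly
    // "gs://bucket/tmp/BigQueryWriteTemp/abc123/" (illustrative values).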
return FileSystems.matchNewResource(tempLocationDir, true)
.resolve(bigQueryOperationName, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
.resolve(stepUuid, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY)
.toString();
}
}