| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io.gcp.bigquery; |
| |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull; |
| |
| import com.google.api.client.googleapis.json.GoogleJsonResponseException; |
| import com.google.api.client.googleapis.services.AbstractGoogleClientRequest; |
| import com.google.api.client.http.HttpRequestInitializer; |
| import com.google.api.client.util.BackOff; |
| import com.google.api.client.util.BackOffUtils; |
| import com.google.api.client.util.ExponentialBackOff; |
| import com.google.api.client.util.Sleeper; |
| import com.google.api.gax.core.FixedCredentialsProvider; |
| import com.google.api.gax.rpc.FixedHeaderProvider; |
| import com.google.api.gax.rpc.HeaderProvider; |
| import com.google.api.gax.rpc.ServerStream; |
| import com.google.api.services.bigquery.Bigquery; |
| import com.google.api.services.bigquery.Bigquery.Tables; |
| import com.google.api.services.bigquery.model.Dataset; |
| import com.google.api.services.bigquery.model.DatasetReference; |
| import com.google.api.services.bigquery.model.Job; |
| import com.google.api.services.bigquery.model.JobConfiguration; |
| import com.google.api.services.bigquery.model.JobConfigurationExtract; |
| import com.google.api.services.bigquery.model.JobConfigurationLoad; |
| import com.google.api.services.bigquery.model.JobConfigurationQuery; |
| import com.google.api.services.bigquery.model.JobConfigurationTableCopy; |
| import com.google.api.services.bigquery.model.JobReference; |
| import com.google.api.services.bigquery.model.JobStatistics; |
| import com.google.api.services.bigquery.model.JobStatus; |
| import com.google.api.services.bigquery.model.Table; |
| import com.google.api.services.bigquery.model.TableDataInsertAllRequest; |
| import com.google.api.services.bigquery.model.TableDataInsertAllResponse; |
| import com.google.api.services.bigquery.model.TableDataList; |
| import com.google.api.services.bigquery.model.TableReference; |
| import com.google.api.services.bigquery.model.TableRow; |
| import com.google.auth.Credentials; |
| import com.google.auth.http.HttpCredentialsAdapter; |
| import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient; |
| import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageSettings; |
| import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest; |
| import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest; |
| import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse; |
| import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession; |
| import com.google.cloud.bigquery.storage.v1beta1.Storage.SplitReadStreamRequest; |
| import com.google.cloud.bigquery.storage.v1beta1.Storage.SplitReadStreamResponse; |
| import com.google.cloud.hadoop.util.ApiErrorExtractor; |
| import com.google.cloud.hadoop.util.ChainingHttpRequestInitializer; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.stream.Collectors; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.annotations.Experimental; |
| import org.apache.beam.sdk.extensions.gcp.auth.NullCredentialInitializer; |
| import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; |
| import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter; |
| import org.apache.beam.sdk.extensions.gcp.util.CustomHttpErrors; |
| import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer; |
| import org.apache.beam.sdk.extensions.gcp.util.Transport; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.transforms.SerializableFunction; |
| import org.apache.beam.sdk.util.FluentBackoff; |
| import org.apache.beam.sdk.util.ReleaseInfo; |
| import org.apache.beam.sdk.values.ValueInSingleWindow; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; |
| import org.joda.time.Duration; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * An implementation of {@link BigQueryServices} that actually communicates with the cloud BigQuery |
| * service. |
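 *
 * <p>A minimal usage sketch ({@code options} is assumed to be an already-configured {@link
 * BigQueryOptions} instance):
 *
 * <pre>{@code
 * BigQueryServices services = new BigQueryServicesImpl();
 * BigQueryServices.JobService jobService = services.getJobService(options);
 * BigQueryServices.DatasetService datasetService = services.getDatasetService(options);
 * }</pre>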
| */ |
| class BigQueryServicesImpl implements BigQueryServices { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(BigQueryServicesImpl.class); |
| |
| // How frequently to log while polling. |
| private static final Duration POLLING_LOG_GAP = Duration.standardMinutes(10); |
| |
| // The maximum number of retries to execute a BigQuery RPC. |
| private static final int MAX_RPC_RETRIES = 9; |
| |
| // The initial backoff for executing a BigQuery RPC. |
| private static final Duration INITIAL_RPC_BACKOFF = Duration.standardSeconds(1); |
| |
| // The initial backoff for polling the status of a BigQuery job. |
| private static final Duration INITIAL_JOB_STATUS_POLL_BACKOFF = Duration.standardSeconds(1); |
| |
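  // With FluentBackoff's default 1.5x multiplier, this yields backoffs of roughly 1s, 1.5s,
  // 2.25s, ... for up to MAX_RPC_RETRIES retries.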
| private static final FluentBackoff DEFAULT_BACKOFF_FACTORY = |
| FluentBackoff.DEFAULT.withMaxRetries(MAX_RPC_RETRIES).withInitialBackoff(INITIAL_RPC_BACKOFF); |
| |
| @Override |
| public JobService getJobService(BigQueryOptions options) { |
| return new JobServiceImpl(options); |
| } |
| |
| @Override |
| public DatasetService getDatasetService(BigQueryOptions options) { |
| return new DatasetServiceImpl(options); |
| } |
| |
| @Override |
| public StorageClient getStorageClient(BigQueryOptions options) throws IOException { |
| return new StorageClientImpl(options); |
| } |
| |
| private static BackOff createDefaultBackoff() { |
| return BackOffAdapter.toGcpBackOff(DEFAULT_BACKOFF_FACTORY.backoff()); |
| } |
| |
| @VisibleForTesting |
| static class JobServiceImpl implements BigQueryServices.JobService { |
| private final ApiErrorExtractor errorExtractor; |
| private final Bigquery client; |
| |
| @VisibleForTesting |
| JobServiceImpl(Bigquery client) { |
| this.errorExtractor = new ApiErrorExtractor(); |
| this.client = client; |
| } |
| |
| private JobServiceImpl(BigQueryOptions options) { |
| this.errorExtractor = new ApiErrorExtractor(); |
| this.client = newBigQueryClient(options).build(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. |
| * |
| * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. |
| */ |
| @Override |
| public void startLoadJob(JobReference jobRef, JobConfigurationLoad loadConfig) |
| throws InterruptedException, IOException { |
| Job job = |
| new Job() |
| .setJobReference(jobRef) |
| .setConfiguration(new JobConfiguration().setLoad(loadConfig)); |
| |
| startJob(job, errorExtractor, client); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. |
| * |
| * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. |
| */ |
| @Override |
| public void startExtractJob(JobReference jobRef, JobConfigurationExtract extractConfig) |
| throws InterruptedException, IOException { |
| Job job = |
| new Job() |
| .setJobReference(jobRef) |
| .setConfiguration(new JobConfiguration().setExtract(extractConfig)); |
| |
| startJob(job, errorExtractor, client); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. |
| * |
| * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. |
| */ |
| @Override |
| public void startQueryJob(JobReference jobRef, JobConfigurationQuery queryConfig) |
| throws IOException, InterruptedException { |
| Job job = |
| new Job() |
| .setJobReference(jobRef) |
| .setConfiguration(new JobConfiguration().setQuery(queryConfig)); |
| |
| startJob(job, errorExtractor, client); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. |
| * |
| * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. |
| */ |
| @Override |
| public void startCopyJob(JobReference jobRef, JobConfigurationTableCopy copyConfig) |
| throws IOException, InterruptedException { |
| Job job = |
| new Job() |
| .setJobReference(jobRef) |
| .setConfiguration(new JobConfiguration().setCopy(copyConfig)); |
| |
| startJob(job, errorExtractor, client); |
| } |
| |
| private static void startJob(Job job, ApiErrorExtractor errorExtractor, Bigquery client) |
| throws IOException, InterruptedException { |
| startJob(job, errorExtractor, client, Sleeper.DEFAULT, createDefaultBackoff()); |
| } |
| |
| @VisibleForTesting |
| static void startJob( |
| Job job, |
| ApiErrorExtractor errorExtractor, |
| Bigquery client, |
| Sleeper sleeper, |
| BackOff backoff) |
| throws IOException, InterruptedException { |
| JobReference jobRef = job.getJobReference(); |
| Exception lastException; |
| do { |
| try { |
| client.jobs().insert(jobRef.getProjectId(), job).execute(); |
| LOG.info( |
| "Started BigQuery job: {}.\n{}", |
| jobRef, |
| formatBqStatusCommand(jobRef.getProjectId(), jobRef.getJobId())); |
| return; // SUCCEEDED |
| } catch (IOException e) { |
| if (errorExtractor.itemAlreadyExists(e)) { |
| LOG.info("BigQuery job " + jobRef + " already exists, will not retry inserting it:", e); |
| return; // SUCCEEDED |
| } |
| // ignore and retry |
| LOG.info("Failed to insert job " + jobRef + ", will retry:", e); |
| lastException = e; |
| } |
| } while (nextBackOff(sleeper, backoff)); |
| throw new IOException( |
| String.format( |
| "Unable to insert job: %s, aborting after %d .", jobRef.getJobId(), MAX_RPC_RETRIES), |
| lastException); |
| } |
| |
| @Override |
| public Job pollJob(JobReference jobRef, int maxAttempts) throws InterruptedException { |
| BackOff backoff = |
| BackOffAdapter.toGcpBackOff( |
| FluentBackoff.DEFAULT |
| .withMaxRetries(maxAttempts) |
| .withInitialBackoff(INITIAL_JOB_STATUS_POLL_BACKOFF) |
| .withMaxBackoff(Duration.standardMinutes(1)) |
| .backoff()); |
| return pollJob(jobRef, Sleeper.DEFAULT, backoff); |
| } |
| |
| @VisibleForTesting |
| Job pollJob(JobReference jobRef, Sleeper sleeper, BackOff backoff) throws InterruptedException { |
| do { |
| try { |
| Job job = |
| client |
| .jobs() |
| .get(jobRef.getProjectId(), jobRef.getJobId()) |
| .setLocation(jobRef.getLocation()) |
| .execute(); |
| if (job == null) { |
| LOG.info("Still waiting for BigQuery job {} to start", jobRef); |
| continue; |
| } |
| JobStatus status = job.getStatus(); |
| if (status == null) { |
| LOG.info("Still waiting for BigQuery job {} to enter pending state", jobRef); |
| continue; |
| } |
| if ("DONE".equals(status.getState())) { |
| LOG.info("BigQuery job {} completed in state DONE", jobRef); |
| return job; |
| } |
| // The job is not DONE, wait longer and retry. |
| LOG.info( |
| "Still waiting for BigQuery job {}, currently in status {}\n{}", |
| jobRef.getJobId(), |
| status, |
| formatBqStatusCommand(jobRef.getProjectId(), jobRef.getJobId())); |
| } catch (IOException e) { |
| // ignore and retry |
| LOG.info("Ignore the error and retry polling job status.", e); |
| } |
| } while (nextBackOff(sleeper, backoff)); |
| LOG.warn("Unable to poll job status: {}, aborting after reached max .", jobRef.getJobId()); |
| return null; |
| } |
| |
| private static String formatBqStatusCommand(String projectId, String jobId) { |
| return String.format("bq show -j --format=prettyjson --project_id=%s %s", projectId, jobId); |
| } |
| |
| @Override |
| public JobStatistics dryRunQuery( |
| String projectId, JobConfigurationQuery queryConfig, String location) |
| throws InterruptedException, IOException { |
| JobReference jobRef = new JobReference().setLocation(location).setProjectId(projectId); |
| Job job = |
| new Job() |
| .setJobReference(jobRef) |
| .setConfiguration(new JobConfiguration().setQuery(queryConfig).setDryRun(true)); |
| return executeWithRetries( |
| client.jobs().insert(projectId, job), |
| String.format( |
| "Unable to dry run query: %s, aborting after %d retries.", |
| queryConfig, MAX_RPC_RETRIES), |
| Sleeper.DEFAULT, |
| createDefaultBackoff(), |
| ALWAYS_RETRY) |
| .getStatistics(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
     * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds.
     *
     * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts.
| */ |
| @Override |
| public Job getJob(JobReference jobRef) throws IOException, InterruptedException { |
| return getJob(jobRef, Sleeper.DEFAULT, createDefaultBackoff()); |
| } |
| |
| @VisibleForTesting |
| public Job getJob(JobReference jobRef, Sleeper sleeper, BackOff backoff) |
| throws IOException, InterruptedException { |
| String jobId = jobRef.getJobId(); |
| Exception lastException; |
| do { |
| try { |
| return client.jobs().get(jobRef.getProjectId(), jobId).execute(); |
| } catch (GoogleJsonResponseException e) { |
| if (errorExtractor.itemNotFound(e)) { |
| LOG.info("No BigQuery job with job id {} found.", jobId); |
| return null; |
| } |
| LOG.info( |
| "Ignoring the error encountered while trying to query the BigQuery job {}", jobId, e); |
| lastException = e; |
| } catch (IOException e) { |
| LOG.info( |
| "Ignoring the error encountered while trying to query the BigQuery job {}", jobId, e); |
| lastException = e; |
| } |
| } while (nextBackOff(sleeper, backoff)); |
| throw new IOException( |
| String.format( |
| "Unable to find BigQuery job: %s, aborting after %d retries.", |
| jobRef, MAX_RPC_RETRIES), |
| lastException); |
| } |
| } |
| |
| @VisibleForTesting |
| static class DatasetServiceImpl implements DatasetService { |
| private static final FluentBackoff INSERT_BACKOFF_FACTORY = |
| FluentBackoff.DEFAULT.withInitialBackoff(Duration.millis(200)).withMaxRetries(5); |
| |
| // A backoff for rate limit exceeded errors. Retries forever. |
| private static final FluentBackoff RATE_LIMIT_BACKOFF_FACTORY = |
| FluentBackoff.DEFAULT |
| .withInitialBackoff(Duration.standardSeconds(1)) |
| .withMaxBackoff(Duration.standardMinutes(2)); |
| |
| private final ApiErrorExtractor errorExtractor; |
| private final Bigquery client; |
| private final PipelineOptions options; |
| private final long maxRowsPerBatch; |
| private final long maxRowBatchSize; |
| |
| private ExecutorService executor; |
| |
| @VisibleForTesting |
| DatasetServiceImpl(Bigquery client, PipelineOptions options) { |
| BigQueryOptions bqOptions = options.as(BigQueryOptions.class); |
| this.errorExtractor = new ApiErrorExtractor(); |
| this.client = client; |
| this.options = options; |
| this.maxRowsPerBatch = bqOptions.getMaxStreamingRowsToBatch(); |
| this.maxRowBatchSize = bqOptions.getMaxStreamingBatchSize(); |
| this.executor = null; |
| } |
| |
| @VisibleForTesting |
| DatasetServiceImpl(Bigquery client, PipelineOptions options, long maxRowsPerBatch) { |
| BigQueryOptions bqOptions = options.as(BigQueryOptions.class); |
| this.errorExtractor = new ApiErrorExtractor(); |
| this.client = client; |
| this.options = options; |
| this.maxRowsPerBatch = maxRowsPerBatch; |
| this.maxRowBatchSize = bqOptions.getMaxStreamingBatchSize(); |
| this.executor = null; |
| } |
| |
| private DatasetServiceImpl(BigQueryOptions bqOptions) { |
| this.errorExtractor = new ApiErrorExtractor(); |
| this.client = newBigQueryClient(bqOptions).build(); |
| this.options = bqOptions; |
| this.maxRowsPerBatch = bqOptions.getMaxStreamingRowsToBatch(); |
| this.maxRowBatchSize = bqOptions.getMaxStreamingBatchSize(); |
| this.executor = null; |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. |
| * |
| * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. |
| */ |
| @Override |
| @Nullable |
| public Table getTable(TableReference tableRef) throws IOException, InterruptedException { |
| return getTable(tableRef, null); |
| } |
| |
| @Override |
| @Nullable |
| public Table getTable(TableReference tableRef, List<String> selectedFields) |
| throws IOException, InterruptedException { |
| return getTable(tableRef, selectedFields, createDefaultBackoff(), Sleeper.DEFAULT); |
| } |
| |
| @VisibleForTesting |
| @Nullable |
| Table getTable( |
| TableReference ref, @Nullable List<String> selectedFields, BackOff backoff, Sleeper sleeper) |
| throws IOException, InterruptedException { |
| Tables.Get get = |
| client.tables().get(ref.getProjectId(), ref.getDatasetId(), ref.getTableId()); |
| if (selectedFields != null && !selectedFields.isEmpty()) { |
| get.setSelectedFields(String.join(",", selectedFields)); |
| } |
| try { |
| return executeWithRetries( |
| get, |
| String.format( |
| "Unable to get table: %s, aborting after %d retries.", |
| ref.getTableId(), MAX_RPC_RETRIES), |
| sleeper, |
| backoff, |
| DONT_RETRY_NOT_FOUND); |
| } catch (IOException e) { |
| if (errorExtractor.itemNotFound(e)) { |
| return null; |
| } |
| throw e; |
| } |
| } |
| |
| /** |
| * Retry table creation up to 5 minutes (with exponential backoff) when this user is near the |
| * quota for table creation. This relatively innocuous behavior can happen when BigQueryIO is |
| * configured with a table spec function to use different tables for each window. |
| */ |
| private static final int RETRY_CREATE_TABLE_DURATION_MILLIS = |
| (int) TimeUnit.MINUTES.toMillis(5); |
| |
| /** |
| * {@inheritDoc} |
| * |
     * <p>If a table with the same name already exists in the dataset, this method simply returns.
     * In that case, the existing table does not necessarily have the same schema as the one
     * specified by the parameter.
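     *
     * <p>A minimal usage sketch (hypothetical identifiers; {@code schema} is assumed to be a
     * populated {@code TableSchema}):
     *
     * <pre>{@code
     * Table table =
     *     new Table()
     *         .setTableReference(
     *             new TableReference()
     *                 .setProjectId("my-project")
     *                 .setDatasetId("my_dataset")
     *                 .setTableId("my_table"))
     *         .setSchema(schema);
     * datasetService.createTable(table);
     * }</pre>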
| * |
     * @throws IOException if an error other than the table already existing occurs.
| */ |
| @Override |
| public void createTable(Table table) throws InterruptedException, IOException { |
| LOG.info( |
| "Trying to create BigQuery table: {}", |
| BigQueryHelpers.toTableSpec(table.getTableReference())); |
| BackOff backoff = |
| new ExponentialBackOff.Builder() |
| .setMaxElapsedTimeMillis(RETRY_CREATE_TABLE_DURATION_MILLIS) |
| .build(); |
| |
| tryCreateTable(table, backoff, Sleeper.DEFAULT); |
| } |
| |
| @VisibleForTesting |
| @Nullable |
| Table tryCreateTable(Table table, BackOff backoff, Sleeper sleeper) throws IOException { |
| boolean retry = false; |
| while (true) { |
| try { |
| return client |
| .tables() |
| .insert( |
| table.getTableReference().getProjectId(), |
| table.getTableReference().getDatasetId(), |
| table) |
| .execute(); |
| } catch (IOException e) { |
| ApiErrorExtractor extractor = new ApiErrorExtractor(); |
| if (extractor.itemAlreadyExists(e)) { |
| // The table already exists, nothing to return. |
| return null; |
| } else if (extractor.rateLimited(e)) { |
| // The request failed because we hit a temporary quota. Back off and try again. |
| try { |
| if (BackOffUtils.next(sleeper, backoff)) { |
| if (!retry) { |
| LOG.info( |
| "Quota limit reached when creating table {}:{}.{}, retrying up to {} minutes", |
| table.getTableReference().getProjectId(), |
| table.getTableReference().getDatasetId(), |
| table.getTableReference().getTableId(), |
| TimeUnit.MILLISECONDS.toSeconds(RETRY_CREATE_TABLE_DURATION_MILLIS) / 60.0); |
| retry = true; |
| } |
| continue; |
| } |
| } catch (InterruptedException e1) { |
| // Restore interrupted state and throw the last failure. |
| Thread.currentThread().interrupt(); |
| throw e; |
| } |
| } |
| throw e; |
| } |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. |
| * |
| * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. |
| */ |
| @Override |
| public void deleteTable(TableReference tableRef) throws IOException, InterruptedException { |
| executeWithRetries( |
| client |
| .tables() |
| .delete(tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()), |
| String.format( |
| "Unable to delete table: %s, aborting after %d retries.", |
| tableRef.getTableId(), MAX_RPC_RETRIES), |
| Sleeper.DEFAULT, |
| createDefaultBackoff(), |
| ALWAYS_RETRY); |
| } |
| |
| @Override |
| public boolean isTableEmpty(TableReference tableRef) throws IOException, InterruptedException { |
| return isTableEmpty(tableRef, createDefaultBackoff(), Sleeper.DEFAULT); |
| } |
| |
| @VisibleForTesting |
| boolean isTableEmpty(TableReference tableRef, BackOff backoff, Sleeper sleeper) |
| throws IOException, InterruptedException { |
| TableDataList dataList = |
| executeWithRetries( |
| client |
| .tabledata() |
| .list(tableRef.getProjectId(), tableRef.getDatasetId(), tableRef.getTableId()), |
| String.format( |
| "Unable to list table data: %s, aborting after %d retries.", |
| tableRef.getTableId(), MAX_RPC_RETRIES), |
| sleeper, |
| backoff, |
| DONT_RETRY_NOT_FOUND); |
| return dataList.getRows() == null || dataList.getRows().isEmpty(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. |
| * |
| * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. |
| */ |
| @Override |
| public Dataset getDataset(String projectId, String datasetId) |
| throws IOException, InterruptedException { |
| return executeWithRetries( |
| client.datasets().get(projectId, datasetId), |
| String.format( |
| "Unable to get dataset: %s, aborting after %d retries.", datasetId, MAX_RPC_RETRIES), |
| Sleeper.DEFAULT, |
| createDefaultBackoff(), |
| DONT_RETRY_NOT_FOUND); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. |
| * |
| * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. |
| */ |
| @Override |
| public void createDataset( |
| String projectId, |
| String datasetId, |
| @Nullable String location, |
| @Nullable String description, |
| @Nullable Long defaultTableExpirationMs) |
| throws IOException, InterruptedException { |
| createDataset( |
| projectId, |
| datasetId, |
| location, |
| description, |
| defaultTableExpirationMs, |
| Sleeper.DEFAULT, |
| createDefaultBackoff()); |
| } |
| |
| private void createDataset( |
| String projectId, |
| String datasetId, |
| @Nullable String location, |
| @Nullable String description, |
| @Nullable Long defaultTableExpirationMs, |
| Sleeper sleeper, |
| BackOff backoff) |
| throws IOException, InterruptedException { |
| DatasetReference datasetRef = |
| new DatasetReference().setProjectId(projectId).setDatasetId(datasetId); |
| |
| Dataset dataset = new Dataset().setDatasetReference(datasetRef); |
| if (location != null) { |
| dataset.setLocation(location); |
| } |
| if (description != null) { |
| dataset.setFriendlyName(description); |
| dataset.setDescription(description); |
| } |
| if (defaultTableExpirationMs != null) { |
| dataset.setDefaultTableExpirationMs(defaultTableExpirationMs); |
| } |
| |
| Exception lastException; |
| do { |
| try { |
| client.datasets().insert(projectId, dataset).execute(); |
| return; // SUCCEEDED |
| } catch (GoogleJsonResponseException e) { |
| if (errorExtractor.itemAlreadyExists(e)) { |
| return; // SUCCEEDED |
| } |
| // ignore and retry |
| LOG.info("Ignore the error and retry creating the dataset.", e); |
| lastException = e; |
| } catch (IOException e) { |
| LOG.info("Ignore the error and retry creating the dataset.", e); |
| lastException = e; |
| } |
| } while (nextBackOff(sleeper, backoff)); |
| throw new IOException( |
| String.format( |
| "Unable to create dataset: %s, aborting after %d .", datasetId, MAX_RPC_RETRIES), |
| lastException); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * <p>Tries executing the RPC for at most {@code MAX_RPC_RETRIES} times until it succeeds. |
| * |
| * @throws IOException if it exceeds {@code MAX_RPC_RETRIES} attempts. |
| */ |
| @Override |
| public void deleteDataset(String projectId, String datasetId) |
| throws IOException, InterruptedException { |
| executeWithRetries( |
| client.datasets().delete(projectId, datasetId), |
| String.format( |
| "Unable to delete table: %s, aborting after %d retries.", datasetId, MAX_RPC_RETRIES), |
| Sleeper.DEFAULT, |
| createDefaultBackoff(), |
| ALWAYS_RETRY); |
| } |
| |
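    /**
     * Inserts rows via the streaming insert API, batching them by {@code maxRowsPerBatch} rows or
     * {@code maxRowBatchSize} bytes and submitting batches in parallel on a bounded executor.
     * Rows whose insert errors the {@code retryPolicy} deems retryable are retried under the
     * supplied {@code backoff}; the rest are handed to the {@code errorContainer}. Returns the
     * approximate total byte size of the rows sent.
     */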
| @VisibleForTesting |
| <T> long insertAll( |
| TableReference ref, |
| List<ValueInSingleWindow<TableRow>> rowList, |
| @Nullable List<String> insertIdList, |
| BackOff backoff, |
| final Sleeper sleeper, |
| InsertRetryPolicy retryPolicy, |
| List<ValueInSingleWindow<T>> failedInserts, |
| ErrorContainer<T> errorContainer, |
| boolean skipInvalidRows, |
        boolean ignoreUnknownValues)
| throws IOException, InterruptedException { |
| checkNotNull(ref, "ref"); |
| if (executor == null) { |
| this.executor = |
| new BoundedExecutorService( |
| options.as(GcsOptions.class).getExecutorService(), |
| options.as(BigQueryOptions.class).getInsertBundleParallelism()); |
| } |
| if (insertIdList != null && rowList.size() != insertIdList.size()) { |
        throw new AssertionError(
            "If insertIdList is not null it needs to have exactly as many elements as rowList");
| } |
| |
| long retTotalDataSize = 0; |
| List<TableDataInsertAllResponse.InsertErrors> allErrors = new ArrayList<>(); |
    // These lists contain the rows to publish. Initially they contain the entire list.
    // If there are failures, they will contain only the failed rows to be retried.
| List<ValueInSingleWindow<TableRow>> rowsToPublish = rowList; |
| List<String> idsToPublish = insertIdList; |
| while (true) { |
| List<ValueInSingleWindow<TableRow>> retryRows = new ArrayList<>(); |
| List<String> retryIds = (idsToPublish != null) ? new ArrayList<>() : null; |
| |
| int strideIndex = 0; |
| // Upload in batches. |
| List<TableDataInsertAllRequest.Rows> rows = new ArrayList<>(); |
| int dataSize = 0; |
| |
| List<Future<List<TableDataInsertAllResponse.InsertErrors>>> futures = new ArrayList<>(); |
| List<Integer> strideIndices = new ArrayList<>(); |
| |
| for (int i = 0; i < rowsToPublish.size(); ++i) { |
| TableRow row = rowsToPublish.get(i).getValue(); |
| TableDataInsertAllRequest.Rows out = new TableDataInsertAllRequest.Rows(); |
| if (idsToPublish != null) { |
| out.setInsertId(idsToPublish.get(i)); |
| } |
| out.setJson(row.getUnknownKeys()); |
| rows.add(out); |
| |
| dataSize += row.toString().length(); |
| if (dataSize >= maxRowBatchSize |
| || rows.size() >= maxRowsPerBatch |
| || i == rowsToPublish.size() - 1) { |
| TableDataInsertAllRequest content = new TableDataInsertAllRequest(); |
| content.setRows(rows); |
| content.setSkipInvalidRows(skipInvalidRows); |
          content.setIgnoreUnknownValues(ignoreUnknownValues);
| |
| final Bigquery.Tabledata.InsertAll insert = |
| client |
| .tabledata() |
| .insertAll(ref.getProjectId(), ref.getDatasetId(), ref.getTableId(), content); |
| |
| futures.add( |
| executor.submit( |
| () -> { |
| // A backoff for rate limit exceeded errors. Retries forever. |
| BackOff backoff1 = |
| BackOffAdapter.toGcpBackOff(RATE_LIMIT_BACKOFF_FACTORY.backoff()); |
| while (true) { |
| try { |
| return insert.execute().getInsertErrors(); |
| } catch (IOException e) { |
| LOG.info( |
| String.format( |
| "BigQuery insertAll error, retrying: %s", |
| ApiErrorExtractor.INSTANCE.getErrorMessage(e))); |
| try { |
| sleeper.sleep(backoff1.nextBackOffMillis()); |
                        } catch (InterruptedException interrupted) {
                          // Restore interrupted state before surfacing the failure.
                          Thread.currentThread().interrupt();
                          throw new IOException(
                              "Interrupted while waiting before retrying insertAll");
                        }
| } |
| } |
| })); |
| strideIndices.add(strideIndex); |
| |
| retTotalDataSize += dataSize; |
| |
| dataSize = 0; |
| strideIndex = i + 1; |
| rows = new ArrayList<>(); |
| } |
| } |
| |
| try { |
| for (int i = 0; i < futures.size(); i++) { |
| List<TableDataInsertAllResponse.InsertErrors> errors = futures.get(i).get(); |
| if (errors == null) { |
| continue; |
| } |
| for (TableDataInsertAllResponse.InsertErrors error : errors) { |
| if (error.getIndex() == null) { |
| throw new IOException("Insert failed: " + error + ", other errors: " + allErrors); |
| } |
| |
| int errorIndex = error.getIndex().intValue() + strideIndices.get(i); |
| if (retryPolicy.shouldRetry(new InsertRetryPolicy.Context(error))) { |
| allErrors.add(error); |
| retryRows.add(rowsToPublish.get(errorIndex)); |
| if (retryIds != null) { |
| retryIds.add(idsToPublish.get(errorIndex)); |
| } |
| } else { |
| errorContainer.add(failedInserts, error, ref, rowsToPublish.get(errorIndex)); |
| } |
| } |
| } |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new IOException("Interrupted while inserting " + rowsToPublish); |
| } catch (ExecutionException e) { |
| throw new RuntimeException(e.getCause()); |
| } |
| |
| if (allErrors.isEmpty()) { |
| break; |
| } |
| long nextBackoffMillis = backoff.nextBackOffMillis(); |
| if (nextBackoffMillis == BackOff.STOP) { |
| break; |
| } |
| try { |
| sleeper.sleep(nextBackoffMillis); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| throw new IOException("Interrupted while waiting before retrying insert of " + retryRows); |
| } |
| rowsToPublish = retryRows; |
| idsToPublish = retryIds; |
| allErrors.clear(); |
| LOG.info("Retrying {} failed inserts to BigQuery", rowsToPublish.size()); |
| } |
| if (!allErrors.isEmpty()) { |
| throw new IOException("Insert failed: " + allErrors); |
| } else { |
| return retTotalDataSize; |
| } |
| } |
| |
| @Override |
| public <T> long insertAll( |
| TableReference ref, |
| List<ValueInSingleWindow<TableRow>> rowList, |
| @Nullable List<String> insertIdList, |
| InsertRetryPolicy retryPolicy, |
| List<ValueInSingleWindow<T>> failedInserts, |
| ErrorContainer<T> errorContainer, |
| boolean skipInvalidRows, |
| boolean ignoreUnknownValues) |
| throws IOException, InterruptedException { |
| return insertAll( |
| ref, |
| rowList, |
| insertIdList, |
| BackOffAdapter.toGcpBackOff(INSERT_BACKOFF_FACTORY.backoff()), |
| Sleeper.DEFAULT, |
| retryPolicy, |
| failedInserts, |
| errorContainer, |
| skipInvalidRows, |
| ignoreUnknownValues); |
| } |
| |
| @Override |
| public Table patchTableDescription( |
| TableReference tableReference, @Nullable String tableDescription) |
| throws IOException, InterruptedException { |
| Table table = new Table(); |
| table.setDescription(tableDescription); |
| |
| return executeWithRetries( |
| client |
| .tables() |
| .patch( |
| tableReference.getProjectId(), |
| tableReference.getDatasetId(), |
| tableReference.getTableId(), |
| table), |
| String.format( |
| "Unable to patch table description: %s, aborting after %d retries.", |
| tableReference, MAX_RPC_RETRIES), |
| Sleeper.DEFAULT, |
| createDefaultBackoff(), |
| ALWAYS_RETRY); |
| } |
| } |
| |
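  /** A retry predicate that gives up on "item not found" errors and retries everything else. */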
| static final SerializableFunction<IOException, Boolean> DONT_RETRY_NOT_FOUND = |
| input -> { |
| ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); |
| return !errorExtractor.itemNotFound(input); |
| }; |
| |
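  /** A retry predicate that retries on every {@link IOException}. */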
| static final SerializableFunction<IOException, Boolean> ALWAYS_RETRY = input -> true; |
| |
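  /**
   * Executes {@code request}, retrying with the given {@link Sleeper} and {@link BackOff} as long
   * as {@code shouldRetry} returns {@code true} for the encountered {@link IOException}; once the
   * backoff is exhausted or the predicate rejects the error, throws an {@link IOException} with
   * {@code errorMessage} wrapping the last failure.
   *
   * <p>A minimal usage sketch (hypothetical identifiers; {@code client} is assumed to be a
   * configured {@link Bigquery} instance):
   *
   * <pre>{@code
   * Table table =
   *     executeWithRetries(
   *         client.tables().get("my-project", "my_dataset", "my_table"),
   *         "Unable to get table, aborting after retries.",
   *         Sleeper.DEFAULT,
   *         createDefaultBackoff(),
   *         DONT_RETRY_NOT_FOUND);
   * }</pre>
   */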
| @VisibleForTesting |
| static <T> T executeWithRetries( |
| AbstractGoogleClientRequest<T> request, |
| String errorMessage, |
| Sleeper sleeper, |
| BackOff backoff, |
| SerializableFunction<IOException, Boolean> shouldRetry) |
| throws IOException, InterruptedException { |
| Exception lastException = null; |
| do { |
| try { |
| return request.execute(); |
| } catch (IOException e) { |
| lastException = e; |
| if (!shouldRetry.apply(e)) { |
| break; |
| } |
| LOG.info("Ignore the error and retry the request.", e); |
| } |
| } while (nextBackOff(sleeper, backoff)); |
| throw new IOException(errorMessage, lastException); |
| } |
| |
  /**
   * Identical to {@link BackOffUtils#next} but rethrows the checked {@link IOException} as an
   * unchecked {@link RuntimeException}.
   */
| private static boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException { |
| try { |
| return BackOffUtils.next(sleeper, backoff); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| |
| /** Returns a BigQuery client builder using the specified {@link BigQueryOptions}. */ |
| private static Bigquery.Builder newBigQueryClient(BigQueryOptions options) { |
| RetryHttpRequestInitializer httpRequestInitializer = |
| new RetryHttpRequestInitializer(ImmutableList.of(404)); |
| httpRequestInitializer.setCustomErrors(createBigQueryClientCustomErrors()); |
| httpRequestInitializer.setWriteTimeout(options.getHTTPWriteTimeout()); |
| return new Bigquery.Builder( |
| Transport.getTransport(), |
| Transport.getJsonFactory(), |
| chainHttpRequestInitializer( |
| options.getGcpCredential(), |
            // Do not log 404. It clutters the output, and a 404 response may be exactly what the
            // caller expects (e.g. when checking whether a table exists).
| httpRequestInitializer)) |
| .setApplicationName(options.getAppName()) |
| .setGoogleClientRequestInitializer(options.getGoogleApiTrace()); |
| } |
| |
| private static HttpRequestInitializer chainHttpRequestInitializer( |
| Credentials credential, HttpRequestInitializer httpRequestInitializer) { |
| if (credential == null) { |
| return new ChainingHttpRequestInitializer( |
| new NullCredentialInitializer(), httpRequestInitializer); |
| } else { |
| return new ChainingHttpRequestInitializer( |
| new HttpCredentialsAdapter(credential), httpRequestInitializer); |
| } |
| } |
| |
| public static CustomHttpErrors createBigQueryClientCustomErrors() { |
| CustomHttpErrors.Builder builder = new CustomHttpErrors.Builder(); |
| // 403 errors, to list tables, matching this URL: |
| // http://www.googleapis.com/bigquery/v2/projects/myproject/datasets/ |
| // mydataset/tables?maxResults=1000 |
| builder.addErrorForCodeAndUrlContains( |
| 403, |
| "/tables?", |
| "The GCP project is most likely exceeding the rate limit on " |
| + "bigquery.tables.list, please find the instructions to increase this limit at: " |
| + "https://cloud.google.com/service-infrastructure/docs/rate-limiting#configure"); |
| return builder.build(); |
| } |
| |
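  /** A {@link BigQueryServerStream} that delegates to a gax {@link ServerStream}. */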
| static class BigQueryServerStreamImpl<T> implements BigQueryServerStream<T> { |
| |
    private final ServerStream<T> serverStream;
| |
    public BigQueryServerStreamImpl(ServerStream<T> serverStream) {
| this.serverStream = serverStream; |
| } |
| |
| @Override |
| public Iterator<T> iterator() { |
| return serverStream.iterator(); |
| } |
| |
| @Override |
| public void cancel() { |
| serverStream.cancel(); |
| } |
| } |
| |
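  /**
   * An implementation of {@link StorageClient} backed by the {@code v1beta1} BigQuery Storage API
   * client, with a Beam user-agent header attached to outgoing requests.
   */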
| @Experimental(Experimental.Kind.SOURCE_SINK) |
| static class StorageClientImpl implements StorageClient { |
| |
| private static final HeaderProvider USER_AGENT_HEADER_PROVIDER = |
| FixedHeaderProvider.create( |
| "user-agent", "Apache_Beam_Java/" + ReleaseInfo.getReleaseInfo().getVersion()); |
| |
| private final BigQueryStorageClient client; |
| |
| private StorageClientImpl(BigQueryOptions options) throws IOException { |
| BigQueryStorageSettings settings = |
| BigQueryStorageSettings.newBuilder() |
| .setCredentialsProvider(FixedCredentialsProvider.create(options.getGcpCredential())) |
| .setTransportChannelProvider( |
| BigQueryStorageSettings.defaultGrpcTransportProviderBuilder() |
| .setHeaderProvider(USER_AGENT_HEADER_PROVIDER) |
| .build()) |
| .build(); |
| this.client = BigQueryStorageClient.create(settings); |
| } |
| |
| @Override |
| public ReadSession createReadSession(CreateReadSessionRequest request) { |
| return client.createReadSession(request); |
| } |
| |
| @Override |
| public BigQueryServerStream<ReadRowsResponse> readRows(ReadRowsRequest request) { |
      return new BigQueryServerStreamImpl<>(client.readRowsCallable().call(request));
| } |
| |
| @Override |
| public SplitReadStreamResponse splitReadStream(SplitReadStreamRequest request) { |
| return client.splitReadStream(request); |
| } |
| |
| @Override |
| public void close() { |
| client.close(); |
| } |
| } |
| |
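  /**
   * An {@link ExecutorService} wrapper that uses a {@link Semaphore} to bound how many submitted
   * tasks may run on the delegate executor concurrently.
   */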
| private static class BoundedExecutorService implements ExecutorService { |
| private final ExecutorService executor; |
| private final Semaphore semaphore; |
| private final int parallelism; |
| |
| BoundedExecutorService(ExecutorService executor, int parallelism) { |
| this.executor = executor; |
| this.parallelism = parallelism; |
| this.semaphore = new Semaphore(parallelism); |
| } |
| |
| @Override |
| public void shutdown() { |
| executor.shutdown(); |
| } |
| |
| @Override |
| public List<Runnable> shutdownNow() { |
| List<Runnable> runnables = executor.shutdownNow(); |
      // Release as many permits as possible before returning the semaphore-guarded runnables.
| synchronized (this) { |
| if (semaphore.availablePermits() <= parallelism) { |
| semaphore.release(Integer.MAX_VALUE - parallelism); |
| } |
| } |
| return runnables; |
| } |
| |
| @Override |
| public boolean isShutdown() { |
| return executor.isShutdown(); |
| } |
| |
| @Override |
| public boolean isTerminated() { |
| return executor.isTerminated(); |
| } |
| |
| @Override |
| public boolean awaitTermination(long l, TimeUnit timeUnit) throws InterruptedException { |
| return executor.awaitTermination(l, timeUnit); |
| } |
| |
| @Override |
| public <T> Future<T> submit(Callable<T> callable) { |
| return executor.submit(new SemaphoreCallable<>(callable)); |
| } |
| |
| @Override |
| public <T> Future<T> submit(Runnable runnable, T t) { |
| return executor.submit(new SemaphoreRunnable(runnable), t); |
| } |
| |
| @Override |
| public Future<?> submit(Runnable runnable) { |
| return executor.submit(new SemaphoreRunnable(runnable)); |
| } |
| |
| @Override |
| public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection) |
| throws InterruptedException { |
| return executor.invokeAll( |
| collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList())); |
| } |
| |
| @Override |
| public <T> List<Future<T>> invokeAll( |
| Collection<? extends Callable<T>> collection, long l, TimeUnit timeUnit) |
| throws InterruptedException { |
| return executor.invokeAll( |
| collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()), |
| l, |
| timeUnit); |
| } |
| |
| @Override |
| public <T> T invokeAny(Collection<? extends Callable<T>> collection) |
| throws InterruptedException, ExecutionException { |
| return executor.invokeAny( |
| collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList())); |
| } |
| |
| @Override |
| public <T> T invokeAny(Collection<? extends Callable<T>> collection, long l, TimeUnit timeUnit) |
| throws InterruptedException, ExecutionException, TimeoutException { |
| return executor.invokeAny( |
| collection.stream().map(SemaphoreCallable::new).collect(Collectors.toList()), |
| l, |
| timeUnit); |
| } |
| |
| @Override |
| public void execute(Runnable runnable) { |
| executor.execute(new SemaphoreRunnable(runnable)); |
| } |
| |
| private class SemaphoreRunnable implements Runnable { |
| private final Runnable runnable; |
| |
| SemaphoreRunnable(Runnable runnable) { |
| this.runnable = runnable; |
| } |
| |
| @Override |
| public void run() { |
| try { |
| semaphore.acquire(); |
      } catch (InterruptedException e) {
        // Restore interrupted state and fail the task.
        Thread.currentThread().interrupt();
        throw new RuntimeException("Semaphore acquisition interrupted; task canceled.", e);
      }
| try { |
| runnable.run(); |
| } finally { |
| semaphore.release(); |
| } |
| } |
| } |
| |
| private class SemaphoreCallable<V> implements Callable<V> { |
| private final Callable<V> callable; |
| |
| SemaphoreCallable(Callable<V> callable) { |
| this.callable = callable; |
| } |
| |
| @Override |
| public V call() throws Exception { |
| semaphore.acquire(); |
| try { |
| return callable.call(); |
| } finally { |
| semaphore.release(); |
| } |
| } |
| } |
| } |
| } |