/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;
import static org.apache.beam.sdk.io.FileSystems.match;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.getExtractJobId;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.resolveTempLocation;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationExtract;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.AvroSource;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.Status;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.JobService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An abstract {@link BoundedSource} to read a table from BigQuery.
*
* <p>This source uses a BigQuery export job to take a snapshot of the table on GCS, and then reads
* in parallel from each produced file. It is implemented by {@link BigQueryTableSource} and {@link
* BigQueryQuerySource}, depending on the configuration of the read. Specifically,
*
* <ul>
* <li>{@link BigQueryTableSource} is for reading BigQuery tables
* <li>{@link BigQueryQuerySource} is for querying BigQuery tables
* </ul>
*
* ...
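*
* <p>A minimal sketch of the lifecycle, with the runner driving the calls (the concrete source
* and bundle size here are illustrative, not prescriptive):
*
* <pre>{@code
* BigQuerySourceBase<TableRow> source = ...; // e.g. a BigQueryTableSource
* // The first split() runs the BigQuery extract job; the result is cached afterwards.
* List<? extends BoundedSource<TableRow>> shards = source.split(64 * 1024 * 1024, options);
* // Each shard is an AvroSource over one extracted file; createReader() on the unsplit
* // source throws, so reading always goes through the shards.
* }</pre>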
*/
abstract class BigQuerySourceBase<T> extends BoundedSource<T> {
private static final Logger LOG = LoggerFactory.getLogger(BigQuerySourceBase.class);
// The maximum number of times to poll a BigQuery job for completion; effectively unbounded.
protected static final int JOB_POLL_MAX_RETRIES = Integer.MAX_VALUE;
protected final String stepUuid;
protected final BigQueryServices bqServices;
private transient List<BoundedSource<T>> cachedSplitResult;
private final SerializableFunction<SchemaAndRecord, T> parseFn;
private final Coder<T> coder;
BigQuerySourceBase(
String stepUuid,
BigQueryServices bqServices,
Coder<T> coder,
SerializableFunction<SchemaAndRecord, T> parseFn) {
this.stepUuid = checkNotNull(stepUuid, "stepUuid");
this.bqServices = checkNotNull(bqServices, "bqServices");
this.coder = checkNotNull(coder, "coder");
this.parseFn = checkNotNull(parseFn, "parseFn");
}
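/**
 * The result of {@link #extractFiles}: the schema of the extracted table together with the
 * files the extract job produced. {@code metadata} is optional; when present it carries
 * pre-fetched file metadata so source creation can skip a per-file stat.
 */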
protected static class ExtractResult {
public final TableSchema schema;
public final List<ResourceId> extractedFiles;
public List<MatchResult.Metadata> metadata = null;
public ExtractResult(TableSchema schema, List<ResourceId> extractedFiles) {
this(schema, extractedFiles, null);
}
public ExtractResult(
TableSchema schema, List<ResourceId> extractedFiles, List<MatchResult.Metadata> metadata) {
this.schema = schema;
this.extractedFiles = extractedFiles;
this.metadata = metadata;
}
}
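/**
 * Runs a BigQuery extract job that snapshots the table returned by {@link #getTableToExtract}
 * to Avro files under the pipeline's temp location, returning the table schema and the
 * extracted file paths.
 */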
protected ExtractResult extractFiles(PipelineOptions options) throws Exception {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
TableReference tableToExtract = getTableToExtract(bqOptions);
BigQueryServices.DatasetService datasetService = bqServices.getDatasetService(bqOptions);
Table table = datasetService.getTable(tableToExtract);
if (table == null) {
throw new IOException(
String.format(
"Cannot start an export job since table %s does not exist",
BigQueryHelpers.toTableSpec(tableToExtract)));
}
TableSchema schema = table.getSchema();
JobService jobService = bqServices.getJobService(bqOptions);
String extractJobId = getExtractJobId(createJobIdToken(options.getJobName(), stepUuid));
final String extractDestinationDir =
resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
String bqLocation =
BigQueryHelpers.getDatasetLocation(
datasetService, tableToExtract.getProjectId(), tableToExtract.getDatasetId());
List<ResourceId> tempFiles =
executeExtract(
extractJobId,
tableToExtract,
jobService,
bqOptions.getProject(),
extractDestinationDir,
bqLocation);
return new ExtractResult(schema, tempFiles);
}
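/**
 * {@inheritDoc}
 *
 * <p>Runs the extract job (at most once; see the cache below) and creates one source per
 * extracted file; {@code desiredBundleSizeBytes} is ignored.
 */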
@Override
public List<BoundedSource<T>> split(long desiredBundleSizeBytes, PipelineOptions options)
throws Exception {
// split() can be called multiple times, e.g. the Dataflow runner may call it again with a
// different desiredBundleSizeBytes if a previous call produced too many sources. We ignore
// desiredBundleSizeBytes anyway, but in any case we must not initiate another BigQuery
// extract job on repeated split() calls, so the result is cached.
if (cachedSplitResult == null) {
ExtractResult res = extractFiles(options);
LOG.info("Extract job produced {} files", res.extractedFiles.size());
if (!res.extractedFiles.isEmpty()) {
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
final String extractDestinationDir =
resolveTempLocation(bqOptions.getTempLocation(), "BigQueryExtractTemp", stepUuid);
// Match all files in the destination directory to stat them in bulk.
List<MatchResult> matches = match(ImmutableList.of(extractDestinationDir + "*"));
if (!matches.isEmpty()) {
res.metadata = matches.get(0).metadata();
}
}
cleanupTempResource(options.as(BigQueryOptions.class));
cachedSplitResult = checkNotNull(createSources(res.extractedFiles, res.schema, res.metadata));
}
return cachedSplitResult;
}
protected abstract TableReference getTableToExtract(BigQueryOptions bqOptions) throws Exception;
protected abstract void cleanupTempResource(BigQueryOptions bqOptions) throws Exception;
@Override
public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
throw new UnsupportedOperationException("BigQuery source must be split before being read");
}
@Override
public void validate() {
// Do nothing; validation is done in BigQueryIO.Read.
}
@Override
public Coder<T> getOutputCoder() {
return coder;
}
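/**
 * Starts the extract job, polls it to completion, and returns the paths of the Avro files it
 * produced; throws an {@link IOException} if the job does not succeed.
 */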
private List<ResourceId> executeExtract(
String jobId,
TableReference table,
JobService jobService,
String executingProject,
String extractDestinationDir,
String bqLocation)
throws InterruptedException, IOException {
JobReference jobRef =
new JobReference().setProjectId(executingProject).setLocation(bqLocation).setJobId(jobId);
String destinationUri = BigQueryIO.getExtractDestinationUri(extractDestinationDir);
JobConfigurationExtract extract =
new JobConfigurationExtract()
.setSourceTable(table)
.setDestinationFormat("AVRO")
.setDestinationUris(ImmutableList.of(destinationUri));
LOG.info("Starting BigQuery extract job: {}", jobId);
jobService.startExtractJob(jobRef, extract);
Job extractJob = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES);
if (BigQueryHelpers.parseStatus(extractJob) != Status.SUCCEEDED) {
throw new IOException(
String.format(
"Extract job %s failed, status: %s.",
extractJob.getJobReference().getJobId(),
BigQueryHelpers.statusToPrettyString(extractJob.getStatus())));
}
LOG.info("BigQuery extract job completed: {}", jobId);
return BigQueryIO.getExtractFilePaths(extractDestinationDir, extractJob);
}
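/**
 * Parses a JSON-serialized {@link TableSchema}. Implements {@link Serializable} so it can be
 * captured by the memoizing {@link Supplier} in {@code createSources}.
 */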
private static class TableSchemaFunction implements Serializable, Function<String, TableSchema> {
@Nullable
@Override
public TableSchema apply(@Nullable String input) {
return BigQueryHelpers.fromJsonString(input, TableSchema.class);
}
}
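/**
 * Creates one {@link AvroSource} per extracted file, wrapping {@code parseFn} so every record
 * is paired with the table schema. The schema travels as a JSON string ({@link TableSchema}
 * itself is not {@link Serializable}) and is re-parsed lazily, at most once per deserialized
 * instance of the wrapper.
 */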
List<BoundedSource<T>> createSources(
List<ResourceId> files, TableSchema schema, List<MatchResult.Metadata> metadata)
throws IOException, InterruptedException {
final String jsonSchema = BigQueryIO.JSON_FACTORY.toString(schema);
SerializableFunction<GenericRecord, T> fnWrapper =
new SerializableFunction<GenericRecord, T>() {
private Supplier<TableSchema> schema =
Suppliers.memoize(
Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(jsonSchema)));
@Override
public T apply(GenericRecord input) {
return parseFn.apply(new SchemaAndRecord(input, schema.get()));
}
};
List<BoundedSource<T>> avroSources = Lists.newArrayList();
// If metadata is available, create AvroSources with said metadata in SINGLE_FILE_OR_SUBRANGE
// mode.
if (metadata != null) {
for (MatchResult.Metadata file : metadata) {
avroSources.add(AvroSource.from(file).withParseFn(fnWrapper, getOutputCoder()));
}
} else {
for (ResourceId file : files) {
avroSources.add(AvroSource.from(file.toString()).withParseFn(fnWrapper, getOutputCoder()));
}
}
return ImmutableList.copyOf(avroSources);
}
}