blob: b58d18d5471210a8e08178e0bfaf17cb052feb58 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.extensions.gcp.util.BackOffAdapter;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.JsonTableRefToTableSpec;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Contains some useful helper instances of {@link DynamicDestinations}. */
class DynamicDestinationsHelpers {
private static final Logger LOG = LoggerFactory.getLogger(DynamicDestinationsHelpers.class);
/** Always returns a constant table destination. */
static class ConstantTableDestinations<T> extends DynamicDestinations<T, TableDestination> {
private final ValueProvider<String> tableSpec;
@Nullable private final String tableDescription;
ConstantTableDestinations(ValueProvider<String> tableSpec, @Nullable String tableDescription) {
this.tableSpec = tableSpec;
this.tableDescription = tableDescription;
}
static <T> ConstantTableDestinations<T> fromTableSpec(
ValueProvider<String> tableSpec, String tableDescription) {
return new ConstantTableDestinations<>(tableSpec, tableDescription);
}
static <T> ConstantTableDestinations<T> fromJsonTableRef(
ValueProvider<String> jsonTableRef, String tableDescription) {
return new ConstantTableDestinations<>(
NestedValueProvider.of(jsonTableRef, new JsonTableRefToTableSpec()), tableDescription);
}
@Override
public TableDestination getDestination(ValueInSingleWindow<T> element) {
String tableSpec = this.tableSpec.get();
checkArgument(tableSpec != null, "tableSpec can not be null");
return new TableDestination(tableSpec, tableDescription);
}
@Override
public TableSchema getSchema(TableDestination destination) {
return null;
}
@Override
public TableDestination getTable(TableDestination destination) {
return destination;
}
@Override
public Coder<TableDestination> getDestinationCoder() {
return TableDestinationCoderV2.of();
}
}
/** Returns tables based on a user-supplied function. */
static class TableFunctionDestinations<T> extends DynamicDestinations<T, TableDestination> {
private final SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction;
private final boolean clusteringEnabled;
TableFunctionDestinations(
SerializableFunction<ValueInSingleWindow<T>, TableDestination> tableFunction,
boolean clusteringEnabled) {
this.tableFunction = tableFunction;
this.clusteringEnabled = clusteringEnabled;
}
@Override
public TableDestination getDestination(ValueInSingleWindow<T> element) {
TableDestination res = tableFunction.apply(element);
checkArgument(
res != null,
"result of tableFunction can not be null, but %s returned null for element: %s",
tableFunction,
element);
return res;
}
@Override
public TableSchema getSchema(TableDestination destination) {
return null;
}
@Override
public TableDestination getTable(TableDestination destination) {
return destination;
}
@Override
public Coder<TableDestination> getDestinationCoder() {
if (clusteringEnabled) {
return TableDestinationCoderV3.of();
} else {
return TableDestinationCoderV2.of();
}
}
}
/**
* Delegates all calls to an inner instance of {@link DynamicDestinations}. This allows subclasses
* to modify another instance of {@link DynamicDestinations} by subclassing and overriding just
* the methods they want to alter.
*/
static class DelegatingDynamicDestinations<T, DestinationT>
extends DynamicDestinations<T, DestinationT> {
final DynamicDestinations<T, DestinationT> inner;
DelegatingDynamicDestinations(DynamicDestinations<T, DestinationT> inner) {
this.inner = inner;
}
@Override
public DestinationT getDestination(ValueInSingleWindow<T> element) {
return inner.getDestination(element);
}
@Override
public TableSchema getSchema(DestinationT destination) {
return inner.getSchema(destination);
}
@Override
public TableDestination getTable(DestinationT destination) {
return inner.getTable(destination);
}
@Override
public Coder<DestinationT> getDestinationCoder() {
return inner.getDestinationCoder();
}
@Override
Coder<DestinationT> getDestinationCoderWithDefault(CoderRegistry registry)
throws CannotProvideCoderException {
Coder<DestinationT> destinationCoder = getDestinationCoder();
if (destinationCoder != null) {
return destinationCoder;
}
return inner.getDestinationCoderWithDefault(registry);
}
@Override
public List<PCollectionView<?>> getSideInputs() {
return inner.getSideInputs();
}
@Override
void setSideInputAccessorFromProcessContext(DoFn<?, ?>.ProcessContext context) {
super.setSideInputAccessorFromProcessContext(context);
inner.setSideInputAccessorFromProcessContext(context);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("inner", inner).toString();
}
}
/** Returns the same schema for every table. */
static class ConstantSchemaDestinations<T, DestinationT>
extends DelegatingDynamicDestinations<T, DestinationT> {
@Nullable private final ValueProvider<String> jsonSchema;
ConstantSchemaDestinations(
DynamicDestinations<T, DestinationT> inner, ValueProvider<String> jsonSchema) {
super(inner);
checkArgument(jsonSchema != null, "jsonSchema can not be null");
this.jsonSchema = jsonSchema;
}
@Override
public TableSchema getSchema(DestinationT destination) {
String jsonSchema = this.jsonSchema.get();
checkArgument(jsonSchema != null, "jsonSchema can not be null");
return BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("inner", inner)
.add("jsonSchema", jsonSchema)
.toString();
}
}
static class ConstantTimePartitioningDestinations<T>
extends DelegatingDynamicDestinations<T, TableDestination> {
@Nullable private final ValueProvider<String> jsonTimePartitioning;
@Nullable private final ValueProvider<String> jsonClustering;
ConstantTimePartitioningDestinations(
DynamicDestinations<T, TableDestination> inner,
ValueProvider<String> jsonTimePartitioning,
ValueProvider<String> jsonClustering) {
super(inner);
checkArgument(jsonTimePartitioning != null, "jsonTimePartitioning provider can not be null");
if (jsonTimePartitioning.isAccessible()) {
checkArgument(jsonTimePartitioning.get() != null, "jsonTimePartitioning can not be null");
}
this.jsonTimePartitioning = jsonTimePartitioning;
this.jsonClustering = jsonClustering;
}
@Override
public TableDestination getDestination(ValueInSingleWindow<T> element) {
TableDestination destination = super.getDestination(element);
String partitioning = this.jsonTimePartitioning.get();
checkArgument(partitioning != null, "jsonTimePartitioning can not be null");
return new TableDestination(
destination.getTableSpec(),
destination.getTableDescription(),
partitioning,
Optional.ofNullable(jsonClustering).map(ValueProvider::get).orElse(null));
}
@Override
public Coder<TableDestination> getDestinationCoder() {
if (jsonClustering != null) {
return TableDestinationCoderV3.of();
} else {
return TableDestinationCoderV2.of();
}
}
@Override
public String toString() {
MoreObjects.ToStringHelper helper =
MoreObjects.toStringHelper(this)
.add("inner", inner)
.add("jsonTimePartitioning", jsonTimePartitioning);
if (jsonClustering != null) {
helper.add("jsonClustering", jsonClustering);
}
return helper.toString();
}
}
/**
* Takes in a side input mapping tablespec to json table schema, and always returns the matching
* schema from the side input.
*/
static class SchemaFromViewDestinations<T>
extends DelegatingDynamicDestinations<T, TableDestination> {
PCollectionView<Map<String, String>> schemaView;
SchemaFromViewDestinations(
DynamicDestinations<T, TableDestination> inner,
PCollectionView<Map<String, String>> schemaView) {
super(inner);
checkArgument(schemaView != null, "schemaView can not be null");
this.schemaView = schemaView;
}
@Override
public List<PCollectionView<?>> getSideInputs() {
return ImmutableList.<PCollectionView<?>>builder().add(schemaView).build();
}
@Override
public TableSchema getSchema(TableDestination destination) {
Map<String, String> mapValue = sideInput(schemaView);
String schema = mapValue.get(destination.getTableSpec());
checkArgument(
schema != null,
"Schema view must contain data for every destination used, "
+ "but view %s does not contain data for table destination %s "
+ "produced by %s",
schemaView,
destination.getTableSpec(),
inner);
return BigQueryHelpers.fromJsonString(schema, TableSchema.class);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("inner", inner)
.add("schemaView", schemaView)
.toString();
}
}
static <T, DestinationT> DynamicDestinations<T, DestinationT> matchTableDynamicDestinations(
DynamicDestinations<T, DestinationT> inner, BigQueryServices bqServices) {
return new MatchTableDynamicDestinations<>(inner, bqServices);
}
static class MatchTableDynamicDestinations<T, DestinationT>
extends DelegatingDynamicDestinations<T, DestinationT> {
private final BigQueryServices bqServices;
private MatchTableDynamicDestinations(
DynamicDestinations<T, DestinationT> inner, BigQueryServices bqServices) {
super(inner);
this.bqServices = bqServices;
}
private Table getBigQueryTable(TableReference tableReference) {
BackOff backoff =
BackOffAdapter.toGcpBackOff(
FluentBackoff.DEFAULT
.withMaxRetries(3)
.withInitialBackoff(Duration.standardSeconds(1))
.withMaxBackoff(Duration.standardSeconds(2))
.backoff());
try {
do {
try {
BigQueryOptions bqOptions = getPipelineOptions().as(BigQueryOptions.class);
return bqServices.getDatasetService(bqOptions).getTable(tableReference);
} catch (InterruptedException | IOException e) {
LOG.info("Failed to get BigQuery table " + tableReference);
}
} while (nextBackOff(Sleeper.DEFAULT, backoff));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return null;
}
/** Identical to {@link BackOffUtils#next} but without checked IOException. */
private static boolean nextBackOff(Sleeper sleeper, BackOff backoff)
throws InterruptedException {
try {
return BackOffUtils.next(sleeper, backoff);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
/** Returns a {@link TableDestination} object for the destination. May not return null. */
@Override
public TableDestination getTable(DestinationT destination) {
TableDestination wrappedDestination = super.getTable(destination);
Table existingTable = getBigQueryTable(wrappedDestination.getTableReference());
if (existingTable == null) {
return wrappedDestination;
} else {
return new TableDestination(
wrappedDestination.getTableSpec(),
existingTable.getDescription(),
existingTable.getTimePartitioning());
}
}
/** Returns the table schema for the destination. May not return null. */
@Override
public TableSchema getSchema(DestinationT destination) {
TableDestination wrappedDestination = super.getTable(destination);
Table existingTable = getBigQueryTable(wrappedDestination.getTableReference());
if (existingTable == null) {
return super.getSchema(destination);
} else {
return existingTable.getSchema();
}
}
}
}