| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io.gcp.bigtable; |
| |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument; |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull; |
| import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkState; |
| |
| import com.google.auto.value.AutoValue; |
| import com.google.bigtable.v2.Mutation; |
| import com.google.bigtable.v2.Row; |
| import com.google.bigtable.v2.RowFilter; |
| import com.google.bigtable.v2.SampleRowKeysResponse; |
| import com.google.cloud.bigtable.config.BigtableOptions; |
| import com.google.protobuf.ByteString; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.NoSuchElementException; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import javax.annotation.Nullable; |
| import org.apache.beam.sdk.PipelineRunner; |
| import org.apache.beam.sdk.annotations.Experimental; |
| import org.apache.beam.sdk.coders.Coder; |
| import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; |
| import org.apache.beam.sdk.io.BoundedSource; |
| import org.apache.beam.sdk.io.BoundedSource.BoundedReader; |
| import org.apache.beam.sdk.io.range.ByteKey; |
| import org.apache.beam.sdk.io.range.ByteKeyRange; |
| import org.apache.beam.sdk.io.range.ByteKeyRangeTracker; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.options.ValueProvider; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.PTransform; |
| import org.apache.beam.sdk.transforms.ParDo; |
| import org.apache.beam.sdk.transforms.SerializableFunction; |
| import org.apache.beam.sdk.transforms.display.DisplayData; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PBegin; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.PDone; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects.ToStringHelper; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * {@link PTransform Transforms} for reading from and writing to Google Cloud Bigtable. |
| * |
| * <p>For more information about Cloud Bigtable, see the online documentation at <a |
| * href="https://cloud.google.com/bigtable/">Google Cloud Bigtable</a>. |
| * |
| * <h3>Reading from Cloud Bigtable</h3> |
| * |
| * <p>The Bigtable source returns a set of rows from a single table as a {@code |
| * PCollection<Row>}. |
| * |
| * <p>To configure a Cloud Bigtable source, you must supply a table id, a project id, and an |
| * instance id, and optionally a {@link BigtableOptions} to provide more specific connection |
| * configuration. By |
| * default, {@link BigtableIO.Read} will read all rows in the table. The row ranges to be read can |
| * optionally be restricted using {@link BigtableIO.Read#withKeyRanges}, and a {@link RowFilter} can |
| * be specified using {@link BigtableIO.Read#withRowFilter}. For example: |
| * |
| * <pre>{@code |
| * Pipeline p = ...; |
| * |
| * // Scan the entire table. |
| * p.apply("read", |
| * BigtableIO.read() |
| * .withProjectId(projectId) |
| * .withInstanceId(instanceId) |
| * .withTableId("table")); |
| * |
| * // Scan a prefix of the table. |
| * ByteKeyRange keyRange = ...; |
| * p.apply("read", |
| * BigtableIO.read() |
| * .withProjectId(projectId) |
| * .withInstanceId(instanceId) |
| * .withTableId("table") |
| * .withKeyRange(keyRange)); |
| * |
| * // Scan a subset of rows that match the specified row filter. |
| * p.apply("filtered read", |
| * BigtableIO.read() |
| * .withProjectId(projectId) |
| * .withInstanceId(instanceId) |
| * .withTableId("table") |
| * .withRowFilter(filter)); |
| * }</pre> |
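| * |
| * <p>Multiple non-overlapping key ranges can be read in a single pass via {@link |
| * BigtableIO.Read#withKeyRanges}. A minimal sketch, assuming the ranges are built elsewhere: |
| * |
| * <pre>{@code |
| * // Scan several non-overlapping portions of the table. |
| * List<ByteKeyRange> keyRanges = ...; |
| * p.apply("multi-range read", |
| * BigtableIO.read() |
| * .withProjectId(projectId) |
| * .withInstanceId(instanceId) |
| * .withTableId("table") |
| * .withKeyRanges(keyRanges)); |
| * }</pre> |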
| * |
| * <h3>Writing to Cloud Bigtable</h3> |
| * |
| * <p>The Bigtable sink executes a set of row mutations on a single table. It takes as input a |
| * {@link PCollection PCollection<KV<ByteString, Iterable<Mutation>>>}, where the |
| * {@link ByteString} is the key of the row being mutated, and each {@link Mutation} represents an |
| * idempotent transformation to that row. |
| * |
| * <p>To configure a Cloud Bigtable sink, you must supply a table id, a project id, and an |
| * instance id, and optionally a configuration function for {@link BigtableOptions} to provide |
| * more specific connection configuration, for example: |
| * |
| * <pre>{@code |
| * PCollection<KV<ByteString, Iterable<Mutation>>> data = ...; |
| * |
| * data.apply("write", |
| * BigtableIO.write() |
| * .withProjectId("project") |
| * .withInstanceId("instance") |
| * .withTableId("table")); |
| * }</pre> |
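| * |
| * <p>Each input element pairs a row key with the mutations to apply to that row. One way to |
| * assemble an element is sketched below; the family, qualifier, and value names are |
| * illustrative placeholders: |
| * |
| * <pre>{@code |
| * Mutation mutation = Mutation.newBuilder() |
| * .setSetCell(Mutation.SetCell.newBuilder() |
| * .setFamilyName("family") |
| * .setColumnQualifier(ByteString.copyFromUtf8("qualifier")) |
| * .setTimestampMicros(System.currentTimeMillis() * 1000) |
| * .setValue(ByteString.copyFromUtf8("value"))) |
| * .build(); |
| * KV<ByteString, Iterable<Mutation>> element = |
| * KV.of(ByteString.copyFromUtf8("key"), Collections.singletonList(mutation)); |
| * }</pre> |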
| * |
| * <h3>Experimental</h3> |
| * |
| * <p>This connector for Cloud Bigtable is considered experimental and may break or receive |
| * backwards-incompatible changes in future versions of the Apache Beam SDK. Cloud Bigtable is in |
| * Beta, and thus it may introduce breaking changes in future revisions of its service or APIs. |
| * |
| * <h3>Permissions</h3> |
| * |
| * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the |
| * pipeline. Please refer to the documentation of the corresponding {@link PipelineRunner |
| * PipelineRunners} for more details. |
| */ |
| @Experimental(Experimental.Kind.SOURCE_SINK) |
| public class BigtableIO { |
| private static final Logger LOG = LoggerFactory.getLogger(BigtableIO.class); |
| |
| /** |
| * Creates an uninitialized {@link BigtableIO.Read}. Before use, the {@code Read} must be |
| * initialized with a {@link BigtableIO.Read#withInstanceId} and {@link |
| * BigtableIO.Read#withProjectId} that specify the source Cloud Bigtable instance, and a {@link |
| * BigtableIO.Read#withTableId} that specifies which table to read. A {@link RowFilter} may also |
| * optionally be specified using {@link BigtableIO.Read#withRowFilter(RowFilter)}. |
| */ |
| @Experimental |
| public static Read read() { |
| return Read.create(); |
| } |
| |
| /** |
| * Creates an uninitialized {@link BigtableIO.Write}. Before use, the {@code Write} must be |
| * initialized with a {@link BigtableIO.Write#withProjectId} and {@link |
| * BigtableIO.Write#withInstanceId} that specify the destination Cloud Bigtable instance, and a |
| * {@link BigtableIO.Write#withTableId} that specifies which table to write to. |
| */ |
| @Experimental |
| public static Write write() { |
| return Write.create(); |
| } |
| |
| /** |
| * A {@link PTransform} that reads from Google Cloud Bigtable. See the class-level Javadoc on |
| * {@link BigtableIO} for more information. |
| * |
| * @see BigtableIO |
| */ |
| @Experimental(Experimental.Kind.SOURCE_SINK) |
| @AutoValue |
| public abstract static class Read extends PTransform<PBegin, PCollection<Row>> { |
| |
| abstract BigtableConfig getBigtableConfig(); |
| |
| @Nullable |
| abstract RowFilter getRowFilter(); |
| |
| /** Returns the ranges of keys that will be read from the table. */ |
| @Nullable |
| public abstract List<ByteKeyRange> getKeyRanges(); |
| |
| /** Returns the table being read from. */ |
| @Nullable |
| public String getTableId() { |
| ValueProvider<String> tableId = getBigtableConfig().getTableId(); |
| return tableId != null && tableId.isAccessible() ? tableId.get() : null; |
| } |
| |
| /** |
| * Returns the Google Cloud Bigtable instance being read from, and other parameters. |
| * |
| * @deprecated will be replaced by bigtable options configurator. |
| */ |
| @Deprecated |
| @Nullable |
| public BigtableOptions getBigtableOptions() { |
| return getBigtableConfig().getBigtableOptions(); |
| } |
| |
| abstract Builder toBuilder(); |
| |
| static Read create() { |
| BigtableConfig config = |
| BigtableConfig.builder() |
| .setTableId(ValueProvider.StaticValueProvider.of("")) |
| .setValidate(true) |
| .build(); |
| |
| return new AutoValue_BigtableIO_Read.Builder() |
| .setBigtableConfig(config) |
| .setKeyRanges(Arrays.asList(ByteKeyRange.ALL_KEYS)) |
| .build(); |
| } |
| |
| @AutoValue.Builder |
| abstract static class Builder { |
| |
| abstract Builder setBigtableConfig(BigtableConfig bigtableConfig); |
| |
| abstract Builder setRowFilter(RowFilter filter); |
| |
| abstract Builder setKeyRanges(List<ByteKeyRange> keyRange); |
| |
| abstract Read build(); |
| } |
| |
| /** |
| * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable project |
| * indicated by the given parameter. Requires {@link #withInstanceId} to also be called to |
| * determine the instance. |
| * |
| * <p>Does not modify this object. |
| */ |
| public Read withProjectId(ValueProvider<String> projectId) { |
| BigtableConfig config = getBigtableConfig(); |
| return toBuilder().setBigtableConfig(config.withProjectId(projectId)).build(); |
| } |
| |
| /** |
| * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable project |
| * indicated by the given parameter. Requires {@link #withInstanceId} to also be called to |
| * determine the instance. |
| * |
| * <p>Does not modify this object. |
| */ |
| public Read withProjectId(String projectId) { |
| return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); |
| } |
| |
| /** |
| * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance |
| * indicated by the given parameter. Requires {@link #withProjectId} to also be called to |
| * determine the project. |
| * |
| * <p>Does not modify this object. |
| */ |
| public Read withInstanceId(ValueProvider<String> instanceId) { |
| BigtableConfig config = getBigtableConfig(); |
| return toBuilder().setBigtableConfig(config.withInstanceId(instanceId)).build(); |
| } |
| |
| /** |
| * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance |
| * indicated by the given parameter. Requires {@link #withProjectId} to also be called to |
| * determine the project. |
| * |
| * <p>Does not modify this object. |
| */ |
| public Read withInstanceId(String instanceId) { |
| return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); |
| } |
| |
| /** |
| * Returns a new {@link BigtableIO.Read} that will read from the specified table. |
| * |
| * <p>Does not modify this object. |
| */ |
| public Read withTableId(ValueProvider<String> tableId) { |
| BigtableConfig config = getBigtableConfig(); |
| return toBuilder().setBigtableConfig(config.withTableId(tableId)).build(); |
| } |
| |
| /** |
| * Returns a new {@link BigtableIO.Read} that will read from the specified table. |
| * |
| * <p>Does not modify this object. |
| */ |
| public Read withTableId(String tableId) { |
| return withTableId(ValueProvider.StaticValueProvider.of(tableId)); |
| } |
| |
| /** |
| * WARNING: Should be used only to specify additional parameters for the connection to Cloud |
| * Bigtable; instanceId and projectId should be provided via {@link #withInstanceId} and {@link |
| * #withProjectId} respectively. |
| * |
| * <p>Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance |
| * indicated by {@link #withProjectId} and {@link #withInstanceId}, applying any other |
| * specified customizations. |
| * |
| * <p>Does not modify this object. |
| * |
| * @deprecated will be replaced by bigtable options configurator. |
| */ |
| @Deprecated |
| public Read withBigtableOptions(BigtableOptions options) { |
| checkArgument(options != null, "options cannot be null"); |
| return withBigtableOptions(options.toBuilder()); |
| } |
| |
| /** |
| * WARNING: Should be used only to specify additional parameters for the connection to Cloud |
| * Bigtable; instanceId and projectId should be provided via {@link #withInstanceId} and {@link |
| * #withProjectId} respectively. |
| * |
| * <p>Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance |
| * indicated by the given options, applying any other specified customizations. |
| * |
| * <p>Clones the given {@link BigtableOptions} builder so that any further changes will have no |
| * effect on the returned {@link BigtableIO.Read}. |
| * |
| * <p>Does not modify this object. |
| * |
| * @deprecated will be replaced by bigtable options configurator. |
| */ |
| @Deprecated |
| public Read withBigtableOptions(BigtableOptions.Builder optionsBuilder) { |
| BigtableConfig config = getBigtableConfig(); |
| // TODO: is there a better way to clone a Builder? Want it to be immune from user changes. |
| return toBuilder() |
| .setBigtableConfig(config.withBigtableOptions(optionsBuilder.build().toBuilder().build())) |
| .build(); |
| } |
| |
| /** |
| * Returns a new {@link BigtableIO.Read} that will read from the Cloud Bigtable instance with |
| * customized options provided by the given configurator. |
| * |
| * <p>WARNING: instanceId and projectId should not be provided here; they should be provided |
| * via {@link #withProjectId} and {@link #withInstanceId}. |
| * |
| * <p>Does not modify this object. |
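| * |
| * <p>A minimal sketch of a configurator that tweaks one connection setting; the user agent |
| * string below is an illustrative placeholder: |
| * |
| * <pre>{@code |
| * BigtableIO.read() |
| * .withProjectId(projectId) |
| * .withInstanceId(instanceId) |
| * .withTableId("table") |
| * .withBigtableOptionsConfigurator(builder -> builder.setUserAgent("my-pipeline")); |
| * }</pre> |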
| */ |
| public Read withBigtableOptionsConfigurator( |
| SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> configurator) { |
| BigtableConfig config = getBigtableConfig(); |
| return toBuilder() |
| .setBigtableConfig(config.withBigtableOptionsConfigurator(configurator)) |
| .build(); |
| } |
| |
| /** |
| * Returns a new {@link BigtableIO.Read} that will filter the rows read from Cloud Bigtable |
| * using the given row filter. |
| * |
| * <p>Does not modify this object. |
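| * |
| * <p>For example, a filter matching row keys against a regular expression can be built |
| * directly from the {@link RowFilter} proto; the pattern below is illustrative: |
| * |
| * <pre>{@code |
| * RowFilter filter = RowFilter.newBuilder() |
| * .setRowKeyRegexFilter(ByteString.copyFromUtf8("prefix.*")) |
| * .build(); |
| * }</pre> |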
| */ |
| public Read withRowFilter(RowFilter filter) { |
| checkArgument(filter != null, "filter cannot be null"); |
| return toBuilder().setRowFilter(filter).build(); |
| } |
| |
| /** |
| * Returns a new {@link BigtableIO.Read} that will read only rows in the specified range. |
| * |
| * <p>Does not modify this object. |
| */ |
| public Read withKeyRange(ByteKeyRange keyRange) { |
| checkArgument(keyRange != null, "keyRange cannot be null"); |
| return toBuilder().setKeyRanges(Arrays.asList(keyRange)).build(); |
| } |
| |
| /** |
| * Returns a new {@link BigtableIO.Read} that will read only rows in the specified ranges. |
| * Ranges must not overlap. |
| * |
| * <p>Does not modify this object. |
| */ |
| public Read withKeyRanges(List<ByteKeyRange> keyRanges) { |
| checkArgument(keyRanges != null, "keyRanges cannot be null"); |
| checkArgument(!keyRanges.isEmpty(), "keyRanges cannot be empty"); |
| for (ByteKeyRange range : keyRanges) { |
| checkArgument(range != null, "keyRanges cannot hold null range"); |
| } |
| return toBuilder().setKeyRanges(keyRanges).build(); |
| } |
| |
| /** Disables validation that the table being read from exists. */ |
| public Read withoutValidation() { |
| BigtableConfig config = getBigtableConfig(); |
| return toBuilder().setBigtableConfig(config.withValidate(false)).build(); |
| } |
| |
| /** |
| * Returns a new {@link BigtableIO.Read} that will read using the given Cloud Bigtable service |
| * implementation. |
| * |
| * <p>This is used for testing. |
| * |
| * <p>Does not modify this object. |
| */ |
| @VisibleForTesting |
| Read withBigtableService(BigtableService bigtableService) { |
| BigtableConfig config = getBigtableConfig(); |
| return toBuilder().setBigtableConfig(config.withBigtableService(bigtableService)).build(); |
| } |
| |
| @Override |
| public PCollection<Row> expand(PBegin input) { |
| getBigtableConfig().validate(); |
| |
| BigtableSource source = |
| new BigtableSource(getBigtableConfig(), getRowFilter(), getKeyRanges(), null); |
| return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source)); |
| } |
| |
| @Override |
| public void validate(PipelineOptions options) { |
| validateTableExists(getBigtableConfig(), options); |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| super.populateDisplayData(builder); |
| getBigtableConfig().populateDisplayData(builder); |
| |
| List<ByteKeyRange> keyRanges = getKeyRanges(); |
| for (int i = 0; i < keyRanges.size() && i < 5; i++) { |
| builder.addIfNotDefault( |
| DisplayData.item("keyRange " + i, keyRanges.get(i).toString()), |
| ByteKeyRange.ALL_KEYS.toString()); |
| } |
| |
| if (getRowFilter() != null) { |
| builder.add( |
| DisplayData.item("rowFilter", getRowFilter().toString()).withLabel("Table Row Filter")); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| ToStringHelper helper = |
| MoreObjects.toStringHelper(Read.class).add("config", getBigtableConfig()); |
| for (int i = 0; i < getKeyRanges().size(); i++) { |
| helper.add("keyRange " + i, getKeyRanges().get(i)); |
| } |
| return helper.add("filter", getRowFilter()).toString(); |
| } |
| } |
| |
| /** |
| * A {@link PTransform} that writes to Google Cloud Bigtable. See the class-level Javadoc on |
| * {@link BigtableIO} for more information. |
| * |
| * @see BigtableIO |
| */ |
| @Experimental(Experimental.Kind.SOURCE_SINK) |
| @AutoValue |
| public abstract static class Write |
| extends PTransform<PCollection<KV<ByteString, Iterable<Mutation>>>, PDone> { |
| |
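| /** |
| * Wraps the user-provided options configurator (if any) and additionally forces the Bigtable |
| * bulk write API on for this sink. |
| */ |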
| static SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> |
| enableBulkApiConfigurator( |
| final @Nullable SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> |
| userConfigurator) { |
| return optionsBuilder -> { |
| if (userConfigurator != null) { |
| optionsBuilder = userConfigurator.apply(optionsBuilder); |
| } |
| |
| return optionsBuilder.setBulkOptions( |
| optionsBuilder.build().getBulkOptions().toBuilder().setUseBulkApi(true).build()); |
| }; |
| } |
| |
| abstract BigtableConfig getBigtableConfig(); |
| |
| /** |
| * Returns the Google Cloud Bigtable instance being written to, and other parameters. |
| * |
| * @deprecated will be replaced by bigtable options configurator. |
| */ |
| @Deprecated |
| @Nullable |
| public BigtableOptions getBigtableOptions() { |
| return getBigtableConfig().getBigtableOptions(); |
| } |
| |
| abstract Builder toBuilder(); |
| |
| static Write create() { |
| BigtableConfig config = |
| BigtableConfig.builder() |
| .setTableId(ValueProvider.StaticValueProvider.of("")) |
| .setValidate(true) |
| .setBigtableOptionsConfigurator(enableBulkApiConfigurator(null)) |
| .build(); |
| |
| return new AutoValue_BigtableIO_Write.Builder().setBigtableConfig(config).build(); |
| } |
| |
| @AutoValue.Builder |
| abstract static class Builder { |
| |
| abstract Builder setBigtableConfig(BigtableConfig bigtableConfig); |
| |
| abstract Write build(); |
| } |
| |
| /** |
| * Returns a new {@link BigtableIO.Write} that will write into the Cloud Bigtable project |
| * indicated by the given parameter. Requires {@link #withInstanceId} to also be called to |
| * determine the instance. |
| * |
| * <p>Does not modify this object. |
| */ |
| public Write withProjectId(ValueProvider<String> projectId) { |
| BigtableConfig config = getBigtableConfig(); |
| return toBuilder().setBigtableConfig(config.withProjectId(projectId)).build(); |
| } |
| |
| /** |
| * Returns a new {@link BigtableIO.Write} that will write into the Cloud Bigtable project |
| * indicated by the given parameter. Requires {@link #withInstanceId} to also be called to |
| * determine the instance. |
| * |
| * <p>Does not modify this object. |
| */ |
| public Write withProjectId(String projectId) { |
| return withProjectId(ValueProvider.StaticValueProvider.of(projectId)); |
| } |
| |
| /** |
| * Returns a new {@link BigtableIO.Write} that will write into the Cloud Bigtable instance |
| * indicated by the given parameter. Requires {@link #withProjectId} to also be called to |
| * determine the project. |
| * |
| * <p>Does not modify this object. |
| */ |
| public Write withInstanceId(ValueProvider<String> instanceId) { |
| BigtableConfig config = getBigtableConfig(); |
| return toBuilder().setBigtableConfig(config.withInstanceId(instanceId)).build(); |
| } |
| |
| /** |
| * Returns a new {@link BigtableIO.Write} that will write into the Cloud Bigtable instance |
| * indicated by the given parameter. Requires {@link #withProjectId} to also be called to |
| * determine the project. |
| * |
| * <p>Does not modify this object. |
| */ |
| public Write withInstanceId(String instanceId) { |
| return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId)); |
| } |
| |
| /** |
| * Returns a new {@link BigtableIO.Write} that will write to the specified table. |
| * |
| * <p>Does not modify this object. |
| */ |
| public Write withTableId(ValueProvider<String> tableId) { |
| BigtableConfig config = getBigtableConfig(); |
| return toBuilder().setBigtableConfig(config.withTableId(tableId)).build(); |
| } |
| |
| /** |
| * Returns a new {@link BigtableIO.Write} that will write to the specified table. |
| * |
| * <p>Does not modify this object. |
| */ |
| public Write withTableId(String tableId) { |
| return withTableId(ValueProvider.StaticValueProvider.of(tableId)); |
| } |
| |
| /** |
| * WARNING: Should be used only to specify additional parameters for the connection to Cloud |
| * Bigtable; instanceId and projectId should be provided via {@link #withInstanceId} and {@link |
| * #withProjectId} respectively. |
| * |
| * <p>Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable instance |
| * indicated by the given options, applying any other specified customizations. |
| * |
| * <p>Does not modify this object. |
| * |
| * @deprecated will be replaced by bigtable options configurator. |
| */ |
| @Deprecated |
| public Write withBigtableOptions(BigtableOptions options) { |
| checkArgument(options != null, "options cannot be null"); |
| return withBigtableOptions(options.toBuilder()); |
| } |
| |
| /** |
| * WARNING: Should be used only to specify additional parameters for the connection to Cloud |
| * Bigtable; instanceId and projectId should be provided via {@link #withInstanceId} and {@link |
| * #withProjectId} respectively. |
| * |
| * <p>Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable instance |
| * indicated by the given options, applying any other specified customizations. |
| * |
| * <p>Clones the given {@link BigtableOptions} builder so that any further changes will have no |
| * effect on the returned {@link BigtableIO.Write}. |
| * |
| * <p>Does not modify this object. |
| * |
| * @deprecated will be replaced by bigtable options configurator. |
| */ |
| @Deprecated |
| public Write withBigtableOptions(BigtableOptions.Builder optionsBuilder) { |
| BigtableConfig config = getBigtableConfig(); |
| // TODO: is there a better way to clone a Builder? Want it to be immune from user changes. |
| return toBuilder() |
| .setBigtableConfig(config.withBigtableOptions(optionsBuilder.build().toBuilder().build())) |
| .build(); |
| } |
| |
| /** |
| * Returns a new {@link BigtableIO.Write} that will write to the Cloud Bigtable instance with |
| * customized options provided by the given configurator. |
| * |
| * <p>WARNING: instanceId and projectId should not be provided here; they should be provided |
| * via {@link #withProjectId} and {@link #withInstanceId}. |
| * |
| * <p>Does not modify this object. |
| */ |
| public Write withBigtableOptionsConfigurator( |
| SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder> configurator) { |
| BigtableConfig config = getBigtableConfig(); |
| return toBuilder() |
| .setBigtableConfig( |
| config.withBigtableOptionsConfigurator(enableBulkApiConfigurator(configurator))) |
| .build(); |
| } |
| |
| /** Disables validation that the table being written to exists. */ |
| public Write withoutValidation() { |
| BigtableConfig config = getBigtableConfig(); |
| return toBuilder().setBigtableConfig(config.withValidate(false)).build(); |
| } |
| |
| /** |
| * Returns a new {@link BigtableIO.Write} that will write using the given Cloud Bigtable service |
| * implementation. |
| * |
| * <p>This is used for testing. |
| * |
| * <p>Does not modify this object. |
| */ |
| Write withBigtableService(BigtableService bigtableService) { |
| BigtableConfig config = getBigtableConfig(); |
| return toBuilder().setBigtableConfig(config.withBigtableService(bigtableService)).build(); |
| } |
| |
| @Override |
| public PDone expand(PCollection<KV<ByteString, Iterable<Mutation>>> input) { |
| getBigtableConfig().validate(); |
| |
| input.apply(ParDo.of(new BigtableWriterFn(getBigtableConfig()))); |
| return PDone.in(input.getPipeline()); |
| } |
| |
| @Override |
| public void validate(PipelineOptions options) { |
| validateTableExists(getBigtableConfig(), options); |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| super.populateDisplayData(builder); |
| getBigtableConfig().populateDisplayData(builder); |
| } |
| |
| @Override |
| public String toString() { |
| return MoreObjects.toStringHelper(Write.class).add("config", getBigtableConfig()).toString(); |
| } |
| |
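| /** |
| * A {@link DoFn} that writes mutations asynchronously through a {@link BigtableService.Writer} |
| * and fails the bundle if any write has failed by the time the bundle finishes. |
| */ |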
| private class BigtableWriterFn extends DoFn<KV<ByteString, Iterable<Mutation>>, Void> { |
| |
| public BigtableWriterFn(BigtableConfig bigtableConfig) { |
| this.config = bigtableConfig; |
| this.failures = new ConcurrentLinkedQueue<>(); |
| } |
| |
| @StartBundle |
| public void startBundle(StartBundleContext c) throws IOException { |
| if (bigtableWriter == null) { |
| bigtableWriter = |
| config |
| .getBigtableService(c.getPipelineOptions()) |
| .openForWriting(config.getTableId().get()); |
| } |
| recordsWritten = 0; |
| } |
| |
| @ProcessElement |
| public void processElement(ProcessContext c) throws Exception { |
| checkForFailures(); |
| bigtableWriter |
| .writeRecord(c.element()) |
| .whenComplete( |
| (mutationResult, exception) -> { |
| if (exception != null) { |
| failures.add(new BigtableWriteException(c.element(), exception)); |
| } |
| }); |
| ++recordsWritten; |
| } |
| |
| @FinishBundle |
| public void finishBundle() throws Exception { |
| bigtableWriter.flush(); |
| checkForFailures(); |
| LOG.debug("Wrote {} records", recordsWritten); |
| } |
| |
| @Teardown |
| public void tearDown() throws Exception { |
| if (bigtableWriter != null) { |
| bigtableWriter.close(); |
| bigtableWriter = null; |
| } |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| builder.delegate(Write.this); |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| private final BigtableConfig config; |
| private BigtableService.Writer bigtableWriter; |
| private long recordsWritten; |
| private final ConcurrentLinkedQueue<BigtableWriteException> failures; |
| |
| /** If any write has asynchronously failed, fail the bundle with a useful error. */ |
| private void checkForFailures() throws IOException { |
| // Note that this function is never called by multiple threads and is the only place that |
| // we remove from failures, so this code is safe. |
| if (failures.isEmpty()) { |
| return; |
| } |
| |
| StringBuilder logEntry = new StringBuilder(); |
| int i = 0; |
| List<BigtableWriteException> suppressed = Lists.newArrayList(); |
| for (; i < 10 && !failures.isEmpty(); ++i) { |
| BigtableWriteException exc = failures.remove(); |
| logEntry.append("\n").append(exc.getMessage()); |
| if (exc.getCause() != null) { |
| logEntry.append(": ").append(exc.getCause().getMessage()); |
| } |
| suppressed.add(exc); |
| } |
| String message = |
| String.format( |
| "At least %d errors occurred writing to Bigtable. First %d errors: %s", |
| i + failures.size(), i, logEntry.toString()); |
| LOG.error(message); |
| IOException exception = new IOException(message); |
| for (BigtableWriteException e : suppressed) { |
| exception.addSuppressed(e); |
| } |
| throw exception; |
| } |
| } |
| } |
| |
| ////////////////////////////////////////////////////////////////////////////////////////// |
| /** Disallow construction of utility class. */ |
| private BigtableIO() {} |
| |
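| /** Converts a Bigtable row key {@link ByteString} into a {@link ByteKey}. */ |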
| private static ByteKey makeByteKey(ByteString key) { |
| return ByteKey.copyFrom(key.asReadOnlyByteBuffer()); |
| } |
| |
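| /** |
| * A {@link BoundedSource} that reads rows from a single Cloud Bigtable table, restricted to a |
| * list of key ranges and an optional {@link RowFilter}. |
| */ |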
| static class BigtableSource extends BoundedSource<Row> { |
| public BigtableSource( |
| BigtableConfig config, |
| @Nullable RowFilter filter, |
| List<ByteKeyRange> ranges, |
| @Nullable Long estimatedSizeBytes) { |
| this.config = config; |
| this.filter = filter; |
| this.ranges = ranges; |
| this.estimatedSizeBytes = estimatedSizeBytes; |
| } |
| |
| @Override |
| public String toString() { |
| return MoreObjects.toStringHelper(BigtableSource.class) |
| .add("config", config) |
| .add("filter", filter) |
| .add("ranges", ranges) |
| .add("estimatedSizeBytes", estimatedSizeBytes) |
| .toString(); |
| } |
| |
| ////// Private state and internal implementation details ////// |
| private final BigtableConfig config; |
| @Nullable private final RowFilter filter; |
| private final List<ByteKeyRange> ranges; |
| @Nullable private Long estimatedSizeBytes; |
| @Nullable private transient List<SampleRowKeysResponse> sampleRowKeys; |
| |
| /** Creates a new {@link BigtableSource} with just one {@link ByteKeyRange}. */ |
| protected BigtableSource withSingleRange(ByteKeyRange range) { |
| checkArgument(range != null, "range cannot be null"); |
| return new BigtableSource(config, filter, Arrays.asList(range), estimatedSizeBytes); |
| } |
| |
| protected BigtableSource withEstimatedSizeBytes(Long estimatedSizeBytes) { |
| checkArgument(estimatedSizeBytes != null, "estimatedSizeBytes cannot be null"); |
| return new BigtableSource(config, filter, ranges, estimatedSizeBytes); |
| } |
| |
| /** |
| * Makes an API call to the Cloud Bigtable service that gives information about tablet key |
| * boundaries and estimated sizes. We can use these samples to ensure that splits are on |
| * different tablets, and possibly generate sub-splits within tablets. |
| */ |
| private List<SampleRowKeysResponse> getSampleRowKeys(PipelineOptions pipelineOptions) |
| throws IOException { |
| return config.getBigtableService(pipelineOptions).getSampleRowKeys(this); |
| } |
| |
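| // Hard cap on the number of splits produced by split(); reduceSplits merges adjacent splits |
| // to keep the total under this limit. |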
| private static final long MAX_SPLIT_COUNT = 15_360L; |
| |
| @Override |
| public List<BigtableSource> split(long desiredBundleSizeBytes, PipelineOptions options) |
| throws Exception { |
| // Update the desiredBundleSizeBytes in order to limit the |
| // number of splits to maximumNumberOfSplits. |
| long maximumNumberOfSplits = 4000; |
| long sizeEstimate = getEstimatedSizeBytes(options); |
| desiredBundleSizeBytes = |
| Math.max(sizeEstimate / maximumNumberOfSplits, desiredBundleSizeBytes); |
| |
| // Delegate to testable helper. |
| List<BigtableSource> splits = |
| splitBasedOnSamples(desiredBundleSizeBytes, getSampleRowKeys(options)); |
| |
| // Reduce the splits. |
| List<BigtableSource> reduced = reduceSplits(splits, options, MAX_SPLIT_COUNT); |
| // Randomize the result before returning an immutable copy of the splits; otherwise the |
| // default ordering may lead to multiple workers hitting the same tablet. |
| Collections.shuffle(reduced); |
| return ImmutableList.copyOf(reduced); |
| } |
| |
| /** Returns a mutable list of reduced splits. */ |
| @VisibleForTesting |
| protected List<BigtableSource> reduceSplits( |
| List<BigtableSource> splits, PipelineOptions options, long maxSplitCounts) |
| throws IOException { |
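| // Ceiling division: the number of adjacent splits to combine into one so that at most |
| // maxSplitCounts splits remain. |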
| int numberToCombine = (int) ((splits.size() + maxSplitCounts - 1) / maxSplitCounts); |
| if (splits.size() < maxSplitCounts || numberToCombine < 2) { |
| return new ArrayList<>(splits); |
| } |
| List<BigtableSource> reducedSplits = new ArrayList<>(); |
| List<ByteKeyRange> previousSourceRanges = new ArrayList<>(); |
| int counter = 0; |
| long size = 0; |
| for (BigtableSource source : splits) { |
| if (counter == numberToCombine |
| || !checkRangeAdjacency(previousSourceRanges, source.getRanges())) { |
| reducedSplits.add(new BigtableSource(config, filter, previousSourceRanges, size)); |
| counter = 0; |
| size = 0; |
| previousSourceRanges = new ArrayList<>(); |
| } |
| previousSourceRanges.addAll(source.getRanges()); |
| previousSourceRanges = mergeRanges(previousSourceRanges); |
| size += source.getEstimatedSizeBytes(options); |
| counter++; |
| } |
| if (size > 0) { |
| reducedSplits.add(new BigtableSource(config, filter, previousSourceRanges, size)); |
| } |
| return reducedSplits; |
| } |
| |
| /** |
| * Helper to validate range adjacency. Ranges are considered adjacent if the end key of each |
| * range equals the start key of the next, e.g. [1..100][100..200][200..300]. |
| */ |
| private static boolean checkRangeAdjacency( |
| List<ByteKeyRange> ranges, List<ByteKeyRange> otherRanges) { |
| checkArgument(ranges != null || otherRanges != null, "ranges and otherRanges cannot both be null"); |
| ImmutableList.Builder<ByteKeyRange> mergedRanges = ImmutableList.builder(); |
| if (ranges != null) { |
| mergedRanges.addAll(ranges); |
| } |
| if (otherRanges != null) { |
| mergedRanges.addAll(otherRanges); |
| } |
| return checkRangeAdjacency(mergedRanges.build()); |
| } |
| |
| /** |
| * Helper to validate range adjacency. Ranges are considered adjacent if the end key of each |
| * range equals the start key of the next, e.g. [1..100][100..200][200..300]. |
| */ |
| private static boolean checkRangeAdjacency(List<ByteKeyRange> ranges) { |
| int index = 0; |
| if (ranges.size() < 2) { |
| return true; |
| } |
| ByteKey lastEndKey = ranges.get(index++).getEndKey(); |
| while (index < ranges.size()) { |
| ByteKeyRange currentKeyRange = ranges.get(index++); |
| if (!lastEndKey.equals(currentKeyRange.getStartKey())) { |
| return false; |
| } |
| lastEndKey = currentKeyRange.getEndKey(); |
| } |
| return true; |
| } |
| |
| /** |
| * Helper to combine/merge adjacent {@link ByteKeyRange}s: e.g. [1..100][100..200][200..300] |
| * results in [1..300]. Note: this method does not check for adjacency; see {@link |
| * #checkRangeAdjacency(List)}. |
| */ |
| private static List<ByteKeyRange> mergeRanges(List<ByteKeyRange> ranges) { |
| List<ByteKeyRange> response = new ArrayList<>(); |
| if (ranges.size() < 2) { |
| response.add(ranges.get(0)); |
| } else { |
| response.add( |
| ByteKeyRange.of( |
| ranges.get(0).getStartKey(), ranges.get(ranges.size() - 1).getEndKey())); |
| } |
| return response; |
| } |
| |
| /** Helper that splits this source into bundles based on Cloud Bigtable sampled row keys. */ |
| private List<BigtableSource> splitBasedOnSamples( |
| long desiredBundleSizeBytes, List<SampleRowKeysResponse> sampleRowKeys) { |
| // There are no regions, or no samples available. Just scan the entire range. |
| if (sampleRowKeys.isEmpty()) { |
| LOG.info("Not splitting source {} because no sample row keys are available.", this); |
| return Collections.singletonList(this); |
| } |
| LOG.info( |
| "About to split into bundles of size {} with sampleRowKeys length {} first element {}", |
| desiredBundleSizeBytes, |
| sampleRowKeys.size(), |
| sampleRowKeys.get(0)); |
| |
| ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder(); |
| for (ByteKeyRange range : ranges) { |
| splits.addAll(splitRangeBasedOnSamples(desiredBundleSizeBytes, sampleRowKeys, range)); |
| } |
| return splits.build(); |
| } |
| |
| /** |
| * Helper that splits a {@code ByteKeyRange} into bundles based on Cloud Bigtable sampled row |
| * keys. |
| */ |
| private List<BigtableSource> splitRangeBasedOnSamples( |
| long desiredBundleSizeBytes, |
| List<SampleRowKeysResponse> sampleRowKeys, |
| ByteKeyRange range) { |
| |
| // Loop through all sampled responses and generate splits from the ones that overlap the |
| // scan range. The main complication is that we must track the end range of the previous |
| // sample to generate good ranges. |
| ByteKey lastEndKey = ByteKey.EMPTY; |
| long lastOffset = 0; |
| ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder(); |
| for (SampleRowKeysResponse response : sampleRowKeys) { |
| ByteKey responseEndKey = makeByteKey(response.getRowKey()); |
| long responseOffset = response.getOffsetBytes(); |
| checkState( |
| responseOffset >= lastOffset, |
| "Expected response byte offset %s to come after the last offset %s", |
| responseOffset, |
| lastOffset); |
| |
| if (!range.overlaps(ByteKeyRange.of(lastEndKey, responseEndKey))) { |
| // This region does not overlap the scan, so skip it. |
| lastOffset = responseOffset; |
| lastEndKey = responseEndKey; |
| continue; |
| } |
| |
| // Calculate the beginning of the split as the larger of startKey and the end of the last |
| // split. An unspecified start is the smallest key, so it is correctly treated as the |
| // earliest key. |
| ByteKey splitStartKey = lastEndKey; |
| if (splitStartKey.compareTo(range.getStartKey()) < 0) { |
| splitStartKey = range.getStartKey(); |
| } |
| |
| // Calculate the end of the split as the smaller of endKey and the end of this sample. Note |
| // that range.containsKey handles the case when range.getEndKey() is empty. |
| ByteKey splitEndKey = responseEndKey; |
| if (!range.containsKey(splitEndKey)) { |
| splitEndKey = range.getEndKey(); |
| } |
| |
| // We know this region overlaps the desired key range, and we know a rough estimate of its |
| // size. Split the key range into bundle-sized chunks and then add them all as splits. |
| long sampleSizeBytes = responseOffset - lastOffset; |
| List<BigtableSource> subSplits = |
| splitKeyRangeIntoBundleSizedSubranges( |
| sampleSizeBytes, |
| desiredBundleSizeBytes, |
| ByteKeyRange.of(splitStartKey, splitEndKey)); |
| splits.addAll(subSplits); |
| |
| // Move to the next region. |
| lastEndKey = responseEndKey; |
| lastOffset = responseOffset; |
| } |
| |
| // We must add one more region after the end of the samples if both these conditions hold: |
| // 1. we did not scan to the end yet (lastEndKey is concrete, not 0-length). |
| // 2. we want to scan to the end (endKey is empty) or farther (lastEndKey < endKey). |
| if (!lastEndKey.isEmpty() |
| && (range.getEndKey().isEmpty() || lastEndKey.compareTo(range.getEndKey()) < 0)) { |
| splits.add(this.withSingleRange(ByteKeyRange.of(lastEndKey, range.getEndKey()))); |
| } |
| |
| List<BigtableSource> ret = splits.build(); |
| LOG.info("Generated {} splits. First split: {}", ret.size(), ret.get(0)); |
| return ret; |
| } |
| |
| @Override |
| public long getEstimatedSizeBytes(PipelineOptions options) throws IOException { |
| // Delegate to testable helper. |
| if (estimatedSizeBytes == null) { |
| estimatedSizeBytes = getEstimatedSizeBytesBasedOnSamples(getSampleRowKeys(options)); |
| } |
| return estimatedSizeBytes; |
| } |
| |
| /** |
| * Computes the estimated size in bytes based on the total size of all samples that overlap the |
| * key ranges this source will scan. |
| */ |
| private long getEstimatedSizeBytesBasedOnSamples(List<SampleRowKeysResponse> samples) { |
| long estimatedSizeBytes = 0; |
| long lastOffset = 0; |
| ByteKey currentStartKey = ByteKey.EMPTY; |
| // Compute the total estimated size as the size of each sample that overlaps the scan range. |
| // TODO: In future, Bigtable service may provide finer grained APIs, e.g., to sample given a |
| // filter or to sample on a given key range. |
| for (SampleRowKeysResponse response : samples) { |
| ByteKey currentEndKey = makeByteKey(response.getRowKey()); |
| long currentOffset = response.getOffsetBytes(); |
| if (!currentStartKey.isEmpty() && currentStartKey.equals(currentEndKey)) { |
| // Skip an empty region. |
| lastOffset = currentOffset; |
| continue; |
| } else { |
| for (ByteKeyRange range : ranges) { |
| if (range.overlaps(ByteKeyRange.of(currentStartKey, currentEndKey))) { |
| estimatedSizeBytes += currentOffset - lastOffset; |
| // We don't want to double our estimated size if two ranges overlap this sample |
| // region, so exit early. |
| break; |
| } |
| } |
| } |
| currentStartKey = currentEndKey; |
| lastOffset = currentOffset; |
| } |
| return estimatedSizeBytes; |
| } |
| |
| @Override |
| public BoundedReader<Row> createReader(PipelineOptions options) throws IOException { |
| return new BigtableReader(this, config.getBigtableService(options)); |
| } |
| |
| @Override |
| public void validate() { |
| if (!config.getValidate()) { |
| LOG.debug("Validation is disabled"); |
| return; |
| } |
| |
| ValueProvider<String> tableId = config.getTableId(); |
| checkArgument( |
| tableId != null && tableId.isAccessible() && !tableId.get().isEmpty(), |
| "tableId was not supplied"); |
| } |
| |
| @Override |
| public void populateDisplayData(DisplayData.Builder builder) { |
| super.populateDisplayData(builder); |
| |
| builder.add(DisplayData.item("tableId", config.getTableId()).withLabel("Table ID")); |
| |
| if (filter != null) { |
| builder.add(DisplayData.item("rowFilter", filter.toString()).withLabel("Table Row Filter")); |
| } |
| } |
| |
| @Override |
| public Coder<Row> getOutputCoder() { |
| return ProtoCoder.of(Row.class); |
| } |
| |
| /** Helper that splits the specified range in this source into bundles. */ |
| private List<BigtableSource> splitKeyRangeIntoBundleSizedSubranges( |
| long sampleSizeBytes, long desiredBundleSizeBytes, ByteKeyRange range) { |
| // Catch the trivial cases. Split is small enough already, or this is the last region. |
| LOG.debug( |
| "Subsplit for sampleSizeBytes {} and desiredBundleSizeBytes {}", |
| sampleSizeBytes, |
| desiredBundleSizeBytes); |
| if (sampleSizeBytes <= desiredBundleSizeBytes) { |
| return Collections.singletonList( |
| this.withSingleRange(ByteKeyRange.of(range.getStartKey(), range.getEndKey()))); |
| } |
| |
| checkArgument( |
| sampleSizeBytes > 0, "Sample size %s bytes must be greater than 0.", sampleSizeBytes); |
| checkArgument( |
| desiredBundleSizeBytes > 0, |
| "Desired bundle size %s bytes must be greater than 0.", |
| desiredBundleSizeBytes); |
| |
| int splitCount = (int) Math.ceil(((double) sampleSizeBytes) / desiredBundleSizeBytes); |
| List<ByteKey> splitKeys = range.split(splitCount); |
| ImmutableList.Builder<BigtableSource> splits = ImmutableList.builder(); |
| Iterator<ByteKey> keys = splitKeys.iterator(); |
| ByteKey prev = keys.next(); |
| while (keys.hasNext()) { |
| ByteKey next = keys.next(); |
| splits.add( |
| this.withSingleRange(ByteKeyRange.of(prev, next)) |
| .withEstimatedSizeBytes(sampleSizeBytes / splitCount)); |
| prev = next; |
| } |
| return splits.build(); |
| } |
| |
| public List<ByteKeyRange> getRanges() { |
| return ranges; |
| } |
| |
| public RowFilter getRowFilter() { |
| return filter; |
| } |
| |
| public ValueProvider<String> getTableId() { |
| return config.getTableId(); |
| } |
| } |
| |
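| /** |
| * A {@link BoundedReader} that reads rows from a {@link BigtableSource} with exactly one key |
| * range and supports dynamic splitting via a {@link ByteKeyRangeTracker}. |
| */ |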
| private static class BigtableReader extends BoundedReader<Row> { |
| // Thread-safety: source is protected via synchronization and is only accessed or modified |
| // inside a synchronized block (or constructor, which is the same). |
| private BigtableSource source; |
| private BigtableService service; |
| private BigtableService.Reader reader; |
| private final ByteKeyRangeTracker rangeTracker; |
| private long recordsReturned; |
| |
| public BigtableReader(BigtableSource source, BigtableService service) { |
| checkArgument(source.getRanges().size() == 1, "source must have exactly one key range"); |
| this.source = source; |
| this.service = service; |
| rangeTracker = ByteKeyRangeTracker.of(source.getRanges().get(0)); |
| } |
| |
| @Override |
| public boolean start() throws IOException { |
| reader = service.createReader(getCurrentSource()); |
| boolean hasRecord = |
| (reader.start() |
| && rangeTracker.tryReturnRecordAt( |
| true, makeByteKey(reader.getCurrentRow().getKey()))) |
| || rangeTracker.markDone(); |
| if (hasRecord) { |
| ++recordsReturned; |
| } |
| return hasRecord; |
| } |
| |
| @Override |
| public synchronized BigtableSource getCurrentSource() { |
| return source; |
| } |
| |
| @Override |
| public boolean advance() throws IOException { |
| boolean hasRecord = |
| (reader.advance() |
| && rangeTracker.tryReturnRecordAt( |
| true, makeByteKey(reader.getCurrentRow().getKey()))) |
| || rangeTracker.markDone(); |
| if (hasRecord) { |
| ++recordsReturned; |
| } |
| return hasRecord; |
| } |
| |
| @Override |
| public Row getCurrent() throws NoSuchElementException { |
| return reader.getCurrentRow(); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| LOG.info("Closing reader after reading {} records.", recordsReturned); |
| if (reader != null) { |
| reader.close(); |
| reader = null; |
| } |
| } |
| |
| @Override |
| public final Double getFractionConsumed() { |
| return rangeTracker.getFractionConsumed(); |
| } |
| |
| @Override |
| public final long getSplitPointsConsumed() { |
| return rangeTracker.getSplitPointsConsumed(); |
| } |
| |
| @Override |
| @Nullable |
| public final synchronized BigtableSource splitAtFraction(double fraction) { |
| ByteKey splitKey; |
| ByteKeyRange range = rangeTracker.getRange(); |
| try { |
| splitKey = range.interpolateKey(fraction); |
| } catch (RuntimeException e) { |
| LOG.info("{}: Failed to interpolate key for fraction {}.", range, fraction, e); |
| return null; |
| } |
| LOG.info("Proposing to split {} at fraction {} (key {})", rangeTracker, fraction, splitKey); |
| BigtableSource primary; |
| BigtableSource residual; |
| try { |
| primary = source.withSingleRange(ByteKeyRange.of(range.getStartKey(), splitKey)); |
| residual = source.withSingleRange(ByteKeyRange.of(splitKey, range.getEndKey())); |
| } catch (RuntimeException e) { |
| LOG.info( |
| "{}: Interpolating for fraction {} yielded invalid split key {}.", |
| rangeTracker.getRange(), |
| fraction, |
| splitKey, |
| e); |
| return null; |
| } |
| if (!rangeTracker.trySplitAtPosition(splitKey)) { |
| return null; |
| } |
| this.source = primary; |
| return residual; |
| } |
| } |
| |
| /** An exception that puts information about the failed record being written in its message. */ |
| static class BigtableWriteException extends IOException { |
| public BigtableWriteException(KV<ByteString, Iterable<Mutation>> record, Throwable cause) { |
| super( |
| String.format( |
| "Error mutating row %s with mutations %s", |
| record.getKey().toStringUtf8(), record.getValue()), |
| cause); |
| } |
| } |
| |
| static void validateTableExists(BigtableConfig config, PipelineOptions options) { |
| if (config.getValidate() && config.isDataAccessible()) { |
| String tableId = checkNotNull(config.getTableId().get()); |
| try { |
| checkArgument( |
| config.getBigtableService(options).tableExists(tableId), |
| "Table %s does not exist", |
| tableId); |
| } catch (IOException e) { |
| LOG.warn("Error checking whether table {} exists; proceeding.", tableId, e); |
| } |
| } |
| } |
| } |