blob: e995e7af969e0deb91fa85c46f2419dcc23ea9f1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigtable;
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSourcesEqualReferenceSource;
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionExhaustive;
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLabel;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.Verify.verifyNotNull;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.MutateRowResponse;
import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.Mutation.SetCell;
import com.google.bigtable.v2.Row;
import com.google.bigtable.v2.RowFilter;
import com.google.bigtable.v2.SampleRowKeysResponse;
import com.google.cloud.bigtable.config.BigtableOptions;
import com.google.cloud.bigtable.config.BulkOptions;
import com.google.cloud.bigtable.config.CredentialOptions;
import com.google.cloud.bigtable.config.CredentialOptions.CredentialType;
import com.google.cloud.bigtable.config.RetryOptions;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.gcp.auth.TestCredential;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.protobuf.ByteStringCoder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO.BigtableSource;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipeline.PipelineRunMissingException;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Predicate;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsIterableContainingInAnyOrder;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Unit tests for {@link BigtableIO}. */
@RunWith(JUnit4.class)
public class BigtableIOTest {
@Rule public final transient TestPipeline p = TestPipeline.create();
@Rule public ExpectedException thrown = ExpectedException.none();
@Rule public ExpectedLogs logged = ExpectedLogs.none(BigtableIO.class);
/** Read Options for testing. */
public interface ReadOptions extends GcpOptions {
@Description("The project that contains the table to export.")
ValueProvider<String> getBigtableProject();
@SuppressWarnings("unused")
void setBigtableProject(ValueProvider<String> projectId);
@Description("The Bigtable instance id that contains the table to export.")
ValueProvider<String> getBigtableInstanceId();
@SuppressWarnings("unused")
void setBigtableInstanceId(ValueProvider<String> instanceId);
@Description("The Bigtable table id to export.")
ValueProvider<String> getBigtableTableId();
@SuppressWarnings("unused")
void setBigtableTableId(ValueProvider<String> tableId);
}
static final ValueProvider<String> NOT_ACCESSIBLE_VALUE =
new ValueProvider<String>() {
@Override
public String get() {
throw new IllegalStateException("Value is not accessible");
}
@Override
public boolean isAccessible() {
return false;
}
};
private static BigtableConfig config;
private static FakeBigtableService service;
private static final BigtableOptions BIGTABLE_OPTIONS =
new BigtableOptions.Builder()
.setProjectId("options_project")
.setInstanceId("options_instance")
.build();
private static BigtableIO.Read defaultRead =
BigtableIO.read().withInstanceId("instance").withProjectId("project");
private static BigtableIO.Write defaultWrite =
BigtableIO.write().withInstanceId("instance").withProjectId("project");
private Coder<KV<ByteString, Iterable<Mutation>>> bigtableCoder;
private static final TypeDescriptor<KV<ByteString, Iterable<Mutation>>> BIGTABLE_WRITE_TYPE =
new TypeDescriptor<KV<ByteString, Iterable<Mutation>>>() {};
private static final SerializableFunction<BigtableOptions.Builder, BigtableOptions.Builder>
PORT_CONFIGURATOR = input -> input.setPort(1234);
@Before
public void setup() throws Exception {
service = new FakeBigtableService();
defaultRead = defaultRead.withBigtableService(service);
defaultWrite = defaultWrite.withBigtableService(service);
bigtableCoder = p.getCoderRegistry().getCoder(BIGTABLE_WRITE_TYPE);
config = BigtableConfig.builder().setValidate(true).setBigtableService(service).build();
}
private static ByteKey makeByteKey(ByteString key) {
return ByteKey.copyFrom(key.asReadOnlyByteBuffer());
}
@Test
public void testReadBuildsCorrectly() {
BigtableIO.Read read =
BigtableIO.read()
.withBigtableOptions(BIGTABLE_OPTIONS)
.withTableId("table")
.withInstanceId("instance")
.withProjectId("project")
.withBigtableOptionsConfigurator(PORT_CONFIGURATOR);
assertEquals("options_project", read.getBigtableOptions().getProjectId());
assertEquals("options_instance", read.getBigtableOptions().getInstanceId());
assertEquals("instance", read.getBigtableConfig().getInstanceId().get());
assertEquals("project", read.getBigtableConfig().getProjectId().get());
assertEquals("table", read.getTableId());
assertEquals(PORT_CONFIGURATOR, read.getBigtableConfig().getBigtableOptionsConfigurator());
}
@Test
public void testReadValidationFailsMissingTable() {
BigtableIO.Read read = BigtableIO.read().withBigtableOptions(BIGTABLE_OPTIONS);
thrown.expect(IllegalArgumentException.class);
read.expand(null);
}
@Test
public void testReadValidationFailsMissingInstanceId() {
BigtableIO.Read read =
BigtableIO.read()
.withTableId("table")
.withProjectId("project")
.withBigtableOptions(new BigtableOptions.Builder().build());
thrown.expect(IllegalArgumentException.class);
read.expand(null);
}
@Test
public void testReadValidationFailsMissingProjectId() {
BigtableIO.Read read =
BigtableIO.read()
.withTableId("table")
.withInstanceId("instance")
.withBigtableOptions(new BigtableOptions.Builder().build());
thrown.expect(IllegalArgumentException.class);
read.expand(null);
}
@Test
public void testReadValidationFailsMissingInstanceIdAndProjectId() {
BigtableIO.Read read =
BigtableIO.read()
.withTableId("table")
.withBigtableOptions(new BigtableOptions.Builder().build());
thrown.expect(IllegalArgumentException.class);
read.expand(null);
}
@Test
public void testReadWithRuntimeParametersValidationFailed() {
ReadOptions options = PipelineOptionsFactory.fromArgs().withValidation().as(ReadOptions.class);
BigtableIO.Read read =
BigtableIO.read()
.withProjectId(options.getBigtableProject())
.withInstanceId(options.getBigtableInstanceId())
.withTableId(options.getBigtableTableId());
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage("tableId was not supplied");
p.apply(read);
}
@Test
public void testReadWithRuntimeParametersValidationDisabled() {
ReadOptions options = PipelineOptionsFactory.fromArgs().withValidation().as(ReadOptions.class);
BigtableIO.Read read =
BigtableIO.read()
.withoutValidation()
.withProjectId(options.getBigtableProject())
.withInstanceId(options.getBigtableInstanceId())
.withTableId(options.getBigtableTableId());
// Not running a pipeline therefore this is expected.
thrown.expect(PipelineRunMissingException.class);
p.apply(read);
}
@Test
public void testWriteBuildsCorrectly() {
BigtableIO.Write write =
BigtableIO.write()
.withBigtableOptions(BIGTABLE_OPTIONS)
.withTableId("table")
.withInstanceId("instance")
.withProjectId("project");
assertEquals("table", write.getBigtableConfig().getTableId().get());
assertEquals("options_project", write.getBigtableOptions().getProjectId());
assertEquals("options_instance", write.getBigtableOptions().getInstanceId());
assertEquals("instance", write.getBigtableConfig().getInstanceId().get());
assertEquals("project", write.getBigtableConfig().getProjectId().get());
}
@Test
public void testWriteValidationFailsMissingInstanceId() {
BigtableIO.Write write =
BigtableIO.write()
.withTableId("table")
.withProjectId("project")
.withBigtableOptions(new BigtableOptions.Builder().build());
thrown.expect(IllegalArgumentException.class);
write.expand(null);
}
@Test
public void testWriteValidationFailsMissingProjectId() {
BigtableIO.Write write =
BigtableIO.write()
.withTableId("table")
.withInstanceId("instance")
.withBigtableOptions(new BigtableOptions.Builder().build());
thrown.expect(IllegalArgumentException.class);
write.expand(null);
}
@Test
public void testWriteValidationFailsMissingInstanceIdAndProjectId() {
BigtableIO.Write write =
BigtableIO.write()
.withTableId("table")
.withBigtableOptions(new BigtableOptions.Builder().build());
thrown.expect(IllegalArgumentException.class);
write.expand(null);
}
@Test
public void testWriteValidationFailsMissingOptionsAndInstanceAndProject() {
BigtableIO.Write write = BigtableIO.write().withTableId("table");
thrown.expect(IllegalArgumentException.class);
write.expand(null);
}
/** Helper function to make a single row mutation to be written. */
private static KV<ByteString, Iterable<Mutation>> makeWrite(String key, String value) {
ByteString rowKey = ByteString.copyFromUtf8(key);
Iterable<Mutation> mutations =
ImmutableList.of(
Mutation.newBuilder()
.setSetCell(SetCell.newBuilder().setValue(ByteString.copyFromUtf8(value)))
.build());
return KV.of(rowKey, mutations);
}
/** Helper function to make a single bad row mutation (no set cell). */
private static KV<ByteString, Iterable<Mutation>> makeBadWrite(String key) {
Iterable<Mutation> mutations = ImmutableList.of(Mutation.newBuilder().build());
return KV.of(ByteString.copyFromUtf8(key), mutations);
}
/** Tests that credentials are used from PipelineOptions if not supplied by BigtableOptions. */
@Test
public void testUsePipelineOptionsCredentialsIfNotSpecifiedInBigtableOptions() throws Exception {
BigtableOptions options =
BIGTABLE_OPTIONS
.toBuilder()
.setCredentialOptions(CredentialOptions.defaultCredentials())
.build();
GcpOptions pipelineOptions = PipelineOptionsFactory.as(GcpOptions.class);
pipelineOptions.setGcpCredential(new TestCredential());
BigtableService readService =
BigtableIO.read()
.withBigtableOptions(options)
.withTableId("TEST-TABLE")
.getBigtableConfig()
.getBigtableService(pipelineOptions);
BigtableService writeService =
BigtableIO.write()
.withBigtableOptions(options)
.withTableId("TEST-TABLE")
.getBigtableConfig()
.getBigtableService(pipelineOptions);
assertEquals(
CredentialType.SuppliedCredentials,
readService.getBigtableOptions().getCredentialOptions().getCredentialType());
assertEquals(
CredentialType.SuppliedCredentials,
writeService.getBigtableOptions().getCredentialOptions().getCredentialType());
}
/** Tests that credentials are not used from PipelineOptions if supplied by BigtableOptions. */
@Test
public void testDontUsePipelineOptionsCredentialsIfSpecifiedInBigtableOptions() throws Exception {
BigtableOptions options =
BIGTABLE_OPTIONS
.toBuilder()
.setCredentialOptions(CredentialOptions.nullCredential())
.build();
GcpOptions pipelineOptions = PipelineOptionsFactory.as(GcpOptions.class);
pipelineOptions.setGcpCredential(new TestCredential());
BigtableService readService =
BigtableIO.read()
.withBigtableOptions(options)
.withTableId("TEST-TABLE")
.getBigtableConfig()
.getBigtableService(pipelineOptions);
BigtableService writeService =
BigtableIO.write()
.withBigtableOptions(options)
.withTableId("TEST-TABLE")
.getBigtableConfig()
.getBigtableService(pipelineOptions);
assertEquals(
CredentialType.None,
readService.getBigtableOptions().getCredentialOptions().getCredentialType());
assertEquals(
CredentialType.None,
writeService.getBigtableOptions().getCredentialOptions().getCredentialType());
}
/** Tests that when reading from a non-existent table, the read fails. */
@Test
public void testReadingFailsTableDoesNotExist() throws Exception {
final String table = "TEST-TABLE";
BigtableIO.Read read =
BigtableIO.read()
.withBigtableOptions(BIGTABLE_OPTIONS)
.withTableId(table)
.withBigtableService(service);
// Exception will be thrown by read.validate() when read is applied.
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(String.format("Table %s does not exist", table));
p.apply(read);
p.run();
}
/** Tests that when reading from an empty table, the read succeeds. */
@Test
public void testReadingEmptyTable() throws Exception {
final String table = "TEST-EMPTY-TABLE";
service.createTable(table);
service.setupSampleRowKeys(table, 1, 1L);
runReadTest(defaultRead.withTableId(table), new ArrayList<>());
logged.verifyInfo("Closing reader after reading 0 records.");
}
/** Tests reading all rows from a table. */
@Test
public void testReading() throws Exception {
final String table = "TEST-MANY-ROWS-TABLE";
final int numRows = 1001;
List<Row> testRows = makeTableData(table, numRows);
service.setupSampleRowKeys(table, 3, 1000L);
runReadTest(defaultRead.withTableId(table), testRows);
logged.verifyInfo(String.format("Closing reader after reading %d records.", numRows / 3));
}
/** A {@link Predicate} that a {@link Row Row's} key matches the given regex. */
private static class KeyMatchesRegex implements Predicate<ByteString> {
private final String regex;
public KeyMatchesRegex(String regex) {
this.regex = regex;
}
@Override
public boolean apply(@Nullable ByteString input) {
verifyNotNull(input, "input");
return input.toStringUtf8().matches(regex);
}
}
private static List<Row> filterToRange(List<Row> rows, final ByteKeyRange range) {
return filterToRanges(rows, ImmutableList.of(range));
}
private static List<Row> filterToRanges(List<Row> rows, final List<ByteKeyRange> ranges) {
return Lists.newArrayList(
rows.stream()
.filter(
input -> {
verifyNotNull(input, "input");
for (ByteKeyRange range : ranges) {
if (range.containsKey(makeByteKey(input.getKey()))) {
return true;
}
}
return false;
})
.collect(Collectors.toList()));
}
private void runReadTest(BigtableIO.Read read, List<Row> expected) {
PCollection<Row> rows = p.apply(read.getTableId() + "_" + read.getKeyRanges(), read);
PAssert.that(rows).containsInAnyOrder(expected);
p.run();
}
/**
* Tests reading all rows using key ranges. Tests a prefix [), a suffix (], and a restricted range
* [] and that some properties hold across them.
*/
@Test
public void testReadingWithKeyRange() throws Exception {
final String table = "TEST-KEY-RANGE-TABLE";
final int numRows = 1001;
List<Row> testRows = makeTableData(table, numRows);
ByteKey startKey = ByteKey.copyFrom("key000000100".getBytes(StandardCharsets.UTF_8));
ByteKey endKey = ByteKey.copyFrom("key000000300".getBytes(StandardCharsets.UTF_8));
service.setupSampleRowKeys(table, numRows / 10, "key000000100".length());
// Test prefix: [beginning, startKey).
final ByteKeyRange prefixRange = ByteKeyRange.ALL_KEYS.withEndKey(startKey);
List<Row> prefixRows = filterToRange(testRows, prefixRange);
runReadTest(defaultRead.withTableId(table).withKeyRange(prefixRange), prefixRows);
// Test suffix: [startKey, end).
final ByteKeyRange suffixRange = ByteKeyRange.ALL_KEYS.withStartKey(startKey);
List<Row> suffixRows = filterToRange(testRows, suffixRange);
runReadTest(defaultRead.withTableId(table).withKeyRange(suffixRange), suffixRows);
// Test restricted range: [startKey, endKey).
final ByteKeyRange middleRange = ByteKeyRange.of(startKey, endKey);
List<Row> middleRows = filterToRange(testRows, middleRange);
runReadTest(defaultRead.withTableId(table).withKeyRange(middleRange), middleRows);
//////// Size and content sanity checks //////////
// Prefix, suffix, middle should be non-trivial (non-zero,non-all).
assertThat(prefixRows, allOf(hasSize(lessThan(numRows)), hasSize(greaterThan(0))));
assertThat(suffixRows, allOf(hasSize(lessThan(numRows)), hasSize(greaterThan(0))));
assertThat(middleRows, allOf(hasSize(lessThan(numRows)), hasSize(greaterThan(0))));
// Prefix + suffix should be exactly all rows.
List<Row> union = Lists.newArrayList(prefixRows);
union.addAll(suffixRows);
assertThat(
"prefix + suffix = total", union, containsInAnyOrder(testRows.toArray(new Row[] {})));
// Suffix should contain the middle.
assertThat(suffixRows, hasItems(middleRows.toArray(new Row[] {})));
}
/** Tests reading three key ranges with one read. */
@Test
public void testReadingWithKeyRanges() throws Exception {
final String table = "TEST-KEY-RANGE-TABLE";
final int numRows = 11;
List<Row> testRows = makeTableData(table, numRows);
ByteKey startKey1 = ByteKey.copyFrom("key000000001".getBytes(StandardCharsets.UTF_8));
ByteKey endKey1 = ByteKey.copyFrom("key000000003".getBytes(StandardCharsets.UTF_8));
ByteKey startKey2 = ByteKey.copyFrom("key000000004".getBytes(StandardCharsets.UTF_8));
ByteKey endKey2 = ByteKey.copyFrom("key000000007".getBytes(StandardCharsets.UTF_8));
ByteKey startKey3 = ByteKey.copyFrom("key000000008".getBytes(StandardCharsets.UTF_8));
ByteKey endKey3 = ByteKey.copyFrom("key000000009".getBytes(StandardCharsets.UTF_8));
service.setupSampleRowKeys(table, numRows / 10, "key000000001".length());
final ByteKeyRange range1 = ByteKeyRange.of(startKey1, endKey1);
final ByteKeyRange range2 = ByteKeyRange.of(startKey2, endKey2);
final ByteKeyRange range3 = ByteKeyRange.of(startKey3, endKey3);
List<ByteKeyRange> ranges = ImmutableList.of(range1, range2, range3);
List<Row> rangeRows = filterToRanges(testRows, ranges);
runReadTest(defaultRead.withTableId(table).withKeyRanges(ranges), rangeRows);
// range rows should be non-trivial (non-zero,non-all).
assertThat(rangeRows, allOf(hasSize(lessThan(numRows)), hasSize(greaterThan(0))));
}
/** Tests reading all rows using a filter. */
@Test
public void testReadingWithFilter() throws Exception {
final String table = "TEST-FILTER-TABLE";
final int numRows = 1001;
List<Row> testRows = makeTableData(table, numRows);
String regex = ".*17.*";
final KeyMatchesRegex keyPredicate = new KeyMatchesRegex(regex);
Iterable<Row> filteredRows =
testRows.stream()
.filter(
input -> {
verifyNotNull(input, "input");
return keyPredicate.apply(input.getKey());
})
.collect(Collectors.toList());
RowFilter filter =
RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(regex)).build();
service.setupSampleRowKeys(table, 5, 10L);
runReadTest(
defaultRead.withTableId(table).withRowFilter(filter), Lists.newArrayList(filteredRows));
}
/** Tests dynamic work rebalancing exhaustively. */
@Test
public void testReadingSplitAtFractionExhaustive() throws Exception {
final String table = "TEST-FEW-ROWS-SPLIT-EXHAUSTIVE-TABLE";
final int numRows = 10;
final int numSamples = 1;
final long bytesPerRow = 1L;
makeTableData(table, numRows);
service.setupSampleRowKeys(table, numSamples, bytesPerRow);
BigtableSource source =
new BigtableSource(
config.withTableId(ValueProvider.StaticValueProvider.of(table)),
null,
Arrays.asList(service.getTableRange(table)),
null);
assertSplitAtFractionExhaustive(source, null);
}
/** Unit tests of splitAtFraction. */
@Test
public void testReadingSplitAtFraction() throws Exception {
final String table = "TEST-SPLIT-AT-FRACTION";
final int numRows = 10;
final int numSamples = 1;
final long bytesPerRow = 1L;
makeTableData(table, numRows);
service.setupSampleRowKeys(table, numSamples, bytesPerRow);
BigtableSource source =
new BigtableSource(
config.withTableId(ValueProvider.StaticValueProvider.of(table)),
null,
Arrays.asList(service.getTableRange(table)),
null);
// With 0 items read, all split requests will fail.
assertSplitAtFractionFails(source, 0, 0.1, null /* options */);
assertSplitAtFractionFails(source, 0, 1.0, null /* options */);
// With 1 items read, all split requests past 1/10th will succeed.
assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.333, null /* options */);
assertSplitAtFractionSucceedsAndConsistent(source, 1, 0.666, null /* options */);
// With 3 items read, all split requests past 3/10ths will succeed.
assertSplitAtFractionFails(source, 3, 0.2, null /* options */);
assertSplitAtFractionSucceedsAndConsistent(source, 3, 0.571, null /* options */);
assertSplitAtFractionSucceedsAndConsistent(source, 3, 0.9, null /* options */);
// With 6 items read, all split requests past 6/10ths will succeed.
assertSplitAtFractionFails(source, 6, 0.5, null /* options */);
assertSplitAtFractionSucceedsAndConsistent(source, 6, 0.7, null /* options */);
}
/** Tests reading all rows from a split table. */
@Test
public void testReadingWithSplits() throws Exception {
final String table = "TEST-MANY-ROWS-SPLITS-TABLE";
final int numRows = 1500;
final int numSamples = 10;
final long bytesPerRow = 100L;
// Set up test table data and sample row keys for size estimation and splitting.
makeTableData(table, numRows);
service.setupSampleRowKeys(table, numSamples, bytesPerRow);
// Generate source and split it.
BigtableSource source =
new BigtableSource(
config.withTableId(ValueProvider.StaticValueProvider.of(table)),
null /*filter*/,
Arrays.asList(ByteKeyRange.ALL_KEYS),
null /*size*/);
List<BigtableSource> splits =
source.split(numRows * bytesPerRow / numSamples, null /* options */);
// Test num splits and split equality.
assertThat(splits, hasSize(numSamples));
assertSourcesEqualReferenceSource(source, splits, null /* options */);
}
private void assertAllSourcesHaveSingleAdjacentRanges(List<BigtableSource> sources) {
if (sources.size() > 0) {
assertThat(sources.get(0).getRanges(), hasSize(1));
for (int i = 1; i < sources.size(); i++) {
assertThat(sources.get(i).getRanges(), hasSize(1));
ByteKey lastEndKey = sources.get(i - 1).getRanges().get(0).getEndKey();
ByteKey currentStartKey = sources.get(i).getRanges().get(0).getStartKey();
assertEquals(lastEndKey, currentStartKey);
}
}
}
private void assertAllSourcesHaveSingleRanges(List<BigtableSource> sources) {
for (BigtableSource source : sources) {
assertThat(source.getRanges(), hasSize(1));
}
}
private ByteKey createByteKey(int key) {
return ByteKey.copyFrom(String.format("key%09d", key).getBytes(StandardCharsets.UTF_8));
}
/** Tests reduce splits with few non adjacent ranges. */
@Test
public void testReduceSplitsWithSomeNonAdjacentRanges() throws Exception {
final String table = "TEST-MANY-ROWS-SPLITS-TABLE";
final int numRows = 10;
final int numSamples = 10;
final long bytesPerRow = 100L;
final int maxSplit = 3;
// Set up test table data and sample row keys for size estimation and splitting.
makeTableData(table, numRows);
service.setupSampleRowKeys(table, numSamples, bytesPerRow);
// Construct few non contiguous key ranges [..1][1..2][3..4][4..5][6..7][8..9]
List<ByteKeyRange> keyRanges =
Arrays.asList(
ByteKeyRange.of(ByteKey.EMPTY, createByteKey(1)),
ByteKeyRange.of(createByteKey(1), createByteKey(2)),
ByteKeyRange.of(createByteKey(3), createByteKey(4)),
ByteKeyRange.of(createByteKey(4), createByteKey(5)),
ByteKeyRange.of(createByteKey(6), createByteKey(7)),
ByteKeyRange.of(createByteKey(8), createByteKey(9)));
// Expected ranges after split and reduction by maxSplitCount is [..2][3..5][6..7][8..9]
List<ByteKeyRange> expectedKeyRangesAfterReducedSplits =
Arrays.asList(
ByteKeyRange.of(ByteKey.EMPTY, createByteKey(2)),
ByteKeyRange.of(createByteKey(3), createByteKey(5)),
ByteKeyRange.of(createByteKey(6), createByteKey(7)),
ByteKeyRange.of(createByteKey(8), createByteKey(9)));
// Generate source and split it.
BigtableSource source =
new BigtableSource(
config.withTableId(ValueProvider.StaticValueProvider.of(table)),
null /*filter*/,
keyRanges,
null /*size*/);
List<BigtableSource> splits = new ArrayList<>();
for (ByteKeyRange range : keyRanges) {
splits.add(source.withSingleRange(range));
}
List<BigtableSource> reducedSplits = source.reduceSplits(splits, null, maxSplit);
List<ByteKeyRange> actualRangesAfterSplit = new ArrayList<>();
for (BigtableSource splitSource : reducedSplits) {
actualRangesAfterSplit.addAll(splitSource.getRanges());
}
assertAllSourcesHaveSingleRanges(reducedSplits);
assertThat(
actualRangesAfterSplit,
IsIterableContainingInAnyOrder.containsInAnyOrder(
expectedKeyRangesAfterReducedSplits.toArray()));
}
/** Tests reduce split with all non adjacent ranges. */
@Test
public void testReduceSplitsWithAllNonAdjacentRange() throws Exception {
final String table = "TEST-MANY-ROWS-SPLITS-TABLE";
final int numRows = 10;
final int numSamples = 10;
final long bytesPerRow = 100L;
final int maxSplit = 3;
// Set up test table data and sample row keys for size estimation and splitting.
makeTableData(table, numRows);
service.setupSampleRowKeys(table, numSamples, bytesPerRow);
// Construct non contiguous key ranges [..1][2..3][4..5][6..7][8..9]
List<ByteKeyRange> keyRanges =
Arrays.asList(
ByteKeyRange.of(ByteKey.EMPTY, createByteKey(1)),
ByteKeyRange.of(createByteKey(2), createByteKey(3)),
ByteKeyRange.of(createByteKey(4), createByteKey(5)),
ByteKeyRange.of(createByteKey(6), createByteKey(7)),
ByteKeyRange.of(createByteKey(8), createByteKey(9)));
// Generate source and split it.
BigtableSource source =
new BigtableSource(
config.withTableId(ValueProvider.StaticValueProvider.of(table)),
null /*filter*/,
keyRanges,
null /*size*/);
List<BigtableSource> splits = new ArrayList<>();
for (ByteKeyRange range : keyRanges) {
splits.add(source.withSingleRange(range));
}
List<BigtableSource> reducedSplits = source.reduceSplits(splits, null, maxSplit);
List<ByteKeyRange> actualRangesAfterSplit = new ArrayList<>();
for (BigtableSource splitSource : reducedSplits) {
actualRangesAfterSplit.addAll(splitSource.getRanges());
}
assertAllSourcesHaveSingleRanges(reducedSplits);
// The expected split source ranges are exactly same as original
assertThat(
actualRangesAfterSplit,
IsIterableContainingInAnyOrder.containsInAnyOrder(keyRanges.toArray()));
}
/** Tests reduce Splits with all adjacent ranges. */
@Test
public void tesReduceSplitsWithAdjacentRanges() throws Exception {
final String table = "TEST-MANY-ROWS-SPLITS-TABLE";
final int numRows = 10;
final int numSamples = 10;
final long bytesPerRow = 100L;
final int maxSplit = 3;
// Set up test table data and sample row keys for size estimation and splitting.
makeTableData(table, numRows);
service.setupSampleRowKeys(table, numSamples, bytesPerRow);
// Generate source and split it.
BigtableSource source =
new BigtableSource(
config.withTableId(ValueProvider.StaticValueProvider.of(table)),
null /*filter*/,
Arrays.asList(ByteKeyRange.ALL_KEYS),
null /*size*/);
List<BigtableSource> splits = new ArrayList<>();
List<ByteKeyRange> keyRanges =
Arrays.asList(
ByteKeyRange.of(ByteKey.EMPTY, createByteKey(1)),
ByteKeyRange.of(createByteKey(1), createByteKey(2)),
ByteKeyRange.of(createByteKey(2), createByteKey(3)),
ByteKeyRange.of(createByteKey(3), createByteKey(4)),
ByteKeyRange.of(createByteKey(4), createByteKey(5)),
ByteKeyRange.of(createByteKey(5), createByteKey(6)),
ByteKeyRange.of(createByteKey(6), createByteKey(7)),
ByteKeyRange.of(createByteKey(7), createByteKey(8)),
ByteKeyRange.of(createByteKey(8), createByteKey(9)),
ByteKeyRange.of(createByteKey(9), ByteKey.EMPTY));
for (ByteKeyRange range : keyRanges) {
splits.add(source.withSingleRange(range));
}
// Splits Source have ranges [..1][1..2][2..3][3..4][4..5][5..6][6..7][7..8][8..9][9..]
// expected reduced Split source ranges are [..4][4..8][8..]
List<ByteKeyRange> expectedKeyRangesAfterReducedSplits =
Arrays.asList(
ByteKeyRange.of(ByteKey.EMPTY, createByteKey(4)),
ByteKeyRange.of(createByteKey(4), createByteKey(8)),
ByteKeyRange.of(createByteKey(8), ByteKey.EMPTY));
List<BigtableSource> reducedSplits = source.reduceSplits(splits, null, maxSplit);
List<ByteKeyRange> actualRangesAfterSplit = new ArrayList<>();
for (BigtableSource splitSource : reducedSplits) {
actualRangesAfterSplit.addAll(splitSource.getRanges());
}
assertThat(
actualRangesAfterSplit,
IsIterableContainingInAnyOrder.containsInAnyOrder(
expectedKeyRangesAfterReducedSplits.toArray()));
assertAllSourcesHaveSingleAdjacentRanges(reducedSplits);
assertSourcesEqualReferenceSource(source, reducedSplits, null /* options */);
}
/** Tests reading all rows from a split table with several key ranges. */
@Test
public void testReadingWithSplitsWithSeveralKeyRanges() throws Exception {
final String table = "TEST-MANY-ROWS-SPLITS-TABLE-MULTIPLE-RANGES";
final int numRows = 1500;
final int numSamples = 10;
// Two more splits are generated because of the split keys at 500 and 1000.
// E.g. the split [450, 600) becomes [450, 500) and [500, 600).
final int numSplits = 12;
final long bytesPerRow = 100L;
// Set up test table data and sample row keys for size estimation and splitting.
makeTableData(table, numRows);
service.setupSampleRowKeys(table, numSamples, bytesPerRow);
ByteKey splitKey1 = ByteKey.copyFrom("key000000500".getBytes(StandardCharsets.UTF_8));
ByteKey splitKey2 = ByteKey.copyFrom("key000001000".getBytes(StandardCharsets.UTF_8));
ByteKeyRange tableRange = service.getTableRange(table);
List<ByteKeyRange> keyRanges =
Arrays.asList(
tableRange.withEndKey(splitKey1),
tableRange.withStartKey(splitKey1).withEndKey(splitKey2),
tableRange.withStartKey(splitKey2));
// Generate source and split it.
BigtableSource source =
new BigtableSource(
config.withTableId(ValueProvider.StaticValueProvider.of(table)),
null /*filter*/,
keyRanges,
null /*size*/);
BigtableSource referenceSource =
new BigtableSource(
config.withTableId(ValueProvider.StaticValueProvider.of(table)),
null /*filter*/,
ImmutableList.of(service.getTableRange(table)),
null /*size*/);
List<BigtableSource> splits = // 10,000
source.split(numRows * bytesPerRow / numSamples, null /* options */);
// Test num splits and split equality.
assertThat(splits, hasSize(numSplits));
assertSourcesEqualReferenceSource(referenceSource, splits, null /* options */);
}
/** Tests reading all rows from a sub-split table. */
@Test
public void testReadingWithSubSplits() throws Exception {
final String table = "TEST-MANY-ROWS-SPLITS-TABLE";
final int numRows = 1000;
final int numSamples = 10;
final int numSplits = 20;
final long bytesPerRow = 100L;
// Set up test table data and sample row keys for size estimation and splitting.
makeTableData(table, numRows);
service.setupSampleRowKeys(table, numSamples, bytesPerRow);
// Generate source and split it.
BigtableSource source =
new BigtableSource(
config.withTableId(ValueProvider.StaticValueProvider.of(table)),
null /*filter*/,
Arrays.asList(ByteKeyRange.ALL_KEYS),
null /*size*/);
List<BigtableSource> splits = source.split(numRows * bytesPerRow / numSplits, null);
// Test num splits and split equality.
assertThat(splits, hasSize(numSplits));
assertSourcesEqualReferenceSource(source, splits, null /* options */);
}
/** Tests reading all rows from a sub-split table with several key ranges. */
@Test
public void testReadingWithSubSplitsWithSeveralKeyRanges() throws Exception {
final String table = "TEST-MANY-ROWS-SPLITS-TABLE-MULTIPLE-RANGES";
final int numRows = 1000;
final int numSamples = 10;
final int numSplits = 20;
// We expect 24 splits instead of 20 due to the multiple ranges. For a key of 330 separating
// the multiple ranges, first the [300, 330) range is subsplit into two (since numSplits is
// twice numSamples), so we get [300, 315) and [315, 330). Then, the [330, 400) range is also
// split into two, resulting in [330, 365) and [365, 400). These ranges would instead be
// [300, 350) and [350, 400) if this source was one range. Thus, each extra range adds two
// resulting splits.
final int expectedNumSplits = 24;
final long bytesPerRow = 100L;
// Set up test table data and sample row keys for size estimation and splitting.
makeTableData(table, numRows);
service.setupSampleRowKeys(table, numSamples, bytesPerRow);
ByteKey splitKey1 = ByteKey.copyFrom("key000000330".getBytes(StandardCharsets.UTF_8));
ByteKey splitKey2 = ByteKey.copyFrom("key000000730".getBytes(StandardCharsets.UTF_8));
ByteKeyRange tableRange = service.getTableRange(table);
List<ByteKeyRange> keyRanges =
Arrays.asList(
tableRange.withEndKey(splitKey1),
tableRange.withStartKey(splitKey1).withEndKey(splitKey2),
tableRange.withStartKey(splitKey2));
// Generate source and split it.
BigtableSource source =
new BigtableSource(
config.withTableId(ValueProvider.StaticValueProvider.of(table)),
null /*filter*/,
keyRanges,
null /*size*/);
BigtableSource referenceSource =
new BigtableSource(
config.withTableId(ValueProvider.StaticValueProvider.of(table)),
null /*filter*/,
ImmutableList.of(service.getTableRange(table)),
null /*size*/);
List<BigtableSource> splits = source.split(numRows * bytesPerRow / numSplits, null);
// Test num splits and split equality.
assertThat(splits, hasSize(expectedNumSplits));
assertSourcesEqualReferenceSource(referenceSource, splits, null /* options */);
}
/** Tests reading all rows from a sub-split table. */
@Test
public void testReadingWithFilterAndSubSplits() throws Exception {
final String table = "TEST-FILTER-SUB-SPLITS";
final int numRows = 1700;
final int numSamples = 10;
final int numSplits = 20;
final long bytesPerRow = 100L;
// Set up test table data and sample row keys for size estimation and splitting.
makeTableData(table, numRows);
service.setupSampleRowKeys(table, numSamples, bytesPerRow);
// Generate source and split it.
RowFilter filter =
RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(".*17.*")).build();
BigtableSource source =
new BigtableSource(
config.withTableId(ValueProvider.StaticValueProvider.of(table)),
filter,
Arrays.asList(ByteKeyRange.ALL_KEYS),
null /*size*/);
List<BigtableSource> splits = source.split(numRows * bytesPerRow / numSplits, null);
// Test num splits and split equality.
assertThat(splits, hasSize(numSplits));
assertSourcesEqualReferenceSource(source, splits, null /* options */);
}
@Test
public void testReadingDisplayData() {
RowFilter rowFilter =
RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8("foo.*")).build();
ByteKeyRange keyRange = ByteKeyRange.ALL_KEYS.withEndKey(ByteKey.of(0xab, 0xcd));
BigtableIO.Read read =
BigtableIO.read()
.withBigtableOptions(BIGTABLE_OPTIONS)
.withTableId("fooTable")
.withRowFilter(rowFilter)
.withKeyRange(keyRange);
DisplayData displayData = DisplayData.from(read);
assertThat(
displayData,
hasDisplayItem(
allOf(hasKey("tableId"), hasLabel("Bigtable Table Id"), hasValue("fooTable"))));
assertThat(displayData, hasDisplayItem("rowFilter", rowFilter.toString()));
assertThat(displayData, hasDisplayItem("keyRange 0", keyRange.toString()));
// BigtableIO adds user-agent to options; assert only on key and not value.
assertThat(displayData, hasDisplayItem("bigtableOptions"));
}
@Test
public void testReadingPrimitiveDisplayData() throws IOException, InterruptedException {
final String table = "fooTable";
service.createTable(table);
RowFilter rowFilter =
RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8("foo.*")).build();
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
BigtableIO.Read read =
BigtableIO.read()
.withBigtableOptions(BIGTABLE_OPTIONS)
.withTableId(table)
.withRowFilter(rowFilter)
.withBigtableService(service);
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
assertThat(
"BigtableIO.Read should include the table id in its primitive display data",
displayData,
Matchers.hasItem(hasDisplayItem("tableId")));
assertThat(
"BigtableIO.Read should include the row filter, if it exists, in its primitive "
+ "display data",
displayData,
Matchers.hasItem(hasDisplayItem("rowFilter")));
}
@Test
public void testReadingDisplayDataFromRuntimeParameters() {
ReadOptions options = PipelineOptionsFactory.fromArgs().withValidation().as(ReadOptions.class);
BigtableIO.Read read =
BigtableIO.read()
.withBigtableOptions(BIGTABLE_OPTIONS)
.withProjectId(options.getBigtableProject())
.withInstanceId(options.getBigtableInstanceId())
.withTableId(options.getBigtableTableId());
DisplayData displayData = DisplayData.from(read);
assertThat(
displayData,
hasDisplayItem(
allOf(
hasKey("projectId"),
hasLabel("Bigtable Project Id"),
hasValue("RuntimeValueProvider{propertyName=bigtableProject, default=null}"))));
assertThat(
displayData,
hasDisplayItem(
allOf(
hasKey("instanceId"),
hasLabel("Bigtable Instance Id"),
hasValue("RuntimeValueProvider{propertyName=bigtableInstanceId, default=null}"))));
assertThat(
displayData,
hasDisplayItem(
allOf(
hasKey("tableId"),
hasLabel("Bigtable Table Id"),
hasValue("RuntimeValueProvider{propertyName=bigtableTableId, default=null}"))));
}
@Test
public void testReadWithoutValidate() {
final String table = "fooTable";
BigtableIO.Read read =
BigtableIO.read()
.withBigtableOptions(BIGTABLE_OPTIONS)
.withTableId(table)
.withBigtableService(service)
.withoutValidation();
// validate() will throw if withoutValidation() isn't working
read.validate(TestPipeline.testingPipelineOptions());
}
@Test
public void testWriteWithoutValidate() {
final String table = "fooTable";
BigtableIO.Write write =
BigtableIO.write()
.withBigtableOptions(BIGTABLE_OPTIONS)
.withTableId(table)
.withBigtableService(service)
.withoutValidation();
// validate() will throw if withoutValidation() isn't working
write.validate(TestPipeline.testingPipelineOptions());
}
/** Tests that a record gets written to the service and messages are logged. */
@Test
public void testWriting() throws Exception {
final String table = "table";
final String key = "key";
final String value = "value";
service.createTable(table);
p.apply("single row", Create.of(makeWrite(key, value)).withCoder(bigtableCoder))
.apply("write", defaultWrite.withTableId(table));
p.run();
logged.verifyDebug("Wrote 1 records");
assertEquals(1, service.tables.size());
assertNotNull(service.getTable(table));
Map<ByteString, ByteString> rows = service.getTable(table);
assertEquals(1, rows.size());
assertEquals(ByteString.copyFromUtf8(value), rows.get(ByteString.copyFromUtf8(key)));
}
/** Tests that when writing to a non-existent table, the write fails. */
@Test
public void testWritingFailsTableDoesNotExist() throws Exception {
final String table = "TEST-TABLE";
PCollection<KV<ByteString, Iterable<Mutation>>> emptyInput =
p.apply(
Create.empty(
KvCoder.of(ByteStringCoder.of(), IterableCoder.of(ProtoCoder.of(Mutation.class)))));
// Exception will be thrown by write.validate() when writeToDynamic is applied.
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(String.format("Table %s does not exist", table));
emptyInput.apply("write", defaultWrite.withTableId(table));
p.run();
}
/** Tests that when writing to a non-existent table, the write fails. */
@Test
public void testTableCheckIgnoredWhenCanNotAccessConfig() throws Exception {
PCollection<KV<ByteString, Iterable<Mutation>>> emptyInput =
p.apply(
Create.empty(
KvCoder.of(ByteStringCoder.of(), IterableCoder.of(ProtoCoder.of(Mutation.class)))));
emptyInput.apply("write", defaultWrite.withTableId(NOT_ACCESSIBLE_VALUE));
p.run();
}
/** Tests that when writing an element fails, the write fails. */
@Test
public void testWritingFailsBadElement() throws Exception {
final String table = "TEST-TABLE";
final String key = "KEY";
service.createTable(table);
p.apply(Create.of(makeBadWrite(key)).withCoder(bigtableCoder))
.apply(defaultWrite.withTableId(table));
thrown.expect(PipelineExecutionException.class);
thrown.expectCause(Matchers.instanceOf(IOException.class));
thrown.expectMessage("At least 1 errors occurred writing to Bigtable. First 1 errors:");
thrown.expectMessage("Error mutating row " + key + " with mutations []: cell value missing");
p.run();
}
@Test
public void testWritingDisplayData() {
BigtableIO.Write write =
BigtableIO.write().withTableId("fooTable").withBigtableOptions(BIGTABLE_OPTIONS);
DisplayData displayData = DisplayData.from(write);
assertThat(displayData, hasDisplayItem("tableId", "fooTable"));
}
@Test
public void testGetSplitPointsConsumed() throws Exception {
final String table = "TEST-TABLE";
final int numRows = 100;
int splitPointsConsumed = 0;
makeTableData(table, numRows);
BigtableSource source =
new BigtableSource(
config.withTableId(ValueProvider.StaticValueProvider.of(table)),
null,
Arrays.asList(ByteKeyRange.ALL_KEYS),
null);
BoundedReader<Row> reader = source.createReader(TestPipeline.testingPipelineOptions());
reader.start();
// Started, 0 split points consumed
assertEquals(
"splitPointsConsumed starting", splitPointsConsumed, reader.getSplitPointsConsumed());
// Split points consumed increases for each row read
while (reader.advance()) {
assertEquals(
"splitPointsConsumed advancing", ++splitPointsConsumed, reader.getSplitPointsConsumed());
}
// Reader marked as done, 100 split points consumed
assertEquals("splitPointsConsumed done", numRows, reader.getSplitPointsConsumed());
reader.close();
}
@Test
public void testReadWithBigTableOptionsSetsRetryOptions() {
final int initialBackoffMillis = -1;
BigtableOptions.Builder optionsBuilder = BIGTABLE_OPTIONS.toBuilder();
RetryOptions.Builder retryOptionsBuilder = new RetryOptions.Builder();
retryOptionsBuilder.setInitialBackoffMillis(initialBackoffMillis);
optionsBuilder.setRetryOptions(retryOptionsBuilder.build());
BigtableIO.Read read = BigtableIO.read().withBigtableOptions(optionsBuilder.build());
BigtableOptions options = read.getBigtableOptions();
assertEquals(initialBackoffMillis, options.getRetryOptions().getInitialBackoffMillis());
assertThat(options.getRetryOptions(), Matchers.equalTo(retryOptionsBuilder.build()));
}
@Test
public void testWriteWithBigTableOptionsSetsBulkOptionsAndRetryOptions() {
final int maxInflightRpcs = 1;
final int initialBackoffMillis = -1;
BigtableOptions.Builder optionsBuilder = BIGTABLE_OPTIONS.toBuilder();
BulkOptions.Builder bulkOptionsBuilder = new BulkOptions.Builder();
bulkOptionsBuilder.setMaxInflightRpcs(maxInflightRpcs);
RetryOptions.Builder retryOptionsBuilder = new RetryOptions.Builder();
retryOptionsBuilder.setInitialBackoffMillis(initialBackoffMillis);
optionsBuilder
.setBulkOptions(bulkOptionsBuilder.build())
.setRetryOptions(retryOptionsBuilder.build());
BigtableIO.Write write = BigtableIO.write().withBigtableOptions(optionsBuilder.build());
BigtableOptions options = write.getBigtableOptions();
assertEquals(true, options.getBulkOptions().useBulkApi());
assertEquals(maxInflightRpcs, options.getBulkOptions().getMaxInflightRpcs());
assertEquals(initialBackoffMillis, options.getRetryOptions().getInitialBackoffMillis());
assertThat(
options.getBulkOptions(), Matchers.equalTo(bulkOptionsBuilder.setUseBulkApi(true).build()));
assertThat(options.getRetryOptions(), Matchers.equalTo(retryOptionsBuilder.build()));
}
////////////////////////////////////////////////////////////////////////////////////////////
private static final String COLUMN_FAMILY_NAME = "family";
private static final ByteString COLUMN_NAME = ByteString.copyFromUtf8("column");
private static final Column TEST_COLUMN = Column.newBuilder().setQualifier(COLUMN_NAME).build();
private static final Family TEST_FAMILY = Family.newBuilder().setName(COLUMN_FAMILY_NAME).build();
/** Helper function that builds a {@link Row} in a test table that could be returned by read. */
private static Row makeRow(ByteString key, ByteString value) {
// Build the currentRow and return true.
Column.Builder newColumn = TEST_COLUMN.toBuilder().addCells(Cell.newBuilder().setValue(value));
return Row.newBuilder()
.setKey(key)
.addFamilies(TEST_FAMILY.toBuilder().addColumns(newColumn))
.build();
}
/** Helper function to create a table and return the rows that it created. */
private static List<Row> makeTableData(String tableId, int numRows) {
service.createTable(tableId);
Map<ByteString, ByteString> testData = service.getTable(tableId);
List<Row> testRows = new ArrayList<>(numRows);
for (int i = 0; i < numRows; ++i) {
ByteString key = ByteString.copyFromUtf8(String.format("key%09d", i));
ByteString value = ByteString.copyFromUtf8(String.format("value%09d", i));
testData.put(key, value);
testRows.add(makeRow(key, value));
}
return testRows;
}
/** A {@link BigtableService} implementation that stores tables and their contents in memory. */
private static class FakeBigtableService implements BigtableService {
private final Map<String, SortedMap<ByteString, ByteString>> tables = new HashMap<>();
private final Map<String, List<SampleRowKeysResponse>> sampleRowKeys = new HashMap<>();
@Override
public BigtableOptions getBigtableOptions() {
return null;
}
@Nullable
public SortedMap<ByteString, ByteString> getTable(String tableId) {
return tables.get(tableId);
}
public ByteKeyRange getTableRange(String tableId) {
verifyTableExists(tableId);
SortedMap<ByteString, ByteString> data = tables.get(tableId);
return ByteKeyRange.of(makeByteKey(data.firstKey()), makeByteKey(data.lastKey()));
}
public void createTable(String tableId) {
tables.put(tableId, new TreeMap<>(new ByteStringComparator()));
}
@Override
public boolean tableExists(String tableId) {
return tables.containsKey(tableId);
}
public void verifyTableExists(String tableId) {
checkArgument(tableExists(tableId), "Table %s does not exist", tableId);
}
@Override
public FakeBigtableReader createReader(BigtableSource source) {
return new FakeBigtableReader(source);
}
@Override
public FakeBigtableWriter openForWriting(String tableId) {
return new FakeBigtableWriter(tableId);
}
@Override
public List<SampleRowKeysResponse> getSampleRowKeys(BigtableSource source) {
List<SampleRowKeysResponse> samples = sampleRowKeys.get(source.getTableId().get());
checkNotNull(samples, "No samples found for table %s", source.getTableId().get());
return samples;
}
/** Sets up the sample row keys for the specified table. */
void setupSampleRowKeys(String tableId, int numSamples, long bytesPerRow) {
verifyTableExists(tableId);
checkArgument(numSamples > 0, "Number of samples must be positive: %s", numSamples);
checkArgument(bytesPerRow > 0, "Bytes/Row must be positive: %s", bytesPerRow);
ImmutableList.Builder<SampleRowKeysResponse> ret = ImmutableList.builder();
SortedMap<ByteString, ByteString> rows = getTable(tableId);
int currentSample = 1;
int rowsSoFar = 0;
for (Map.Entry<ByteString, ByteString> entry : rows.entrySet()) {
if (((double) rowsSoFar) / rows.size() >= ((double) currentSample) / numSamples) {
// add the sample with the total number of bytes in the table before this key.
ret.add(
SampleRowKeysResponse.newBuilder()
.setRowKey(entry.getKey())
.setOffsetBytes(rowsSoFar * bytesPerRow)
.build());
// Move on to next sample
currentSample++;
}
++rowsSoFar;
}
// Add the last sample indicating the end of the table, with all rows before it.
ret.add(SampleRowKeysResponse.newBuilder().setOffsetBytes(rows.size() * bytesPerRow).build());
sampleRowKeys.put(tableId, ret.build());
}
}
/**
* A {@link BigtableService.Reader} implementation that reads from the static instance of {@link
* FakeBigtableService} stored in {@link #service}.
*
* <p>This reader does not support {@link RowFilter} objects.
*/
private static class FakeBigtableReader implements BigtableService.Reader {
private final BigtableSource source;
private Iterator<Map.Entry<ByteString, ByteString>> rows;
private Row currentRow;
private Predicate<ByteString> filter;
public FakeBigtableReader(BigtableSource source) {
this.source = source;
if (source.getRowFilter() == null) {
filter = Predicates.alwaysTrue();
} else {
ByteString keyRegex = source.getRowFilter().getRowKeyRegexFilter();
checkArgument(!keyRegex.isEmpty(), "Only RowKeyRegexFilter is supported");
filter = new KeyMatchesRegex(keyRegex.toStringUtf8());
}
service.verifyTableExists(source.getTableId().get());
}
@Override
public boolean start() {
rows = service.tables.get(source.getTableId().get()).entrySet().iterator();
return advance();
}
@Override
public boolean advance() {
// Loop until we find a row in range, or reach the end of the iterator.
Map.Entry<ByteString, ByteString> entry = null;
while (rows.hasNext()) {
entry = rows.next();
if (!filter.apply(entry.getKey())
|| !rangesContainsKey(source.getRanges(), makeByteKey(entry.getKey()))) {
// Does not match row filter or does not match source range. Skip.
entry = null;
continue;
}
// Found a row inside this source's key range, stop.
break;
}
// Return false if no more rows.
if (entry == null) {
currentRow = null;
return false;
}
// Set the current row and return true.
currentRow = makeRow(entry.getKey(), entry.getValue());
return true;
}
private boolean rangesContainsKey(List<ByteKeyRange> ranges, ByteKey key) {
for (ByteKeyRange range : ranges) {
if (range.containsKey(key)) {
return true;
}
}
return false;
}
@Override
public Row getCurrentRow() {
if (currentRow == null) {
throw new NoSuchElementException();
}
return currentRow;
}
@Override
public void close() {
rows = null;
currentRow = null;
}
}
/**
* A {@link BigtableService.Writer} implementation that writes to the static instance of {@link
* FakeBigtableService} stored in {@link #service}.
*
* <p>This writer only supports {@link Mutation Mutations} that consist only of {@link SetCell}
* entries. The column family in the {@link SetCell} is ignored; only the value is used.
*
* <p>When no {@link SetCell} is provided, the write will fail and this will be exposed via an
* exception on the returned {@link CompletionStage}.
*/
private static class FakeBigtableWriter implements BigtableService.Writer {
private final String tableId;
public FakeBigtableWriter(String tableId) {
this.tableId = tableId;
}
@Override
public CompletionStage<MutateRowResponse> writeRecord(
KV<ByteString, Iterable<Mutation>> record) {
service.verifyTableExists(tableId);
Map<ByteString, ByteString> table = service.getTable(tableId);
ByteString key = record.getKey();
for (Mutation m : record.getValue()) {
SetCell cell = m.getSetCell();
if (cell.getValue().isEmpty()) {
CompletableFuture<MutateRowResponse> result = new CompletableFuture<>();
result.completeExceptionally(new IOException("cell value missing"));
return result;
}
table.put(key, cell.getValue());
}
return CompletableFuture.completedFuture(MutateRowResponse.getDefaultInstance());
}
@Override
public void flush() {}
@Override
public void close() {}
}
/** A serializable comparator for ByteString. Used to make row samples. */
private static final class ByteStringComparator implements Comparator<ByteString>, Serializable {
@Override
public int compare(ByteString o1, ByteString o2) {
return makeByteKey(o1).compareTo(makeByteKey(o2));
}
}
}