blob: e6f8eeb8f602b7ebd0a10ea120f2344baa905c58 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.FailedPreconditionException;
import com.google.api.services.bigquery.model.Streamingbuffer;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.storage.v1beta1.AvroProto.AvroRows;
import com.google.cloud.bigquery.storage.v1beta1.AvroProto.AvroSchema;
import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions;
import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
import com.google.cloud.bigquery.storage.v1beta1.Storage.SplitReadStreamRequest;
import com.google.cloud.bigquery.storage.v1beta1.Storage.SplitReadStreamResponse;
import com.google.cloud.bigquery.storage.v1beta1.Storage.Stream;
import com.google.cloud.bigquery.storage.v1beta1.Storage.StreamPosition;
import com.google.cloud.bigquery.storage.v1beta1.Storage.StreamStatus;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnknownFieldSet;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.ByteArrayOutputStream;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.protobuf.ByteStringCoder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.BoundedSource.BoundedReader;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TableRowParser;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient;
import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices.FakeBigQueryServerStream;
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.model.Statement;
/** Tests for {@link BigQueryIO#readTableRows() using {@link Method#DIRECT_READ}}. */
@RunWith(JUnit4.class)
public class BigQueryIOStorageReadTest {
private transient PipelineOptions options;
private transient TemporaryFolder testFolder = new TemporaryFolder();
private transient TestPipeline p;
@Rule
public final transient TestRule folderThenPipeline =
new TestRule() {
@Override
public Statement apply(Statement base, Description description) {
// We need to set up the temporary folder, and then set up the TestPipeline based on the
// chosen folder. Unfortunately, since rule evaluation order is unspecified and unrelated
// to field order, and is separate from construction, that requires manually creating this
// TestRule.
Statement withPipeline =
new Statement() {
@Override
public void evaluate() throws Throwable {
options = TestPipeline.testingPipelineOptions();
options.as(BigQueryOptions.class).setProject("project-id");
options
.as(BigQueryOptions.class)
.setTempLocation(testFolder.getRoot().getAbsolutePath());
p = TestPipeline.fromOptions(options);
p.apply(base, description).evaluate();
}
};
return testFolder.apply(withPipeline, description);
}
};
@Rule public transient ExpectedException thrown = ExpectedException.none();
private FakeDatasetService fakeDatasetService = new FakeDatasetService();
@Before
public void setUp() throws Exception {
FakeDatasetService.setUp();
}
@Test
public void testBuildTableBasedSource() {
BigQueryIO.TypedRead<TableRow> typedRead =
BigQueryIO.read(new TableRowParser())
.withCoder(TableRowJsonCoder.of())
.withMethod(Method.DIRECT_READ)
.from("foo.com:project:dataset.table");
checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table");
assertTrue(typedRead.getValidate());
}
@Test
public void testBuildTableBasedSourceWithoutValidation() {
BigQueryIO.TypedRead<TableRow> typedRead =
BigQueryIO.read(new TableRowParser())
.withCoder(TableRowJsonCoder.of())
.withMethod(Method.DIRECT_READ)
.from("foo.com:project:dataset.table")
.withoutValidation();
checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table");
assertFalse(typedRead.getValidate());
}
@Test
public void testBuildTableBasedSourceWithDefaultProject() {
BigQueryIO.TypedRead<TableRow> typedRead =
BigQueryIO.read(new TableRowParser())
.withCoder(TableRowJsonCoder.of())
.withMethod(Method.DIRECT_READ)
.from("myDataset.myTable");
checkTypedReadTableObject(typedRead, null, "myDataset", "myTable");
}
@Test
public void testBuildTableBasedSourceWithReadOptions() {
TableReadOptions readOptions =
TableReadOptions.newBuilder()
.addSelectedFields("field1")
.addSelectedFields("field2")
.setRowRestriction("int_field > 5")
.build();
BigQueryIO.TypedRead<TableRow> typedRead =
BigQueryIO.read(new TableRowParser())
.withCoder(TableRowJsonCoder.of())
.withMethod(Method.DIRECT_READ)
.from("foo.com:project:dataset.table")
.withReadOptions(readOptions);
checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table");
assertEquals(typedRead.getReadOptions(), readOptions);
}
@Test
public void testBuildTableBasedSourceWithTableReference() {
TableReference tableReference =
new TableReference()
.setProjectId("foo.com:project")
.setDatasetId("dataset")
.setTableId("table");
BigQueryIO.TypedRead<TableRow> typedRead =
BigQueryIO.read(new TableRowParser())
.withCoder(TableRowJsonCoder.of())
.withMethod(Method.DIRECT_READ)
.from(tableReference);
checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table");
}
private void checkTypedReadTableObject(
TypedRead typedRead, String project, String dataset, String table) {
assertEquals(project, typedRead.getTable().getProjectId());
assertEquals(dataset, typedRead.getTable().getDatasetId());
assertEquals(table, typedRead.getTable().getTableId());
assertNull(typedRead.getQuery());
assertEquals(Method.DIRECT_READ, typedRead.getMethod());
}
@Test
public void testBuildSourceWithTableAndFlatten() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
"Invalid BigQueryIO.Read: Specifies a table with a result flattening preference,"
+ " which only applies to queries");
p.apply(
"ReadMyTable",
BigQueryIO.read(new TableRowParser())
.withCoder(TableRowJsonCoder.of())
.withMethod(Method.DIRECT_READ)
.from("foo.com:project:dataset.table")
.withoutResultFlattening());
p.run();
}
@Test
public void testBuildSourceWithTableAndSqlDialect() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
"Invalid BigQueryIO.Read: Specifies a table with a SQL dialect preference,"
+ " which only applies to queries");
p.apply(
"ReadMyTable",
BigQueryIO.read(new TableRowParser())
.withCoder(TableRowJsonCoder.of())
.withMethod(Method.DIRECT_READ)
.from("foo.com:project:dataset.table")
.usingStandardSql());
p.run();
}
@Test
public void testBuildSourceWithReadOptionsAndSelectedFields() {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("withReadOptions() already called");
p.apply(
"ReadMyTable",
BigQueryIO.read(new TableRowParser())
.withCoder(TableRowJsonCoder.of())
.withMethod(Method.DIRECT_READ)
.from("foo.com:project:dataset.table")
.withReadOptions(TableReadOptions.newBuilder().build())
.withSelectedFields(Lists.newArrayList("field1")));
}
@Test
public void testBuildSourceWithReadOptionsAndRowRestriction() {
thrown.expect(IllegalStateException.class);
thrown.expectMessage("withReadOptions() already called");
p.apply(
"ReadMyTable",
BigQueryIO.read(new TableRowParser())
.withCoder(TableRowJsonCoder.of())
.withMethod(Method.DIRECT_READ)
.from("foo.com:project:dataset.table")
.withReadOptions(TableReadOptions.newBuilder().build())
.withRowRestriction("field > 1"));
}
@Test
public void testDisplayData() {
String tableSpec = "foo.com:project:dataset.table";
BigQueryIO.TypedRead<TableRow> typedRead =
BigQueryIO.read(new TableRowParser())
.withCoder(TableRowJsonCoder.of())
.withMethod(Method.DIRECT_READ)
.from(tableSpec);
DisplayData displayData = DisplayData.from(typedRead);
assertThat(displayData, hasDisplayItem("table", tableSpec));
}
@Test
public void testEvaluatedDisplayData() {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
BigQueryIO.TypedRead<TableRow> typedRead =
BigQueryIO.read(new TableRowParser())
.withCoder(TableRowJsonCoder.of())
.withMethod(Method.DIRECT_READ)
.from("foo.com:project:dataset.table");
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(typedRead);
assertThat(displayData, hasItem(hasDisplayItem("table")));
}
@Test
public void testName() {
assertEquals(
"BigQueryIO.TypedRead",
BigQueryIO.read(new TableRowParser())
.withCoder(TableRowJsonCoder.of())
.withMethod(Method.DIRECT_READ)
.from("foo.com:project:dataset.table")
.getName());
}
@Test
public void testCoderInference() {
// Lambdas erase too much type information -- use an anonymous class here.
SerializableFunction<SchemaAndRecord, KV<ByteString, ReadSession>> parseFn =
new SerializableFunction<SchemaAndRecord, KV<ByteString, ReadSession>>() {
@Override
public KV<ByteString, ReadSession> apply(SchemaAndRecord input) {
return null;
}
};
assertEquals(
KvCoder.of(ByteStringCoder.of(), ProtoCoder.of(ReadSession.class)),
BigQueryIO.read(parseFn).inferCoder(CoderRegistry.createDefault()));
}
@Test
public void testTableSourceEstimatedSize() throws Exception {
doTableSourceEstimatedSizeTest(false);
}
@Test
public void testTableSourceEstimatedSize_IgnoresStreamingBuffer() throws Exception {
doTableSourceEstimatedSizeTest(true);
}
private void doTableSourceEstimatedSizeTest(boolean useStreamingBuffer) throws Exception {
fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
Table table = new Table().setTableReference(tableRef).setNumBytes(100L);
if (useStreamingBuffer) {
table.setStreamingBuffer(new Streamingbuffer().setEstimatedBytes(BigInteger.TEN));
}
fakeDatasetService.createTable(table);
BigQueryStorageTableSource<TableRow> tableSource =
BigQueryStorageTableSource.create(
ValueProvider.StaticValueProvider.of(tableRef),
null,
null,
null,
new TableRowParser(),
TableRowJsonCoder.of(),
new FakeBigQueryServices().withDatasetService(fakeDatasetService));
assertEquals(100, tableSource.getEstimatedSizeBytes(options));
}
@Test
public void testTableSourceEstimatedSize_WithDefaultProject() throws Exception {
fakeDatasetService.createDataset("project-id", "dataset", "", "", null);
TableReference tableRef = BigQueryHelpers.parseTableSpec("project-id:dataset.table");
Table table = new Table().setTableReference(tableRef).setNumBytes(100L);
fakeDatasetService.createTable(table);
BigQueryStorageTableSource<TableRow> tableSource =
BigQueryStorageTableSource.create(
ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")),
null,
null,
null,
new TableRowParser(),
TableRowJsonCoder.of(),
new FakeBigQueryServices().withDatasetService(fakeDatasetService));
assertEquals(100, tableSource.getEstimatedSizeBytes(options));
}
@Test
public void testTableSourceInitialSplit() throws Exception {
doTableSourceInitialSplitTest(1024L, 1024);
}
@Test
public void testTableSourceInitialSplit_MinSplitCount() throws Exception {
doTableSourceInitialSplitTest(1024L * 1024L, 10);
}
@Test
public void testTableSourceInitialSplit_MaxSplitCount() throws Exception {
doTableSourceInitialSplitTest(10L, 10_000);
}
private void doTableSourceInitialSplitTest(long bundleSize, int streamCount) throws Exception {
fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
Table table =
new Table()
.setTableReference(tableRef)
.setNumBytes(1024L * 1024L)
.setSchema(new TableSchema());
fakeDatasetService.createTable(table);
CreateReadSessionRequest expectedRequest =
CreateReadSessionRequest.newBuilder()
.setParent("projects/project-id")
.setTableReference(BigQueryHelpers.toTableRefProto(tableRef))
.setRequestedStreams(streamCount)
// TODO(aryann): Once we rebuild the generated client code, we should change this to
// use setShardingStrategy().
.setUnknownFields(
UnknownFieldSet.newBuilder()
.addField(7, UnknownFieldSet.Field.newBuilder().addVarint(2).build())
.build())
.build();
ReadSession.Builder builder = ReadSession.newBuilder();
for (int i = 0; i < streamCount; i++) {
builder.addStreams(Stream.newBuilder().setName("stream-" + i));
}
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build());
BigQueryStorageTableSource<TableRow> tableSource =
BigQueryStorageTableSource.create(
ValueProvider.StaticValueProvider.of(tableRef),
null,
null,
null,
new TableRowParser(),
TableRowJsonCoder.of(),
new FakeBigQueryServices()
.withDatasetService(fakeDatasetService)
.withStorageClient(fakeStorageClient));
List<? extends BoundedSource<TableRow>> sources = tableSource.split(bundleSize, options);
assertEquals(streamCount, sources.size());
}
@Test
public void testTableSourceInitialSplit_WithTableReadOptions() throws Throwable {
fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
Table table =
new Table()
.setTableReference(tableRef)
.setNumBytes(100L)
.setSchema(
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("name").setType("STRING"),
new TableFieldSchema().setName("number").setType("INTEGER"))));
fakeDatasetService.createTable(table);
TableReadOptions readOptions =
TableReadOptions.newBuilder()
.addSelectedFields("name")
.addSelectedFields("number")
.setRowRestriction("number > 5")
.build();
CreateReadSessionRequest expectedRequest =
CreateReadSessionRequest.newBuilder()
.setParent("projects/project-id")
.setTableReference(BigQueryHelpers.toTableRefProto(tableRef))
.setRequestedStreams(10)
.setReadOptions(readOptions)
// TODO(aryann): Once we rebuild the generated client code, we should change this to
// use setShardingStrategy().
.setUnknownFields(
UnknownFieldSet.newBuilder()
.addField(7, UnknownFieldSet.Field.newBuilder().addVarint(2).build())
.build())
.build();
ReadSession.Builder builder = ReadSession.newBuilder();
for (int i = 0; i < 10; i++) {
builder.addStreams(Stream.newBuilder().setName("stream-" + i));
}
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build());
BigQueryStorageTableSource<TableRow> tableSource =
BigQueryStorageTableSource.create(
ValueProvider.StaticValueProvider.of(tableRef),
readOptions,
null,
null,
new TableRowParser(),
TableRowJsonCoder.of(),
new FakeBigQueryServices()
.withDatasetService(fakeDatasetService)
.withStorageClient(fakeStorageClient));
List<? extends BoundedSource<TableRow>> sources = tableSource.split(10L, options);
assertEquals(10L, sources.size());
}
@Test
public void testTableSourceInitialSplit_WithSelectedFieldsAndRowRestriction() throws Exception {
fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
Table table =
new Table()
.setTableReference(tableRef)
.setNumBytes(100L)
.setSchema(
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("name").setType("STRING"),
new TableFieldSchema().setName("number").setType("INTEGER"))));
fakeDatasetService.createTable(table);
TableReadOptions readOptions =
TableReadOptions.newBuilder()
.addSelectedFields("name")
.addSelectedFields("number")
.setRowRestriction("number > 5")
.build();
CreateReadSessionRequest expectedRequest =
CreateReadSessionRequest.newBuilder()
.setParent("projects/project-id")
.setTableReference(BigQueryHelpers.toTableRefProto(tableRef))
.setRequestedStreams(10)
.setReadOptions(readOptions)
// TODO(aryann): Once we rebuild the generated client code, we should change this to
// use setShardingStrategy().
.setUnknownFields(
UnknownFieldSet.newBuilder()
.addField(7, UnknownFieldSet.Field.newBuilder().addVarint(2).build())
.build())
.build();
ReadSession.Builder builder = ReadSession.newBuilder();
for (int i = 0; i < 10; i++) {
builder.addStreams(Stream.newBuilder().setName("stream-" + i));
}
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build());
BigQueryStorageTableSource<TableRow> tableSource =
BigQueryStorageTableSource.create(
ValueProvider.StaticValueProvider.of(tableRef),
null,
StaticValueProvider.of(Lists.newArrayList("name", "number")),
StaticValueProvider.of("number > 5"),
new TableRowParser(),
TableRowJsonCoder.of(),
new FakeBigQueryServices()
.withDatasetService(fakeDatasetService)
.withStorageClient(fakeStorageClient));
List<? extends BoundedSource<TableRow>> sources = tableSource.split(10L, options);
assertEquals(10L, sources.size());
}
@Test
public void testTableSourceInitialSplit_WithDefaultProject() throws Exception {
fakeDatasetService.createDataset("project-id", "dataset", "", "", null);
TableReference tableRef = BigQueryHelpers.parseTableSpec("project-id:dataset.table");
Table table =
new Table()
.setTableReference(tableRef)
.setNumBytes(1024L * 1024L)
.setSchema(new TableSchema());
fakeDatasetService.createTable(table);
CreateReadSessionRequest expectedRequest =
CreateReadSessionRequest.newBuilder()
.setParent("projects/project-id")
.setTableReference(BigQueryHelpers.toTableRefProto(tableRef))
.setRequestedStreams(1024)
// TODO(aryann): Once we rebuild the generated client code, we should change this to
// use setShardingStrategy().
.setUnknownFields(
UnknownFieldSet.newBuilder()
.addField(7, UnknownFieldSet.Field.newBuilder().addVarint(2).build())
.build())
.build();
ReadSession.Builder builder = ReadSession.newBuilder();
for (int i = 0; i < 50; i++) {
builder.addStreams(Stream.newBuilder().setName("stream-" + i));
}
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build());
BigQueryStorageTableSource<TableRow> tableSource =
BigQueryStorageTableSource.create(
ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")),
null,
null,
null,
new TableRowParser(),
TableRowJsonCoder.of(),
new FakeBigQueryServices()
.withDatasetService(fakeDatasetService)
.withStorageClient(fakeStorageClient));
List<? extends BoundedSource<TableRow>> sources = tableSource.split(1024L, options);
assertEquals(50L, sources.size());
}
@Test
public void testTableSourceInitialSplit_EmptyTable() throws Exception {
fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
Table table =
new Table()
.setTableReference(tableRef)
.setNumBytes(1024L * 1024L)
.setSchema(new TableSchema());
fakeDatasetService.createTable(table);
CreateReadSessionRequest expectedRequest =
CreateReadSessionRequest.newBuilder()
.setParent("projects/project-id")
.setTableReference(BigQueryHelpers.toTableRefProto(tableRef))
.setRequestedStreams(1024)
// TODO(aryann): Once we rebuild the generated client code, we should change this to
// use setShardingStrategy().
.setUnknownFields(
UnknownFieldSet.newBuilder()
.addField(7, UnknownFieldSet.Field.newBuilder().addVarint(2).build())
.build())
.build();
ReadSession emptyReadSession = ReadSession.newBuilder().build();
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(emptyReadSession);
BigQueryStorageTableSource<TableRow> tableSource =
BigQueryStorageTableSource.create(
ValueProvider.StaticValueProvider.of(tableRef),
null,
null,
null,
new TableRowParser(),
TableRowJsonCoder.of(),
new FakeBigQueryServices()
.withDatasetService(fakeDatasetService)
.withStorageClient(fakeStorageClient));
List<? extends BoundedSource<TableRow>> sources = tableSource.split(1024L, options);
assertTrue(sources.isEmpty());
}
@Test
public void testTableSourceCreateReader() throws Exception {
BigQueryStorageTableSource<TableRow> tableSource =
BigQueryStorageTableSource.create(
ValueProvider.StaticValueProvider.of(
BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table")),
null,
null,
null,
new TableRowParser(),
TableRowJsonCoder.of(),
new FakeBigQueryServices().withDatasetService(fakeDatasetService));
thrown.expect(UnsupportedOperationException.class);
thrown.expectMessage("BigQuery storage source must be split before reading");
tableSource.createReader(options);
}
private static final String AVRO_SCHEMA_STRING =
"{\"namespace\": \"example.avro\",\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"RowRecord\",\n"
+ " \"fields\": [\n"
+ " {\"name\": \"name\", \"type\": \"string\"},\n"
+ " {\"name\": \"number\", \"type\": \"long\"}\n"
+ " ]\n"
+ "}";
private static final Schema AVRO_SCHEMA = new Schema.Parser().parse(AVRO_SCHEMA_STRING);
private static final TableSchema TABLE_SCHEMA =
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("name").setType("STRING").setMode("REQUIRED"),
new TableFieldSchema().setName("number").setType("INTEGER").setMode("REQUIRED")));
private static GenericRecord createRecord(String name, long number, Schema schema) {
GenericRecord genericRecord = new Record(schema);
genericRecord.put("name", name);
genericRecord.put("number", number);
return genericRecord;
}
private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();
private static ReadRowsResponse createResponse(
Schema schema, Collection<GenericRecord> genericRecords, double fractionConsumed)
throws Exception {
GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Encoder binaryEncoder = ENCODER_FACTORY.binaryEncoder(outputStream, null);
for (GenericRecord genericRecord : genericRecords) {
writer.write(genericRecord, binaryEncoder);
}
binaryEncoder.flush();
return ReadRowsResponse.newBuilder()
.setAvroRows(
AvroRows.newBuilder()
.setSerializedBinaryRows(ByteString.copyFrom(outputStream.toByteArray()))
.setRowCount(genericRecords.size()))
.setStatus(
StreamStatus.newBuilder()
// TODO(aryann): Once we rebuild the generated client code, we should change this to
// use setFractionConsumed().
.setUnknownFields(
UnknownFieldSet.newBuilder()
.addField(
2,
UnknownFieldSet.Field.newBuilder()
.addFixed32(
java.lang.Float.floatToIntBits((float) fractionConsumed))
.build())
.build()))
.build();
}
@Test
public void testStreamSourceEstimatedSizeBytes() throws Exception {
BigQueryStorageStreamSource<TableRow> streamSource =
BigQueryStorageStreamSource.create(
ReadSession.getDefaultInstance(),
Stream.getDefaultInstance(),
TABLE_SCHEMA,
new TableRowParser(),
TableRowJsonCoder.of(),
new FakeBigQueryServices());
assertEquals(0, streamSource.getEstimatedSizeBytes(options));
}
@Test
public void testStreamSourceSplit() throws Exception {
BigQueryStorageStreamSource<TableRow> streamSource =
BigQueryStorageStreamSource.create(
ReadSession.getDefaultInstance(),
Stream.getDefaultInstance(),
TABLE_SCHEMA,
new TableRowParser(),
TableRowJsonCoder.of(),
new FakeBigQueryServices());
assertThat(streamSource.split(0, options), containsInAnyOrder(streamSource));
}
@Test
public void testReadFromStreamSource() throws Exception {
ReadSession readSession =
ReadSession.newBuilder()
.setName("readSession")
.setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
.build();
Stream stream = Stream.newBuilder().setName("stream").build();
ReadRowsRequest expectedRequest =
ReadRowsRequest.newBuilder()
.setReadPosition(StreamPosition.newBuilder().setStream(stream))
.build();
List<GenericRecord> records =
Lists.newArrayList(
createRecord("A", 1, AVRO_SCHEMA),
createRecord("B", 2, AVRO_SCHEMA),
createRecord("C", 3, AVRO_SCHEMA));
List<ReadRowsResponse> responses =
Lists.newArrayList(
createResponse(AVRO_SCHEMA, records.subList(0, 2), 0.50),
createResponse(AVRO_SCHEMA, records.subList(2, 3), 0.75));
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.readRows(expectedRequest))
.thenReturn(new FakeBigQueryServerStream<>(responses));
BigQueryStorageStreamSource<TableRow> streamSource =
BigQueryStorageStreamSource.create(
readSession,
stream,
TABLE_SCHEMA,
new TableRowParser(),
TableRowJsonCoder.of(),
new FakeBigQueryServices().withStorageClient(fakeStorageClient));
List<TableRow> rows = new ArrayList<>();
BoundedReader<TableRow> reader = streamSource.createReader(options);
for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) {
rows.add(reader.getCurrent());
}
System.out.println("Rows: " + rows);
assertEquals(3, rows.size());
}
@Test
public void testFractionConsumed() throws Exception {
ReadSession readSession =
ReadSession.newBuilder()
.setName("readSession")
.setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
.build();
Stream stream = Stream.newBuilder().setName("stream").build();
ReadRowsRequest expectedRequest =
ReadRowsRequest.newBuilder()
.setReadPosition(StreamPosition.newBuilder().setStream(stream))
.build();
List<GenericRecord> records =
Lists.newArrayList(
createRecord("A", 1, AVRO_SCHEMA),
createRecord("B", 2, AVRO_SCHEMA),
createRecord("C", 3, AVRO_SCHEMA),
createRecord("D", 4, AVRO_SCHEMA),
createRecord("E", 5, AVRO_SCHEMA),
createRecord("F", 6, AVRO_SCHEMA),
createRecord("G", 7, AVRO_SCHEMA));
List<ReadRowsResponse> responses =
Lists.newArrayList(
// N.B.: All floating point numbers used in this test can be represented without
// a loss of precision.
createResponse(AVRO_SCHEMA, records.subList(0, 2), 0.250),
createResponse(AVRO_SCHEMA, records.subList(2, 4), 0.500),
createResponse(AVRO_SCHEMA, records.subList(4, 7), 0.875));
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.readRows(expectedRequest))
.thenReturn(new FakeBigQueryServerStream<>(responses));
BigQueryStorageStreamSource<TableRow> streamSource =
BigQueryStorageStreamSource.create(
readSession,
stream,
TABLE_SCHEMA,
new TableRowParser(),
TableRowJsonCoder.of(),
new FakeBigQueryServices().withStorageClient(fakeStorageClient));
List<TableRow> rows = new ArrayList<>();
BoundedReader<TableRow> reader = streamSource.createReader(options);
// Before call to BoundedReader#start, fraction consumed must be zero.
assertEquals(Double.valueOf(0.000), reader.getFractionConsumed());
assertTrue(reader.start()); // Reads A.
assertEquals(Double.valueOf(0.125), reader.getFractionConsumed());
assertTrue(reader.advance()); // Reads B.
assertEquals(Double.valueOf(0.250), reader.getFractionConsumed());
assertTrue(reader.advance()); // Reads C.
assertEquals(Double.valueOf(0.375), reader.getFractionConsumed());
assertTrue(reader.advance()); // Reads D.
assertEquals(Double.valueOf(0.500), reader.getFractionConsumed());
assertTrue(reader.advance()); // Reads E.
assertEquals(Double.valueOf(0.625), reader.getFractionConsumed());
assertTrue(reader.advance()); // Reads F.
assertEquals(Double.valueOf(0.750), reader.getFractionConsumed());
assertTrue(reader.advance()); // Reads G.
assertEquals(Double.valueOf(0.875), reader.getFractionConsumed());
assertFalse(reader.advance()); // Reaches the end.
// We are done with the stream, so we should report 100% consumption.
assertEquals(Double.valueOf(1.00), reader.getFractionConsumed());
}
@Test
public void testFractionConsumedWithSplit() throws Exception {
ReadSession readSession =
ReadSession.newBuilder()
.setName("readSession")
.setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
.build();
Stream parentStream = Stream.newBuilder().setName("stream").build();
ReadRowsRequest expectedRequest =
ReadRowsRequest.newBuilder()
.setReadPosition(StreamPosition.newBuilder().setStream(parentStream))
.build();
List<GenericRecord> records =
Lists.newArrayList(
createRecord("A", 1, AVRO_SCHEMA),
createRecord("B", 2, AVRO_SCHEMA),
createRecord("C", 3, AVRO_SCHEMA),
createRecord("D", 4, AVRO_SCHEMA),
createRecord("E", 5, AVRO_SCHEMA),
createRecord("F", 6, AVRO_SCHEMA),
createRecord("G", 7, AVRO_SCHEMA));
List<ReadRowsResponse> parentResponses =
Lists.newArrayList(
// N.B.: All floating point numbers used in this test can be represented without
// a loss of precision.
createResponse(AVRO_SCHEMA, records.subList(0, 2), 0.250),
createResponse(AVRO_SCHEMA, records.subList(2, 4), 0.500),
createResponse(AVRO_SCHEMA, records.subList(4, 7), 0.875));
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.readRows(expectedRequest))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses));
when(fakeStorageClient.splitReadStream(
SplitReadStreamRequest.newBuilder()
.setOriginalStream(parentStream)
// TODO(aryann): Once we rebuild the generated client code, we should change this to
// use setFraction().
.setUnknownFields(
UnknownFieldSet.newBuilder()
.addField(
2,
UnknownFieldSet.Field.newBuilder()
.addFixed32(java.lang.Float.floatToIntBits(0.5f))
.build())
.build())
.build()))
.thenReturn(
SplitReadStreamResponse.newBuilder()
.setPrimaryStream(Stream.newBuilder().setName("primary"))
.setRemainderStream(Stream.newBuilder().setName("residual"))
.build());
List<ReadRowsResponse> primaryResponses =
Lists.newArrayList(
createResponse(AVRO_SCHEMA, records.subList(1, 3), 0.500),
createResponse(AVRO_SCHEMA, records.subList(3, 4), 0.875));
when(fakeStorageClient.readRows(
ReadRowsRequest.newBuilder()
.setReadPosition(
StreamPosition.newBuilder()
.setStream(Stream.newBuilder().setName("primary"))
.setOffset(1))
.build()))
.thenReturn(new FakeBigQueryServerStream<>(primaryResponses));
BigQueryStorageStreamSource<TableRow> streamSource =
BigQueryStorageStreamSource.create(
readSession,
parentStream,
TABLE_SCHEMA,
new TableRowParser(),
TableRowJsonCoder.of(),
new FakeBigQueryServices().withStorageClient(fakeStorageClient));
List<TableRow> rows = new ArrayList<>();
BoundedReader<TableRow> reader = streamSource.createReader(options);
// Before call to BoundedReader#start, fraction consumed must be zero.
assertEquals(Double.valueOf(0.0000), reader.getFractionConsumed());
assertTrue(reader.start()); // Reads A.
assertEquals(Double.valueOf(0.1250), reader.getFractionConsumed());
reader.splitAtFraction(0.5f);
assertEquals(Double.valueOf(0.1250), reader.getFractionConsumed());
assertTrue(reader.advance()); // Reads B.
// Once the split has completed but no new rows have been read, the consumed value is at the
// last calculated value of 0.125. For the first response of the primary stream, the progress
// report interpolation is done between the progress before split and the progress from the
// first response of the primary stream. In this case, the value is:
//
// 0.125 + (0.5 - 0.125) * 1.0 / 2
//
assertEquals(Double.valueOf(0.3125), reader.getFractionConsumed());
assertTrue(reader.advance()); // Reads C.
assertEquals(Double.valueOf(0.5000), reader.getFractionConsumed());
assertTrue(reader.advance()); // Reads D.
assertEquals(Double.valueOf(0.8750), reader.getFractionConsumed());
assertFalse(reader.advance());
assertEquals(Double.valueOf(1.0000), reader.getFractionConsumed());
}
@Test
public void testStreamSourceSplitAtFractionSucceeds() throws Exception {
Stream parentStream = Stream.newBuilder().setName("parent").build();
List<ReadRowsResponse> parentResponses =
Lists.newArrayList(
createResponse(
AVRO_SCHEMA,
Lists.newArrayList(
createRecord("A", 1, AVRO_SCHEMA), createRecord("B", 2, AVRO_SCHEMA)),
0.25),
createResponse(
AVRO_SCHEMA, Lists.newArrayList(createRecord("C", 3, AVRO_SCHEMA)), 0.50),
createResponse(
AVRO_SCHEMA,
Lists.newArrayList(
createRecord("D", 4, AVRO_SCHEMA), createRecord("E", 5, AVRO_SCHEMA)),
0.75));
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.readRows(
ReadRowsRequest.newBuilder()
.setReadPosition(StreamPosition.newBuilder().setStream(parentStream))
.build()))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses));
// Mocks the split call.
when(fakeStorageClient.splitReadStream(
SplitReadStreamRequest.newBuilder()
.setOriginalStream(parentStream)
// TODO(aryann): Once we rebuild the generated client code, we should change this to
// use setFraction().
.setUnknownFields(
UnknownFieldSet.newBuilder()
.addField(
2,
UnknownFieldSet.Field.newBuilder()
.addFixed32(java.lang.Float.floatToIntBits(0.5f))
.build())
.build())
.build()))
.thenReturn(
SplitReadStreamResponse.newBuilder()
.setPrimaryStream(Stream.newBuilder().setName("primary"))
.setRemainderStream(Stream.newBuilder().setName("residual"))
.build());
// Mocks the ReadRows calls expected on the primary and residual streams.
when(fakeStorageClient.readRows(
ReadRowsRequest.newBuilder()
.setReadPosition(
StreamPosition.newBuilder()
.setStream(Stream.newBuilder().setName("primary"))
// This test will read rows 0 and 1 from the parent before calling split,
// so we expect the primary read to start at offset 2.
.setOffset(2))
.build()))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses.subList(1, 2)));
when(fakeStorageClient.readRows(
ReadRowsRequest.newBuilder()
.setReadPosition(
StreamPosition.newBuilder()
.setStream(Stream.newBuilder().setName("residual"))
.setOffset(0))
.build()))
.thenReturn(
new FakeBigQueryServerStream<>(parentResponses.subList(2, parentResponses.size())));
BigQueryStorageStreamSource<TableRow> streamSource =
BigQueryStorageStreamSource.create(
ReadSession.newBuilder()
.setName("readSession")
.setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
.build(),
parentStream,
TABLE_SCHEMA,
new TableRowParser(),
TableRowJsonCoder.of(),
new FakeBigQueryServices().withStorageClient(fakeStorageClient));
// Read a few records from the parent stream and ensure that records are returned in the
// prescribed order.
BoundedReader<TableRow> parent = streamSource.createReader(options);
assertTrue(parent.start());
assertEquals("A", parent.getCurrent().get("name"));
assertTrue(parent.advance());
assertEquals("B", parent.getCurrent().get("name"));
// Now split the stream, and ensure that the "parent" reader has been replaced with the
// primary stream and that the returned source points to the residual stream.
BoundedReader<TableRow> primary = parent;
BoundedSource<TableRow> residualSource = parent.splitAtFraction(0.5);
assertNotNull(residualSource);
BoundedReader<TableRow> residual = residualSource.createReader(options);
assertTrue(primary.advance());
assertEquals("C", primary.getCurrent().get("name"));
assertFalse(primary.advance());
assertTrue(residual.start());
assertEquals("D", residual.getCurrent().get("name"));
assertTrue(residual.advance());
assertEquals("E", residual.getCurrent().get("name"));
assertFalse(residual.advance());
}
@Test
public void testStreamSourceSplitAtFractionRepeated() throws Exception {
List<Stream> streams =
Lists.newArrayList(
Stream.newBuilder().setName("stream1").build(),
Stream.newBuilder().setName("stream2").build(),
Stream.newBuilder().setName("stream3").build());
StorageClient fakeStorageClient = mock(StorageClient.class);
// Mock the initial ReadRows call.
when(fakeStorageClient.readRows(
ReadRowsRequest.newBuilder()
.setReadPosition(StreamPosition.newBuilder().setStream(streams.get(0)))
.build()))
.thenReturn(
new FakeBigQueryServerStream<>(
Lists.newArrayList(
createResponse(
AVRO_SCHEMA,
Lists.newArrayList(
createRecord("A", 1, AVRO_SCHEMA), createRecord("B", 2, AVRO_SCHEMA)),
0.25),
createResponse(
AVRO_SCHEMA,
Lists.newArrayList(
createRecord("C", 3, AVRO_SCHEMA), createRecord("D", 4, AVRO_SCHEMA)),
0.50),
createResponse(
AVRO_SCHEMA,
Lists.newArrayList(
createRecord("E", 5, AVRO_SCHEMA), createRecord("F", 6, AVRO_SCHEMA)),
0.75))));
// Mock the first SplitReadStream call.
when(fakeStorageClient.splitReadStream(
SplitReadStreamRequest.newBuilder()
.setOriginalStream(streams.get(0))
// TODO(aryann): Once we rebuild the generated client code, we should change this to
// use setFraction().
.setUnknownFields(
UnknownFieldSet.newBuilder()
.addField(
2,
UnknownFieldSet.Field.newBuilder()
.addFixed32(java.lang.Float.floatToIntBits(0.83f))
.build())
.build())
.build()))
.thenReturn(
SplitReadStreamResponse.newBuilder()
.setPrimaryStream(streams.get(1))
.setRemainderStream(Stream.newBuilder().setName("ignored"))
.build());
// Mock the second ReadRows call.
when(fakeStorageClient.readRows(
ReadRowsRequest.newBuilder()
.setReadPosition(StreamPosition.newBuilder().setStream(streams.get(1)).setOffset(1))
.build()))
.thenReturn(
new FakeBigQueryServerStream<>(
Lists.newArrayList(
createResponse(
AVRO_SCHEMA,
Lists.newArrayList(
createRecord("B", 2, AVRO_SCHEMA), createRecord("C", 3, AVRO_SCHEMA)),
0.50),
createResponse(
AVRO_SCHEMA,
Lists.newArrayList(
createRecord("D", 4, AVRO_SCHEMA), createRecord("E", 5, AVRO_SCHEMA)),
0.75))));
// Mock the second SplitReadStream call.
when(fakeStorageClient.splitReadStream(
SplitReadStreamRequest.newBuilder()
.setOriginalStream(streams.get(1))
// TODO(aryann): Once we rebuild the generated client code, we should change this to
// use setFraction().
.setUnknownFields(
UnknownFieldSet.newBuilder()
.addField(
2,
UnknownFieldSet.Field.newBuilder()
.addFixed32(java.lang.Float.floatToIntBits(0.75f))
.build())
.build())
.build()))
.thenReturn(
SplitReadStreamResponse.newBuilder()
.setPrimaryStream(streams.get(2))
.setRemainderStream(Stream.newBuilder().setName("ignored"))
.build());
// Mock the third ReadRows call.
when(fakeStorageClient.readRows(
ReadRowsRequest.newBuilder()
.setReadPosition(StreamPosition.newBuilder().setStream(streams.get(2)).setOffset(2))
.build()))
.thenReturn(
new FakeBigQueryServerStream<>(
Lists.newArrayList(
createResponse(
AVRO_SCHEMA,
Lists.newArrayList(
createRecord("C", 3, AVRO_SCHEMA), createRecord("D", 4, AVRO_SCHEMA)),
0.90))));
BoundedSource<TableRow> source =
BigQueryStorageStreamSource.create(
ReadSession.newBuilder()
.setName("readSession")
.setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
.build(),
streams.get(0),
TABLE_SCHEMA,
new TableRowParser(),
TableRowJsonCoder.of(),
new FakeBigQueryServices().withStorageClient(fakeStorageClient));
BoundedReader<TableRow> reader = source.createReader(options);
assertTrue(reader.start());
assertEquals("A", reader.getCurrent().get("name"));
BoundedSource<TableRow> residualSource = reader.splitAtFraction(0.83f);
assertNotNull(residualSource);
assertEquals("A", reader.getCurrent().get("name"));
assertTrue(reader.advance());
assertEquals("B", reader.getCurrent().get("name"));
residualSource = reader.splitAtFraction(0.75f);
assertNotNull(residualSource);
assertEquals("B", reader.getCurrent().get("name"));
assertTrue(reader.advance());
assertEquals("C", reader.getCurrent().get("name"));
assertTrue(reader.advance());
assertEquals("D", reader.getCurrent().get("name"));
assertFalse(reader.advance());
}
@Test
public void testStreamSourceSplitAtFractionFailsWhenSplitIsNotPossible() throws Exception {
Stream parentStream = Stream.newBuilder().setName("parent").build();
List<ReadRowsResponse> parentResponses =
Lists.newArrayList(
createResponse(
AVRO_SCHEMA,
Lists.newArrayList(
createRecord("A", 1, AVRO_SCHEMA), createRecord("B", 2, AVRO_SCHEMA)),
0.25),
createResponse(
AVRO_SCHEMA, Lists.newArrayList(createRecord("C", 3, AVRO_SCHEMA)), 0.50),
createResponse(
AVRO_SCHEMA,
Lists.newArrayList(
createRecord("D", 4, AVRO_SCHEMA), createRecord("E", 5, AVRO_SCHEMA)),
0.75));
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.readRows(
ReadRowsRequest.newBuilder()
.setReadPosition(StreamPosition.newBuilder().setStream(parentStream))
.build()))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses));
// Mocks the split call. A response without a primary_stream and remainder_stream means
// that the split is not possible.
when(fakeStorageClient.splitReadStream(
SplitReadStreamRequest.newBuilder()
.setOriginalStream(parentStream)
// TODO(aryann): Once we rebuild the generated client code, we should change this to
// use setFraction().
.setUnknownFields(
UnknownFieldSet.newBuilder()
.addField(
2,
UnknownFieldSet.Field.newBuilder()
.addFixed32(java.lang.Float.floatToIntBits(0.5f))
.build())
.build())
.build()))
.thenReturn(SplitReadStreamResponse.getDefaultInstance());
BigQueryStorageStreamSource<TableRow> streamSource =
BigQueryStorageStreamSource.create(
ReadSession.newBuilder()
.setName("readSession")
.setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
.build(),
parentStream,
TABLE_SCHEMA,
new TableRowParser(),
TableRowJsonCoder.of(),
new FakeBigQueryServices().withStorageClient(fakeStorageClient));
// Read a few records from the parent stream and ensure that records are returned in the
// prescribed order.
BoundedReader<TableRow> parent = streamSource.createReader(options);
assertTrue(parent.start());
assertEquals("A", parent.getCurrent().get("name"));
assertTrue(parent.advance());
assertEquals("B", parent.getCurrent().get("name"));
assertNull(parent.splitAtFraction(0.5));
// Verify that the parent source still works okay even after an unsuccessful split attempt.
assertTrue(parent.advance());
assertEquals("C", parent.getCurrent().get("name"));
assertTrue(parent.advance());
assertEquals("D", parent.getCurrent().get("name"));
assertTrue(parent.advance());
assertEquals("E", parent.getCurrent().get("name"));
assertFalse(parent.advance());
}
@Test
public void testStreamSourceSplitAtFractionFailsWhenParentIsPastSplitPoint() throws Exception {
Stream parentStream = Stream.newBuilder().setName("parent").build();
List<ReadRowsResponse> parentResponses =
Lists.newArrayList(
createResponse(
AVRO_SCHEMA,
Lists.newArrayList(
createRecord("A", 1, AVRO_SCHEMA), createRecord("B", 2, AVRO_SCHEMA)),
0.25),
createResponse(
AVRO_SCHEMA, Lists.newArrayList(createRecord("C", 3, AVRO_SCHEMA)), 0.50),
createResponse(
AVRO_SCHEMA,
Lists.newArrayList(
createRecord("D", 4, AVRO_SCHEMA), createRecord("E", 5, AVRO_SCHEMA)),
0.75));
StorageClient fakeStorageClient = mock(StorageClient.class);
when(fakeStorageClient.readRows(
ReadRowsRequest.newBuilder()
.setReadPosition(StreamPosition.newBuilder().setStream(parentStream))
.build()))
.thenReturn(new FakeBigQueryServerStream<>(parentResponses));
// Mocks the split call. A response without a primary_stream and remainder_stream means
// that the split is not possible.
// Mocks the split call.
when(fakeStorageClient.splitReadStream(
SplitReadStreamRequest.newBuilder()
.setOriginalStream(parentStream)
// TODO(aryann): Once we rebuild the generated client code, we should change this to
// use setFraction().
.setUnknownFields(
UnknownFieldSet.newBuilder()
.addField(
2,
UnknownFieldSet.Field.newBuilder()
.addFixed32(java.lang.Float.floatToIntBits(0.5f))
.build())
.build())
.build()))
.thenReturn(
SplitReadStreamResponse.newBuilder()
.setPrimaryStream(Stream.newBuilder().setName("primary"))
.setRemainderStream(Stream.newBuilder().setName("residual"))
.build());
// Mocks the ReadRows calls expected on the primary and residual streams.
when(fakeStorageClient.readRows(
ReadRowsRequest.newBuilder()
.setReadPosition(
StreamPosition.newBuilder()
.setStream(Stream.newBuilder().setName("primary"))
// This test will read rows 0 and 1 from the parent before calling split,
// so we expect the primary read to start at offset 2.
.setOffset(2))
.build()))
.thenThrow(
new FailedPreconditionException(
"Given row offset is invalid for stream.",
new StatusRuntimeException(Status.FAILED_PRECONDITION),
GrpcStatusCode.of(Code.FAILED_PRECONDITION),
/* retryable = */ false));
BigQueryStorageStreamSource<TableRow> streamSource =
BigQueryStorageStreamSource.create(
ReadSession.newBuilder()
.setName("readSession")
.setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
.build(),
parentStream,
TABLE_SCHEMA,
new TableRowParser(),
TableRowJsonCoder.of(),
new FakeBigQueryServices().withStorageClient(fakeStorageClient));
// Read a few records from the parent stream and ensure that records are returned in the
// prescribed order.
BoundedReader<TableRow> parent = streamSource.createReader(options);
assertTrue(parent.start());
assertEquals("A", parent.getCurrent().get("name"));
assertTrue(parent.advance());
assertEquals("B", parent.getCurrent().get("name"));
assertNull(parent.splitAtFraction(0.5));
// Verify that the parent source still works okay even after an unsuccessful split attempt.
assertTrue(parent.advance());
assertEquals("C", parent.getCurrent().get("name"));
assertTrue(parent.advance());
assertEquals("D", parent.getCurrent().get("name"));
assertTrue(parent.advance());
assertEquals("E", parent.getCurrent().get("name"));
assertFalse(parent.advance());
}
private static final class ParseKeyValue
implements SerializableFunction<SchemaAndRecord, KV<String, Long>> {
@Override
public KV<String, Long> apply(SchemaAndRecord input) {
return KV.of(
input.getRecord().get("name").toString(), (Long) input.getRecord().get("number"));
}
}
@Test
public void testReadFromBigQueryIO() throws Exception {
fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null);
TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table");
Table table =
new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(new TableSchema());
fakeDatasetService.createTable(table);
CreateReadSessionRequest expectedCreateReadSessionRequest =
CreateReadSessionRequest.newBuilder()
.setParent("projects/project-id")
.setTableReference(BigQueryHelpers.toTableRefProto(tableRef))
.setRequestedStreams(10)
.setReadOptions(
TableReadOptions.newBuilder().addSelectedFields("name").addSelectedFields("number"))
// TODO(aryann): Once we rebuild the generated client code, we should change this to
// use setShardingStrategy().
.setUnknownFields(
UnknownFieldSet.newBuilder()
.addField(7, UnknownFieldSet.Field.newBuilder().addVarint(2).build())
.build())
.build();
ReadSession readSession =
ReadSession.newBuilder()
.setName("readSessionName")
.setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING))
.addStreams(Stream.newBuilder().setName("streamName"))
.build();
ReadRowsRequest expectedReadRowsRequest =
ReadRowsRequest.newBuilder()
.setReadPosition(
StreamPosition.newBuilder().setStream(Stream.newBuilder().setName("streamName")))
.build();
List<GenericRecord> records =
Lists.newArrayList(
createRecord("A", 1, AVRO_SCHEMA),
createRecord("B", 2, AVRO_SCHEMA),
createRecord("C", 3, AVRO_SCHEMA),
createRecord("D", 4, AVRO_SCHEMA));
List<ReadRowsResponse> readRowsResponses =
Lists.newArrayList(
createResponse(AVRO_SCHEMA, records.subList(0, 2), 0.50),
createResponse(AVRO_SCHEMA, records.subList(2, 4), 0.75));
StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable());
when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest))
.thenReturn(readSession);
when(fakeStorageClient.readRows(expectedReadRowsRequest))
.thenReturn(new FakeBigQueryServerStream<>(readRowsResponses));
PCollection<KV<String, Long>> output =
p.apply(
BigQueryIO.read(new ParseKeyValue())
.from("foo.com:project:dataset.table")
.withMethod(Method.DIRECT_READ)
.withSelectedFields(p.newProvider(Lists.newArrayList("name", "number")))
.withTestServices(
new FakeBigQueryServices()
.withDatasetService(fakeDatasetService)
.withStorageClient(fakeStorageClient)));
PAssert.that(output)
.containsInAnyOrder(
ImmutableList.of(KV.of("A", 1L), KV.of("B", 2L), KV.of("C", 3L), KV.of("D", 4L)));
p.run();
}
}