| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io.gcp.bigquery; |
| |
| import static java.util.Arrays.asList; |
| import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; |
| import static org.hamcrest.MatcherAssert.assertThat; |
| import static org.hamcrest.Matchers.containsInAnyOrder; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.mockito.Mockito.mock; |
| import static org.mockito.Mockito.times; |
| import static org.mockito.Mockito.verify; |
| import static org.mockito.Mockito.when; |
| import static org.mockito.Mockito.withSettings; |
| |
| import com.google.api.gax.grpc.GrpcStatusCode; |
| import com.google.api.gax.rpc.FailedPreconditionException; |
| import com.google.api.services.bigquery.model.Streamingbuffer; |
| import com.google.api.services.bigquery.model.Table; |
| import com.google.api.services.bigquery.model.TableFieldSchema; |
| import com.google.api.services.bigquery.model.TableReference; |
| import com.google.api.services.bigquery.model.TableRow; |
| import com.google.api.services.bigquery.model.TableSchema; |
| import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch; |
| import com.google.cloud.bigquery.storage.v1.ArrowSchema; |
| import com.google.cloud.bigquery.storage.v1.AvroRows; |
| import com.google.cloud.bigquery.storage.v1.AvroSchema; |
| import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest; |
| import com.google.cloud.bigquery.storage.v1.DataFormat; |
| import com.google.cloud.bigquery.storage.v1.ReadRowsRequest; |
| import com.google.cloud.bigquery.storage.v1.ReadRowsResponse; |
| import com.google.cloud.bigquery.storage.v1.ReadSession; |
| import com.google.cloud.bigquery.storage.v1.ReadStream; |
| import com.google.cloud.bigquery.storage.v1.SplitReadStreamRequest; |
| import com.google.cloud.bigquery.storage.v1.SplitReadStreamResponse; |
| import com.google.cloud.bigquery.storage.v1.StreamStats; |
| import com.google.cloud.bigquery.storage.v1.StreamStats.Progress; |
| import com.google.protobuf.ByteString; |
| import io.grpc.Status; |
| import io.grpc.Status.Code; |
| import io.grpc.StatusRuntimeException; |
| import java.io.ByteArrayOutputStream; |
| import java.io.IOException; |
| import java.math.BigInteger; |
| import java.nio.channels.Channels; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.List; |
| import org.apache.arrow.memory.BufferAllocator; |
| import org.apache.arrow.memory.RootAllocator; |
| import org.apache.arrow.vector.BigIntVector; |
| import org.apache.arrow.vector.VarCharVector; |
| import org.apache.arrow.vector.VectorSchemaRoot; |
| import org.apache.arrow.vector.VectorUnloader; |
| import org.apache.arrow.vector.ipc.WriteChannel; |
| import org.apache.arrow.vector.ipc.message.MessageSerializer; |
| import org.apache.arrow.vector.types.pojo.ArrowType; |
| import org.apache.arrow.vector.util.Text; |
| import org.apache.avro.Schema; |
| import org.apache.avro.generic.GenericData.Record; |
| import org.apache.avro.generic.GenericDatumWriter; |
| import org.apache.avro.generic.GenericRecord; |
| import org.apache.avro.io.Encoder; |
| import org.apache.avro.io.EncoderFactory; |
| import org.apache.beam.sdk.coders.CoderRegistry; |
| import org.apache.beam.sdk.coders.KvCoder; |
| import org.apache.beam.sdk.extensions.protobuf.ByteStringCoder; |
| import org.apache.beam.sdk.extensions.protobuf.ProtoCoder; |
| import org.apache.beam.sdk.io.BoundedSource; |
| import org.apache.beam.sdk.io.BoundedSource.BoundedReader; |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TableRowParser; |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead; |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method; |
| import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices.StorageClient; |
| import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices; |
| import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices.FakeBigQueryServerStream; |
| import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService; |
| import org.apache.beam.sdk.options.PipelineOptions; |
| import org.apache.beam.sdk.options.ValueProvider; |
| import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; |
| import org.apache.beam.sdk.testing.PAssert; |
| import org.apache.beam.sdk.testing.TestPipeline; |
| import org.apache.beam.sdk.transforms.SerializableFunction; |
| import org.apache.beam.sdk.transforms.display.DisplayData; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists; |
| import org.junit.After; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.ExpectedException; |
| import org.junit.rules.TemporaryFolder; |
| import org.junit.rules.TestRule; |
| import org.junit.runner.Description; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| import org.junit.runners.model.Statement; |
| import org.mockito.ArgumentMatchers; |
| |
/** Tests for {@link BigQueryIO#readTableRows()} using {@link Method#DIRECT_READ}. */
| @RunWith(JUnit4.class) |
| public class BigQueryIOStorageReadTest { |
| |
| private transient PipelineOptions options; |
| private final transient TemporaryFolder testFolder = new TemporaryFolder(); |
| private transient TestPipeline p; |
| private BufferAllocator allocator; |
| |
| @Rule |
| public final transient TestRule folderThenPipeline = |
| new TestRule() { |
| @Override |
| public Statement apply(Statement base, Description description) { |
| // We need to set up the temporary folder, and then set up the TestPipeline based on the |
| // chosen folder. Unfortunately, since rule evaluation order is unspecified and unrelated |
| // to field order, and is separate from construction, that requires manually creating this |
| // TestRule. |
| Statement withPipeline = |
| new Statement() { |
| @Override |
| public void evaluate() throws Throwable { |
| options = TestPipeline.testingPipelineOptions(); |
| options.as(BigQueryOptions.class).setProject("project-id"); |
| if (description.getAnnotations().stream() |
| .anyMatch(a -> a.annotationType().equals(ProjectOverride.class))) { |
| options.as(BigQueryOptions.class).setBigQueryProject("bigquery-project-id"); |
| } |
| options |
| .as(BigQueryOptions.class) |
| .setTempLocation(testFolder.getRoot().getAbsolutePath()); |
| p = TestPipeline.fromOptions(options); |
| p.apply(base, description).evaluate(); |
| } |
| }; |
| return testFolder.apply(withPipeline, description); |
| } |
| }; |
| |
| @Rule public transient ExpectedException thrown = ExpectedException.none(); |
| |
| private final FakeDatasetService fakeDatasetService = new FakeDatasetService(); |
| |
| @Before |
| public void setUp() throws Exception { |
| FakeDatasetService.setUp(); |
| allocator = new RootAllocator(Long.MAX_VALUE); |
| } |
| |
| @After |
| public void teardown() { |
| allocator.close(); |
| } |
| |
| @Test |
| public void testBuildTableBasedSource() { |
| BigQueryIO.TypedRead<TableRow> typedRead = |
| BigQueryIO.read(new TableRowParser()) |
| .withCoder(TableRowJsonCoder.of()) |
| .withMethod(Method.DIRECT_READ) |
| .from("foo.com:project:dataset.table"); |
| checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table"); |
| assertTrue(typedRead.getValidate()); |
| } |
| |
| @Test |
| public void testBuildTableBasedSourceWithoutValidation() { |
| BigQueryIO.TypedRead<TableRow> typedRead = |
| BigQueryIO.read(new TableRowParser()) |
| .withCoder(TableRowJsonCoder.of()) |
| .withMethod(Method.DIRECT_READ) |
| .from("foo.com:project:dataset.table") |
| .withoutValidation(); |
| checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table"); |
| assertFalse(typedRead.getValidate()); |
| } |
| |
| @Test |
| public void testBuildTableBasedSourceWithDefaultProject() { |
| BigQueryIO.TypedRead<TableRow> typedRead = |
| BigQueryIO.read(new TableRowParser()) |
| .withCoder(TableRowJsonCoder.of()) |
| .withMethod(Method.DIRECT_READ) |
| .from("myDataset.myTable"); |
| checkTypedReadTableObject(typedRead, null, "myDataset", "myTable"); |
| } |
| |
| @Test |
| public void testBuildTableBasedSourceWithTableReference() { |
| TableReference tableReference = |
| new TableReference() |
| .setProjectId("foo.com:project") |
| .setDatasetId("dataset") |
| .setTableId("table"); |
| BigQueryIO.TypedRead<TableRow> typedRead = |
| BigQueryIO.read(new TableRowParser()) |
| .withCoder(TableRowJsonCoder.of()) |
| .withMethod(Method.DIRECT_READ) |
| .from(tableReference); |
| checkTypedReadTableObject(typedRead, "foo.com:project", "dataset", "table"); |
| } |
| |
| private void checkTypedReadTableObject( |
| TypedRead<?> typedRead, String project, String dataset, String table) { |
| assertEquals(project, typedRead.getTable().getProjectId()); |
| assertEquals(dataset, typedRead.getTable().getDatasetId()); |
| assertEquals(table, typedRead.getTable().getTableId()); |
| assertNull(typedRead.getQuery()); |
| assertEquals(Method.DIRECT_READ, typedRead.getMethod()); |
| } |
| |
| @Test |
| public void testBuildSourceWithTableAndFlatten() { |
| thrown.expect(IllegalArgumentException.class); |
| thrown.expectMessage( |
| "Invalid BigQueryIO.Read: Specifies a table with a result flattening preference," |
| + " which only applies to queries"); |
| p.apply( |
| "ReadMyTable", |
| BigQueryIO.read(new TableRowParser()) |
| .withCoder(TableRowJsonCoder.of()) |
| .withMethod(Method.DIRECT_READ) |
| .from("foo.com:project:dataset.table") |
| .withoutResultFlattening()); |
| p.run(); |
| } |
| |
| @Test |
| public void testBuildSourceWithTableAndSqlDialect() { |
| thrown.expect(IllegalArgumentException.class); |
| thrown.expectMessage( |
| "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect preference," |
| + " which only applies to queries"); |
| p.apply( |
| "ReadMyTable", |
| BigQueryIO.read(new TableRowParser()) |
| .withCoder(TableRowJsonCoder.of()) |
| .withMethod(Method.DIRECT_READ) |
| .from("foo.com:project:dataset.table") |
| .usingStandardSql()); |
| p.run(); |
| } |
| |
| @Test |
| public void testDisplayData() { |
| String tableSpec = "foo.com:project:dataset.table"; |
| BigQueryIO.TypedRead<TableRow> typedRead = |
| BigQueryIO.read(new TableRowParser()) |
| .withCoder(TableRowJsonCoder.of()) |
| .withMethod(Method.DIRECT_READ) |
| .from(tableSpec); |
| DisplayData displayData = DisplayData.from(typedRead); |
| assertThat(displayData, hasDisplayItem("table", tableSpec)); |
| } |
| |
| @Test |
| public void testName() { |
| assertEquals( |
| "BigQueryIO.TypedRead", |
| BigQueryIO.read(new TableRowParser()) |
| .withCoder(TableRowJsonCoder.of()) |
| .withMethod(Method.DIRECT_READ) |
| .from("foo.com:project:dataset.table") |
| .getName()); |
| } |
| |
| @Test |
| public void testCoderInference() { |
| // Lambdas erase too much type information -- use an anonymous class here. |
| SerializableFunction<SchemaAndRecord, KV<ByteString, ReadSession>> parseFn = |
| new SerializableFunction<SchemaAndRecord, KV<ByteString, ReadSession>>() { |
| @Override |
| public KV<ByteString, ReadSession> apply(SchemaAndRecord input) { |
| return null; |
| } |
| }; |
| |
| assertEquals( |
| KvCoder.of(ByteStringCoder.of(), ProtoCoder.of(ReadSession.class)), |
| BigQueryIO.read(parseFn).inferCoder(CoderRegistry.createDefault())); |
| } |
| |
| @Test |
| public void testTableSourceEstimatedSize() throws Exception { |
| doTableSourceEstimatedSizeTest(false); |
| } |
| |
| @Test |
| public void testTableSourceEstimatedSize_IgnoresStreamingBuffer() throws Exception { |
| doTableSourceEstimatedSizeTest(true); |
| } |
| |
| private void doTableSourceEstimatedSizeTest(boolean useStreamingBuffer) throws Exception { |
| fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null); |
| TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table"); |
| Table table = new Table().setTableReference(tableRef).setNumBytes(100L); |
| if (useStreamingBuffer) { |
| table.setStreamingBuffer(new Streamingbuffer().setEstimatedBytes(BigInteger.TEN)); |
| } |
| |
| fakeDatasetService.createTable(table); |
| |
| BigQueryStorageTableSource<TableRow> tableSource = |
| BigQueryStorageTableSource.create( |
| ValueProvider.StaticValueProvider.of(tableRef), |
| null, |
| null, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices().withDatasetService(fakeDatasetService)); |
| |
| assertEquals(100, tableSource.getEstimatedSizeBytes(options)); |
| } |
| |
| @Test |
| @ProjectOverride |
| public void testTableSourceEstimatedSize_WithBigQueryProject() throws Exception { |
| fakeDatasetService.createDataset("bigquery-project-id", "dataset", "", "", null); |
| TableReference tableRef = BigQueryHelpers.parseTableSpec("bigquery-project-id:dataset.table"); |
| Table table = new Table().setTableReference(tableRef).setNumBytes(100L); |
| fakeDatasetService.createTable(table); |
| |
| BigQueryStorageTableSource<TableRow> tableSource = |
| BigQueryStorageTableSource.create( |
| ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")), |
| null, |
| null, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices().withDatasetService(fakeDatasetService)); |
| |
| assertEquals(100, tableSource.getEstimatedSizeBytes(options)); |
| } |
| |
| @Test |
| public void testTableSourceEstimatedSize_WithDefaultProject() throws Exception { |
| fakeDatasetService.createDataset("project-id", "dataset", "", "", null); |
| TableReference tableRef = BigQueryHelpers.parseTableSpec("project-id:dataset.table"); |
| Table table = new Table().setTableReference(tableRef).setNumBytes(100L); |
| fakeDatasetService.createTable(table); |
| |
| BigQueryStorageTableSource<TableRow> tableSource = |
| BigQueryStorageTableSource.create( |
| ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")), |
| null, |
| null, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices().withDatasetService(fakeDatasetService)); |
| |
| assertEquals(100, tableSource.getEstimatedSizeBytes(options)); |
| } |
| |
| @Test |
| public void testTableSourceInitialSplit() throws Exception { |
| doTableSourceInitialSplitTest(1024L, 1024); |
| } |
| |
| @Test |
| public void testTableSourceInitialSplit_MinSplitCount() throws Exception { |
| doTableSourceInitialSplitTest(1024L * 1024L, 10); |
| } |
| |
| @Test |
| public void testTableSourceInitialSplit_MaxSplitCount() throws Exception { |
| doTableSourceInitialSplitTest(10L, 10_000); |
| } |
| |
| private static final String AVRO_SCHEMA_STRING = |
| "{\"namespace\": \"example.avro\",\n" |
| + " \"type\": \"record\",\n" |
| + " \"name\": \"RowRecord\",\n" |
| + " \"fields\": [\n" |
| + " {\"name\": \"name\", \"type\": \"string\"},\n" |
| + " {\"name\": \"number\", \"type\": \"long\"}\n" |
| + " ]\n" |
| + "}"; |
| |
| private static final Schema AVRO_SCHEMA = new Schema.Parser().parse(AVRO_SCHEMA_STRING); |
| |
| private static final String TRIMMED_AVRO_SCHEMA_STRING = |
| "{\"namespace\": \"example.avro\",\n" |
| + "\"type\": \"record\",\n" |
| + "\"name\": \"RowRecord\",\n" |
| + "\"fields\": [\n" |
| + " {\"name\": \"name\", \"type\": \"string\"}\n" |
| + " ]\n" |
| + "}"; |
| |
| private static final Schema TRIMMED_AVRO_SCHEMA = |
| new Schema.Parser().parse(TRIMMED_AVRO_SCHEMA_STRING); |
| |
| private static final TableSchema TABLE_SCHEMA = |
| new TableSchema() |
| .setFields( |
| ImmutableList.of( |
| new TableFieldSchema().setName("name").setType("STRING").setMode("REQUIRED"), |
| new TableFieldSchema().setName("number").setType("INTEGER").setMode("REQUIRED"))); |
| |
| private static final org.apache.arrow.vector.types.pojo.Schema ARROW_SCHEMA = |
| new org.apache.arrow.vector.types.pojo.Schema( |
| asList( |
| field("name", new ArrowType.Utf8()), field("number", new ArrowType.Int(64, true)))); |
| |
| private void doTableSourceInitialSplitTest(long bundleSize, int streamCount) throws Exception { |
| fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null); |
| TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table"); |
| |
| Table table = |
| new Table().setTableReference(tableRef).setNumBytes(1024L * 1024L).setSchema(TABLE_SCHEMA); |
| |
| fakeDatasetService.createTable(table); |
| |
| CreateReadSessionRequest expectedRequest = |
| CreateReadSessionRequest.newBuilder() |
| .setParent("projects/project-id") |
| .setReadSession( |
| ReadSession.newBuilder() |
| .setTable("projects/foo.com:project/datasets/dataset/tables/table")) |
| .setMaxStreamCount(streamCount) |
| .build(); |
| |
| ReadSession.Builder builder = |
| ReadSession.newBuilder() |
| .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)) |
| .setDataFormat(DataFormat.AVRO); |
| for (int i = 0; i < streamCount; i++) { |
| builder.addStreams(ReadStream.newBuilder().setName("stream-" + i)); |
| } |
| |
| StorageClient fakeStorageClient = mock(StorageClient.class); |
| when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build()); |
| |
| BigQueryStorageTableSource<TableRow> tableSource = |
| BigQueryStorageTableSource.create( |
| ValueProvider.StaticValueProvider.of(tableRef), |
| null, |
| null, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices() |
| .withDatasetService(fakeDatasetService) |
| .withStorageClient(fakeStorageClient)); |
| |
| List<? extends BoundedSource<TableRow>> sources = tableSource.split(bundleSize, options); |
| assertEquals(streamCount, sources.size()); |
| } |
| |
| @Test |
| public void testTableSourceInitialSplit_WithSelectedFieldsAndRowRestriction() throws Exception { |
| fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null); |
| TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table"); |
| |
| Table table = new Table().setTableReference(tableRef).setNumBytes(100L).setSchema(TABLE_SCHEMA); |
| |
| fakeDatasetService.createTable(table); |
| |
| CreateReadSessionRequest expectedRequest = |
| CreateReadSessionRequest.newBuilder() |
| .setParent("projects/project-id") |
| .setReadSession( |
| ReadSession.newBuilder() |
| .setTable("projects/foo.com:project/datasets/dataset/tables/table") |
| .setReadOptions( |
| ReadSession.TableReadOptions.newBuilder() |
| .addSelectedFields("name") |
| .setRowRestriction("number > 5"))) |
| .setMaxStreamCount(10) |
| .build(); |
| |
| ReadSession.Builder builder = |
| ReadSession.newBuilder() |
| .setAvroSchema(AvroSchema.newBuilder().setSchema(TRIMMED_AVRO_SCHEMA_STRING)) |
| .setDataFormat(DataFormat.AVRO); |
| for (int i = 0; i < 10; i++) { |
| builder.addStreams(ReadStream.newBuilder().setName("stream-" + i)); |
| } |
| |
| StorageClient fakeStorageClient = mock(StorageClient.class); |
| when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build()); |
| |
| BigQueryStorageTableSource<TableRow> tableSource = |
| BigQueryStorageTableSource.create( |
| ValueProvider.StaticValueProvider.of(tableRef), |
| StaticValueProvider.of(Lists.newArrayList("name")), |
| StaticValueProvider.of("number > 5"), |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices() |
| .withDatasetService(fakeDatasetService) |
| .withStorageClient(fakeStorageClient)); |
| |
| List<? extends BoundedSource<TableRow>> sources = tableSource.split(10L, options); |
| assertEquals(10L, sources.size()); |
| } |
| |
| @Test |
| public void testTableSourceInitialSplit_WithDefaultProject() throws Exception { |
| fakeDatasetService.createDataset("project-id", "dataset", "", "", null); |
| TableReference tableRef = BigQueryHelpers.parseTableSpec("project-id:dataset.table"); |
| |
| Table table = |
| new Table().setTableReference(tableRef).setNumBytes(1024L * 1024L).setSchema(TABLE_SCHEMA); |
| |
| fakeDatasetService.createTable(table); |
| |
| CreateReadSessionRequest expectedRequest = |
| CreateReadSessionRequest.newBuilder() |
| .setParent("projects/project-id") |
| .setReadSession( |
| ReadSession.newBuilder() |
| .setTable("projects/project-id/datasets/dataset/tables/table")) |
| .setMaxStreamCount(1024) |
| .build(); |
| |
| ReadSession.Builder builder = |
| ReadSession.newBuilder() |
| .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)) |
| .setDataFormat(DataFormat.AVRO); |
| for (int i = 0; i < 50; i++) { |
| builder.addStreams(ReadStream.newBuilder().setName("stream-" + i)); |
| } |
| |
| StorageClient fakeStorageClient = mock(StorageClient.class); |
| when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(builder.build()); |
| |
| BigQueryStorageTableSource<TableRow> tableSource = |
| BigQueryStorageTableSource.create( |
| ValueProvider.StaticValueProvider.of(BigQueryHelpers.parseTableSpec("dataset.table")), |
| null, |
| null, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices() |
| .withDatasetService(fakeDatasetService) |
| .withStorageClient(fakeStorageClient)); |
| |
| List<? extends BoundedSource<TableRow>> sources = tableSource.split(1024L, options); |
| assertEquals(50L, sources.size()); |
| } |
| |
| @Test |
| public void testTableSourceInitialSplit_EmptyTable() throws Exception { |
| fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null); |
| TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table"); |
| |
| Table table = |
| new Table() |
| .setTableReference(tableRef) |
| .setNumBytes(1024L * 1024L) |
| .setSchema(new TableSchema()); |
| |
| fakeDatasetService.createTable(table); |
| |
| CreateReadSessionRequest expectedRequest = |
| CreateReadSessionRequest.newBuilder() |
| .setParent("projects/project-id") |
| .setReadSession( |
| ReadSession.newBuilder() |
| .setTable("projects/foo.com:project/datasets/dataset/tables/table")) |
| .setMaxStreamCount(1024) |
| .build(); |
| |
| ReadSession emptyReadSession = ReadSession.newBuilder().build(); |
| StorageClient fakeStorageClient = mock(StorageClient.class); |
| when(fakeStorageClient.createReadSession(expectedRequest)).thenReturn(emptyReadSession); |
| |
| BigQueryStorageTableSource<TableRow> tableSource = |
| BigQueryStorageTableSource.create( |
| ValueProvider.StaticValueProvider.of(tableRef), |
| null, |
| null, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices() |
| .withDatasetService(fakeDatasetService) |
| .withStorageClient(fakeStorageClient)); |
| |
| List<? extends BoundedSource<TableRow>> sources = tableSource.split(1024L, options); |
| assertTrue(sources.isEmpty()); |
| } |
| |
| @Test |
| public void testTableSourceCreateReader() throws Exception { |
| BigQueryStorageTableSource<TableRow> tableSource = |
| BigQueryStorageTableSource.create( |
| ValueProvider.StaticValueProvider.of( |
| BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table")), |
| null, |
| null, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices().withDatasetService(fakeDatasetService)); |
| |
| thrown.expect(UnsupportedOperationException.class); |
| thrown.expectMessage("BigQuery storage source must be split before reading"); |
| tableSource.createReader(options); |
| } |
| |
| private static GenericRecord createRecord(String name, Schema schema) { |
| GenericRecord genericRecord = new Record(schema); |
| genericRecord.put("name", name); |
| return genericRecord; |
| } |
| |
| private static GenericRecord createRecord(String name, long number, Schema schema) { |
| GenericRecord genericRecord = new Record(schema); |
| genericRecord.put("name", name); |
| genericRecord.put("number", number); |
| return genericRecord; |
| } |
| |
| private static ByteString serializeArrowSchema( |
| org.apache.arrow.vector.types.pojo.Schema arrowSchema) { |
| ByteArrayOutputStream byteOutputStream = new ByteArrayOutputStream(); |
| try { |
| MessageSerializer.serialize( |
| new WriteChannel(Channels.newChannel(byteOutputStream)), arrowSchema); |
| } catch (IOException ex) { |
| throw new RuntimeException("Failed to serialize arrow schema.", ex); |
| } |
| return ByteString.copyFrom(byteOutputStream.toByteArray()); |
| } |
| |
| private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get(); |
| |
| private static ReadRowsResponse createResponse( |
| Schema schema, |
| Collection<GenericRecord> genericRecords, |
| double progressAtResponseStart, |
| double progressAtResponseEnd) |
| throws Exception { |
| GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema); |
| ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); |
| Encoder binaryEncoder = ENCODER_FACTORY.binaryEncoder(outputStream, null); |
| for (GenericRecord genericRecord : genericRecords) { |
| writer.write(genericRecord, binaryEncoder); |
| } |
| |
| binaryEncoder.flush(); |
| |
| return ReadRowsResponse.newBuilder() |
| .setAvroRows( |
| AvroRows.newBuilder() |
| .setSerializedBinaryRows(ByteString.copyFrom(outputStream.toByteArray())) |
| .setRowCount(genericRecords.size())) |
| .setRowCount(genericRecords.size()) |
| .setStats( |
| StreamStats.newBuilder() |
| .setProgress( |
| Progress.newBuilder() |
| .setAtResponseStart(progressAtResponseStart) |
| .setAtResponseEnd(progressAtResponseEnd))) |
| .build(); |
| } |
| |
| private ReadRowsResponse createResponseArrow( |
| org.apache.arrow.vector.types.pojo.Schema arrowSchema, |
| List<String> name, |
| List<Long> number, |
| double progressAtResponseStart, |
| double progressAtResponseEnd) { |
| ArrowRecordBatch serializedRecord; |
| try (VectorSchemaRoot schemaRoot = VectorSchemaRoot.create(arrowSchema, allocator)) { |
| schemaRoot.allocateNew(); |
| schemaRoot.setRowCount(name.size()); |
| VarCharVector strVector = (VarCharVector) schemaRoot.getFieldVectors().get(0); |
| BigIntVector bigIntVector = (BigIntVector) schemaRoot.getFieldVectors().get(1); |
| for (int i = 0; i < name.size(); i++) { |
| bigIntVector.set(i, number.get(i)); |
| strVector.set(i, new Text(name.get(i))); |
| } |
| |
| VectorUnloader unLoader = new VectorUnloader(schemaRoot); |
| try (org.apache.arrow.vector.ipc.message.ArrowRecordBatch records = |
| unLoader.getRecordBatch()) { |
| try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { |
| MessageSerializer.serialize(new WriteChannel(Channels.newChannel(os)), records); |
| serializedRecord = |
| ArrowRecordBatch.newBuilder() |
| .setRowCount(records.getLength()) |
| .setSerializedRecordBatch(ByteString.copyFrom(os.toByteArray())) |
| .build(); |
| } catch (IOException e) { |
| throw new RuntimeException("Error writing to byte array output stream", e); |
| } |
| } |
| } |
| |
| return ReadRowsResponse.newBuilder() |
| .setArrowRecordBatch(serializedRecord) |
| .setRowCount(name.size()) |
| .setStats( |
| StreamStats.newBuilder() |
| .setProgress( |
| Progress.newBuilder() |
| .setAtResponseStart(progressAtResponseStart) |
| .setAtResponseEnd(progressAtResponseEnd))) |
| .build(); |
| } |
| |
| @Test |
| public void testStreamSourceEstimatedSizeBytes() throws Exception { |
| |
| BigQueryStorageStreamSource<TableRow> streamSource = |
| BigQueryStorageStreamSource.create( |
| ReadSession.getDefaultInstance(), |
| ReadStream.getDefaultInstance(), |
| TABLE_SCHEMA, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices()); |
| |
| assertEquals(0, streamSource.getEstimatedSizeBytes(options)); |
| } |
| |
| @Test |
| public void testStreamSourceSplit() throws Exception { |
| |
| BigQueryStorageStreamSource<TableRow> streamSource = |
| BigQueryStorageStreamSource.create( |
| ReadSession.getDefaultInstance(), |
| ReadStream.getDefaultInstance(), |
| TABLE_SCHEMA, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices()); |
| |
| assertThat(streamSource.split(0, options), containsInAnyOrder(streamSource)); |
| } |
| |
| @Test |
| public void testReadFromStreamSource() throws Exception { |
| |
| ReadSession readSession = |
| ReadSession.newBuilder() |
| .setName("readSession") |
| .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)) |
| .build(); |
| |
| ReadRowsRequest expectedRequest = |
| ReadRowsRequest.newBuilder().setReadStream("readStream").build(); |
| |
| List<GenericRecord> records = |
| Lists.newArrayList( |
| createRecord("A", 1, AVRO_SCHEMA), |
| createRecord("B", 2, AVRO_SCHEMA), |
| createRecord("C", 3, AVRO_SCHEMA)); |
| |
| List<ReadRowsResponse> responses = |
| Lists.newArrayList( |
| createResponse(AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.50), |
| createResponse(AVRO_SCHEMA, records.subList(2, 3), 0.5, 0.75)); |
| |
| StorageClient fakeStorageClient = mock(StorageClient.class); |
| when(fakeStorageClient.readRows(expectedRequest, "")) |
| .thenReturn(new FakeBigQueryServerStream<>(responses)); |
| |
| BigQueryStorageStreamSource<TableRow> streamSource = |
| BigQueryStorageStreamSource.create( |
| readSession, |
| ReadStream.newBuilder().setName("readStream").build(), |
| TABLE_SCHEMA, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices().withStorageClient(fakeStorageClient)); |
| |
| List<TableRow> rows = new ArrayList<>(); |
| BoundedReader<TableRow> reader = streamSource.createReader(options); |
| for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) { |
| rows.add(reader.getCurrent()); |
| } |
| |
| System.out.println("Rows: " + rows); |
| |
| assertEquals(3, rows.size()); |
| } |
| |
| private static final double DELTA = 1e-6; |
| |
| @Test |
| public void testFractionConsumed() throws Exception { |
| ReadSession readSession = |
| ReadSession.newBuilder() |
| .setName("readSession") |
| .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)) |
| .build(); |
| |
| ReadRowsRequest expectedRequest = |
| ReadRowsRequest.newBuilder().setReadStream("readStream").build(); |
| |
| List<GenericRecord> records = |
| Lists.newArrayList( |
| createRecord("A", 1, AVRO_SCHEMA), |
| createRecord("B", 2, AVRO_SCHEMA), |
| createRecord("C", 3, AVRO_SCHEMA), |
| createRecord("D", 4, AVRO_SCHEMA), |
| createRecord("E", 5, AVRO_SCHEMA), |
| createRecord("F", 6, AVRO_SCHEMA), |
| createRecord("G", 7, AVRO_SCHEMA)); |
| |
| List<ReadRowsResponse> responses = |
| Lists.newArrayList( |
| createResponse(AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.25), |
| // Some responses may contain zero results, so we must ensure that we can are resilient |
| // to such responses. |
| createResponse(AVRO_SCHEMA, Lists.newArrayList(), 0.25, 0.25), |
| createResponse(AVRO_SCHEMA, records.subList(2, 4), 0.3, 0.5), |
| createResponse(AVRO_SCHEMA, records.subList(4, 7), 0.7, 1.0)); |
| |
| StorageClient fakeStorageClient = mock(StorageClient.class); |
| when(fakeStorageClient.readRows(expectedRequest, "")) |
| .thenReturn(new FakeBigQueryServerStream<>(responses)); |
| |
| BigQueryStorageStreamSource<TableRow> streamSource = |
| BigQueryStorageStreamSource.create( |
| readSession, |
| ReadStream.newBuilder().setName("readStream").build(), |
| TABLE_SCHEMA, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices().withStorageClient(fakeStorageClient)); |
| |
| List<TableRow> rows = new ArrayList<>(); |
| BoundedReader<TableRow> reader = streamSource.createReader(options); |
| |
| // Before call to BoundedReader#start, fraction consumed must be zero. |
| assertEquals(0.0, reader.getFractionConsumed(), DELTA); |
| |
| assertTrue(reader.start()); // Reads A. |
| assertEquals(0.125, reader.getFractionConsumed(), DELTA); |
| assertTrue(reader.advance()); // Reads B. |
| assertEquals(0.25, reader.getFractionConsumed(), DELTA); |
| |
| assertTrue(reader.advance()); // Reads C. |
| assertEquals(0.4, reader.getFractionConsumed(), DELTA); |
| assertTrue(reader.advance()); // Reads D. |
| assertEquals(0.5, reader.getFractionConsumed(), DELTA); |
| |
| assertTrue(reader.advance()); // Reads E. |
| assertEquals(0.8, reader.getFractionConsumed(), DELTA); |
| assertTrue(reader.advance()); // Reads F. |
| assertEquals(0.9, reader.getFractionConsumed(), DELTA); |
| assertTrue(reader.advance()); // Reads G. |
| assertEquals(1.0, reader.getFractionConsumed(), DELTA); |
| |
| assertFalse(reader.advance()); // Reaches the end. |
| |
| // We are done with the stream, so we should report 100% consumption. |
| assertEquals(Double.valueOf(1.0), reader.getFractionConsumed()); |
| } |
| |
| @Test |
| public void testFractionConsumedWithSplit() throws Exception { |
| ReadSession readSession = |
| ReadSession.newBuilder() |
| .setName("readSession") |
| .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)) |
| .build(); |
| |
| ReadRowsRequest expectedRequest = |
| ReadRowsRequest.newBuilder().setReadStream("parentStream").build(); |
| |
| List<GenericRecord> records = |
| Lists.newArrayList( |
| createRecord("A", 1, AVRO_SCHEMA), |
| createRecord("B", 2, AVRO_SCHEMA), |
| createRecord("C", 3, AVRO_SCHEMA), |
| createRecord("D", 4, AVRO_SCHEMA), |
| createRecord("E", 5, AVRO_SCHEMA), |
| createRecord("F", 6, AVRO_SCHEMA), |
| createRecord("G", 7, AVRO_SCHEMA)); |
| |
| List<ReadRowsResponse> parentResponses = |
| Lists.newArrayList( |
| createResponse(AVRO_SCHEMA, records.subList(0, 2), 0.000, 0.250), |
| createResponse(AVRO_SCHEMA, records.subList(2, 4), 0.300, 0.500), |
| createResponse(AVRO_SCHEMA, records.subList(4, 7), 0.800, 0.875)); |
| |
| StorageClient fakeStorageClient = mock(StorageClient.class); |
| when(fakeStorageClient.readRows(expectedRequest, "")) |
| .thenReturn(new FakeBigQueryServerStream<>(parentResponses)); |
| |
| when(fakeStorageClient.splitReadStream( |
| SplitReadStreamRequest.newBuilder().setName("parentStream").setFraction(0.5f).build())) |
| .thenReturn( |
| SplitReadStreamResponse.newBuilder() |
| .setPrimaryStream(ReadStream.newBuilder().setName("primaryStream")) |
| .setRemainderStream(ReadStream.newBuilder().setName("remainderStream")) |
| .build()); |
| |
| List<ReadRowsResponse> primaryResponses = |
| Lists.newArrayList( |
| createResponse(AVRO_SCHEMA, records.subList(1, 3), 0.25, 0.75), |
| createResponse(AVRO_SCHEMA, records.subList(3, 4), 0.8, 1.0)); |
| |
| when(fakeStorageClient.readRows( |
| ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(1).build(), "")) |
| .thenReturn(new FakeBigQueryServerStream<>(primaryResponses)); |
| |
| BigQueryStorageStreamSource<TableRow> streamSource = |
| BigQueryStorageStreamSource.create( |
| readSession, |
| ReadStream.newBuilder().setName("parentStream").build(), |
| TABLE_SCHEMA, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices().withStorageClient(fakeStorageClient)); |
| |
| List<TableRow> rows = new ArrayList<>(); |
| BoundedReader<TableRow> reader = streamSource.createReader(options); |
| |
| // Before call to BoundedReader#start, fraction consumed must be zero. |
| assertEquals(0.0, reader.getFractionConsumed(), DELTA); |
| |
| assertTrue(reader.start()); // Reads A. |
| assertEquals(0.125, reader.getFractionConsumed(), DELTA); |
| |
| reader.splitAtFraction(0.5); |
| assertEquals(0.125, reader.getFractionConsumed(), DELTA); |
| |
| assertTrue(reader.advance()); // Reads B. |
| assertEquals(0.5, reader.getFractionConsumed(), DELTA); |
| |
| assertTrue(reader.advance()); // Reads C. |
| assertEquals(0.75, reader.getFractionConsumed(), DELTA); |
| |
| assertTrue(reader.advance()); // Reads D. |
| assertEquals(1.0, reader.getFractionConsumed(), DELTA); |
| |
| assertFalse(reader.advance()); |
| assertEquals(1.0, reader.getFractionConsumed(), DELTA); |
| } |
| |
| @Test |
| public void testStreamSourceSplitAtFractionSucceeds() throws Exception { |
| List<ReadRowsResponse> parentResponses = |
| Lists.newArrayList( |
| createResponse( |
| AVRO_SCHEMA, |
| Lists.newArrayList( |
| createRecord("A", 1, AVRO_SCHEMA), createRecord("B", 2, AVRO_SCHEMA)), |
| 0.0, |
| 0.25), |
| createResponse( |
| AVRO_SCHEMA, Lists.newArrayList(createRecord("C", 3, AVRO_SCHEMA)), 0.25, 0.50), |
| createResponse( |
| AVRO_SCHEMA, |
| Lists.newArrayList( |
| createRecord("D", 4, AVRO_SCHEMA), createRecord("E", 5, AVRO_SCHEMA)), |
| 0.50, |
| 0.75)); |
| |
| StorageClient fakeStorageClient = mock(StorageClient.class); |
| when(fakeStorageClient.readRows( |
| ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), "")) |
| .thenReturn(new FakeBigQueryServerStream<>(parentResponses)); |
| |
| // Mocks the split call. |
| when(fakeStorageClient.splitReadStream( |
| SplitReadStreamRequest.newBuilder().setName("parentStream").setFraction(0.5f).build())) |
| .thenReturn( |
| SplitReadStreamResponse.newBuilder() |
| .setPrimaryStream(ReadStream.newBuilder().setName("primaryStream")) |
| .setRemainderStream(ReadStream.newBuilder().setName("remainderStream")) |
| .build()); |
| |
| // Mocks the ReadRows calls expected on the primary and residual streams. |
| when(fakeStorageClient.readRows( |
| ReadRowsRequest.newBuilder() |
| .setReadStream("primaryStream") |
| // This test will read rows 0 and 1 from the parent before calling split, |
| // so we expect the primary read to start at offset 2. |
| .setOffset(2) |
| .build(), |
| "")) |
| .thenReturn(new FakeBigQueryServerStream<>(parentResponses.subList(1, 2))); |
| |
| when(fakeStorageClient.readRows( |
| ReadRowsRequest.newBuilder().setReadStream("remainderStream").build(), "")) |
| .thenReturn( |
| new FakeBigQueryServerStream<>(parentResponses.subList(2, parentResponses.size()))); |
| |
| BigQueryStorageStreamSource<TableRow> streamSource = |
| BigQueryStorageStreamSource.create( |
| ReadSession.newBuilder() |
| .setName("readSession") |
| .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)) |
| .build(), |
| ReadStream.newBuilder().setName("parentStream").build(), |
| TABLE_SCHEMA, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices().withStorageClient(fakeStorageClient)); |
| |
| // Read a few records from the parent stream and ensure that records are returned in the |
| // prescribed order. |
| BoundedReader<TableRow> parent = streamSource.createReader(options); |
| assertTrue(parent.start()); |
| assertEquals("A", parent.getCurrent().get("name")); |
| assertTrue(parent.advance()); |
| assertEquals("B", parent.getCurrent().get("name")); |
| |
| // Now split the stream, and ensure that the "parent" reader has been replaced with the |
| // primary stream and that the returned source points to the residual stream. |
| BoundedReader<TableRow> primary = parent; |
| BoundedSource<TableRow> residualSource = parent.splitAtFraction(0.5); |
| assertNotNull(residualSource); |
| BoundedReader<TableRow> residual = residualSource.createReader(options); |
| |
| assertTrue(primary.advance()); |
| assertEquals("C", primary.getCurrent().get("name")); |
| assertFalse(primary.advance()); |
| |
| assertTrue(residual.start()); |
| assertEquals("D", residual.getCurrent().get("name")); |
| assertTrue(residual.advance()); |
| assertEquals("E", residual.getCurrent().get("name")); |
| assertFalse(residual.advance()); |
| } |
| |
| @Test |
| public void testStreamSourceSplitAtFractionRepeated() throws Exception { |
| List<ReadStream> readStreams = |
| Lists.newArrayList( |
| ReadStream.newBuilder().setName("stream1").build(), |
| ReadStream.newBuilder().setName("stream2").build(), |
| ReadStream.newBuilder().setName("stream3").build()); |
| |
| StorageClient fakeStorageClient = mock(StorageClient.class); |
| |
| // Mock the initial ReadRows call. |
| when(fakeStorageClient.readRows( |
| ReadRowsRequest.newBuilder().setReadStream(readStreams.get(0).getName()).build(), "")) |
| .thenReturn( |
| new FakeBigQueryServerStream<>( |
| Lists.newArrayList( |
| createResponse( |
| AVRO_SCHEMA, |
| Lists.newArrayList( |
| createRecord("A", 1, AVRO_SCHEMA), createRecord("B", 2, AVRO_SCHEMA)), |
| 0.0, |
| 0.25), |
| createResponse( |
| AVRO_SCHEMA, |
| Lists.newArrayList( |
| createRecord("C", 3, AVRO_SCHEMA), createRecord("D", 4, AVRO_SCHEMA)), |
| 0.25, |
| 0.50), |
| createResponse( |
| AVRO_SCHEMA, |
| Lists.newArrayList( |
| createRecord("E", 5, AVRO_SCHEMA), createRecord("F", 6, AVRO_SCHEMA)), |
| 0.5, |
| 0.75)))); |
| |
| // Mock the first SplitReadStream call. |
| when(fakeStorageClient.splitReadStream( |
| SplitReadStreamRequest.newBuilder() |
| .setName(readStreams.get(0).getName()) |
| .setFraction(0.83f) |
| .build())) |
| .thenReturn( |
| SplitReadStreamResponse.newBuilder() |
| .setPrimaryStream(readStreams.get(1)) |
| .setRemainderStream(ReadStream.newBuilder().setName("ignored")) |
| .build()); |
| |
| // Mock the second ReadRows call. |
| when(fakeStorageClient.readRows( |
| ReadRowsRequest.newBuilder() |
| .setReadStream(readStreams.get(1).getName()) |
| .setOffset(1) |
| .build(), |
| "")) |
| .thenReturn( |
| new FakeBigQueryServerStream<>( |
| Lists.newArrayList( |
| createResponse( |
| AVRO_SCHEMA, |
| Lists.newArrayList( |
| createRecord("B", 2, AVRO_SCHEMA), createRecord("C", 3, AVRO_SCHEMA)), |
| 0.0, |
| 0.50), |
| createResponse( |
| AVRO_SCHEMA, |
| Lists.newArrayList( |
| createRecord("D", 4, AVRO_SCHEMA), createRecord("E", 5, AVRO_SCHEMA)), |
| 0.5, |
| 0.75)))); |
| |
| // Mock the second SplitReadStream call. |
| when(fakeStorageClient.splitReadStream( |
| SplitReadStreamRequest.newBuilder() |
| .setName(readStreams.get(1).getName()) |
| .setFraction(0.75f) |
| .build())) |
| .thenReturn( |
| SplitReadStreamResponse.newBuilder() |
| .setPrimaryStream(readStreams.get(2)) |
| .setRemainderStream(ReadStream.newBuilder().setName("ignored")) |
| .build()); |
| |
| // Mock the third ReadRows call. |
| when(fakeStorageClient.readRows( |
| ReadRowsRequest.newBuilder() |
| .setReadStream(readStreams.get(2).getName()) |
| .setOffset(2) |
| .build(), |
| "")) |
| .thenReturn( |
| new FakeBigQueryServerStream<>( |
| Lists.newArrayList( |
| createResponse( |
| AVRO_SCHEMA, |
| Lists.newArrayList( |
| createRecord("C", 3, AVRO_SCHEMA), createRecord("D", 4, AVRO_SCHEMA)), |
| 0.80, |
| 0.90)))); |
| |
| BoundedSource<TableRow> source = |
| BigQueryStorageStreamSource.create( |
| ReadSession.newBuilder() |
| .setName("readSession") |
| .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)) |
| .build(), |
| readStreams.get(0), |
| TABLE_SCHEMA, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices().withStorageClient(fakeStorageClient)); |
| |
| BoundedReader<TableRow> reader = source.createReader(options); |
| assertTrue(reader.start()); |
| assertEquals("A", reader.getCurrent().get("name")); |
| |
| BoundedSource<TableRow> residualSource = reader.splitAtFraction(0.83f); |
| assertNotNull(residualSource); |
| assertEquals("A", reader.getCurrent().get("name")); |
| |
| assertTrue(reader.advance()); |
| assertEquals("B", reader.getCurrent().get("name")); |
| |
| residualSource = reader.splitAtFraction(0.75f); |
| assertNotNull(residualSource); |
| assertEquals("B", reader.getCurrent().get("name")); |
| |
| assertTrue(reader.advance()); |
| assertEquals("C", reader.getCurrent().get("name")); |
| assertTrue(reader.advance()); |
| assertEquals("D", reader.getCurrent().get("name")); |
| assertFalse(reader.advance()); |
| } |
| |
| @Test |
| public void testStreamSourceSplitAtFractionFailsWhenSplitIsNotPossible() throws Exception { |
| List<ReadRowsResponse> parentResponses = |
| Lists.newArrayList( |
| createResponse( |
| AVRO_SCHEMA, |
| Lists.newArrayList( |
| createRecord("A", 1, AVRO_SCHEMA), createRecord("B", 2, AVRO_SCHEMA)), |
| 0.0, |
| 0.25), |
| createResponse( |
| AVRO_SCHEMA, Lists.newArrayList(createRecord("C", 3, AVRO_SCHEMA)), 0.25, 0.50), |
| createResponse( |
| AVRO_SCHEMA, |
| Lists.newArrayList( |
| createRecord("D", 4, AVRO_SCHEMA), createRecord("E", 5, AVRO_SCHEMA)), |
| 0.5, |
| 0.75)); |
| |
| StorageClient fakeStorageClient = mock(StorageClient.class); |
| when(fakeStorageClient.readRows( |
| ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), "")) |
| .thenReturn(new FakeBigQueryServerStream<>(parentResponses)); |
| |
| // Mocks the split call. A response without a primary_stream and remainder_stream means |
| // that the split is not possible. |
| when(fakeStorageClient.splitReadStream( |
| SplitReadStreamRequest.newBuilder().setName("parentStream").setFraction(0.5f).build())) |
| .thenReturn(SplitReadStreamResponse.getDefaultInstance()); |
| |
| BigQueryStorageStreamSource<TableRow> streamSource = |
| BigQueryStorageStreamSource.create( |
| ReadSession.newBuilder() |
| .setName("readSession") |
| .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)) |
| .build(), |
| ReadStream.newBuilder().setName("parentStream").build(), |
| TABLE_SCHEMA, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices().withStorageClient(fakeStorageClient)); |
| |
| // Read a few records from the parent stream and ensure that records are returned in the |
| // prescribed order. |
| BoundedReader<TableRow> parent = streamSource.createReader(options); |
| assertTrue(parent.start()); |
| assertEquals("A", parent.getCurrent().get("name")); |
| assertTrue(parent.advance()); |
| assertEquals("B", parent.getCurrent().get("name")); |
| |
| assertNull(parent.splitAtFraction(0.5)); |
| verify(fakeStorageClient, times(1)).splitReadStream(ArgumentMatchers.any()); |
| |
| // Verify that subsequent splitAtFraction() calls after a failed splitAtFraction() attempt |
| // do NOT invoke SplitReadStream. |
| assertNull(parent.splitAtFraction(0.5)); |
| verify(fakeStorageClient, times(1)).splitReadStream(ArgumentMatchers.any()); |
| |
| // Verify that the parent source still works okay even after an unsuccessful split attempt. |
| assertTrue(parent.advance()); |
| assertEquals("C", parent.getCurrent().get("name")); |
| assertTrue(parent.advance()); |
| assertEquals("D", parent.getCurrent().get("name")); |
| assertTrue(parent.advance()); |
| assertEquals("E", parent.getCurrent().get("name")); |
| assertFalse(parent.advance()); |
| } |
| |
| @Test |
| public void testStreamSourceSplitAtFractionFailsWhenParentIsPastSplitPoint() throws Exception { |
| List<ReadRowsResponse> parentResponses = |
| Lists.newArrayList( |
| createResponse( |
| AVRO_SCHEMA, |
| Lists.newArrayList( |
| createRecord("A", 1, AVRO_SCHEMA), createRecord("B", 2, AVRO_SCHEMA)), |
| 0.0, |
| 0.25), |
| createResponse( |
| AVRO_SCHEMA, Lists.newArrayList(createRecord("C", 3, AVRO_SCHEMA)), 0.25, 0.50), |
| createResponse( |
| AVRO_SCHEMA, |
| Lists.newArrayList( |
| createRecord("D", 4, AVRO_SCHEMA), createRecord("E", 5, AVRO_SCHEMA)), |
| 0.5, |
| 0.75)); |
| |
| StorageClient fakeStorageClient = mock(StorageClient.class); |
| when(fakeStorageClient.readRows( |
| ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), "")) |
| .thenReturn(new FakeBigQueryServerStream<>(parentResponses)); |
| |
| // Mocks the split call. A response without a primary_stream and remainder_stream means |
| // that the split is not possible. |
| // Mocks the split call. |
| when(fakeStorageClient.splitReadStream( |
| SplitReadStreamRequest.newBuilder().setName("parentStream").setFraction(0.5f).build())) |
| .thenReturn( |
| SplitReadStreamResponse.newBuilder() |
| .setPrimaryStream(ReadStream.newBuilder().setName("primaryStream")) |
| .setRemainderStream(ReadStream.newBuilder().setName("remainderStream")) |
| .build()); |
| |
| // Mocks the ReadRows calls expected on the primary and residual streams. |
| when(fakeStorageClient.readRows( |
| ReadRowsRequest.newBuilder() |
| .setReadStream("primaryStream") |
| // This test will read rows 0 and 1 from the parent before calling split, |
| // so we expect the primary read to start at offset 2. |
| .setOffset(2) |
| .build(), |
| "")) |
| .thenThrow( |
| new FailedPreconditionException( |
| "Given row offset is invalid for stream.", |
| new StatusRuntimeException(Status.FAILED_PRECONDITION), |
| GrpcStatusCode.of(Code.FAILED_PRECONDITION), |
| /* retryable = */ false)); |
| |
| BigQueryStorageStreamSource<TableRow> streamSource = |
| BigQueryStorageStreamSource.create( |
| ReadSession.newBuilder() |
| .setName("readSession") |
| .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)) |
| .build(), |
| ReadStream.newBuilder().setName("parentStream").build(), |
| TABLE_SCHEMA, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices().withStorageClient(fakeStorageClient)); |
| |
| // Read a few records from the parent stream and ensure that records are returned in the |
| // prescribed order. |
| BoundedReader<TableRow> parent = streamSource.createReader(options); |
| assertTrue(parent.start()); |
| assertEquals("A", parent.getCurrent().get("name")); |
| assertTrue(parent.advance()); |
| assertEquals("B", parent.getCurrent().get("name")); |
| |
| assertNull(parent.splitAtFraction(0.5)); |
| |
| // Verify that the parent source still works okay even after an unsuccessful split attempt. |
| assertTrue(parent.advance()); |
| assertEquals("C", parent.getCurrent().get("name")); |
| assertTrue(parent.advance()); |
| assertEquals("D", parent.getCurrent().get("name")); |
| assertTrue(parent.advance()); |
| assertEquals("E", parent.getCurrent().get("name")); |
| assertFalse(parent.advance()); |
| } |
| |
| private static final class ParseKeyValue |
| implements SerializableFunction<SchemaAndRecord, KV<String, Long>> { |
| |
| @Override |
| public KV<String, Long> apply(SchemaAndRecord input) { |
| return KV.of( |
| input.getRecord().get("name").toString(), (Long) input.getRecord().get("number")); |
| } |
| } |
| |
| @Test |
| public void testReadFromBigQueryIO() throws Exception { |
| fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null); |
| TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table"); |
| Table table = new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA); |
| fakeDatasetService.createTable(table); |
| |
| CreateReadSessionRequest expectedCreateReadSessionRequest = |
| CreateReadSessionRequest.newBuilder() |
| .setParent("projects/project-id") |
| .setReadSession( |
| ReadSession.newBuilder() |
| .setTable("projects/foo.com:project/datasets/dataset/tables/table") |
| .setDataFormat(DataFormat.AVRO)) |
| .setMaxStreamCount(10) |
| .build(); |
| |
| ReadSession readSession = |
| ReadSession.newBuilder() |
| .setName("readSessionName") |
| .setAvroSchema(AvroSchema.newBuilder().setSchema(AVRO_SCHEMA_STRING)) |
| .addStreams(ReadStream.newBuilder().setName("streamName")) |
| .setDataFormat(DataFormat.AVRO) |
| .build(); |
| |
| ReadRowsRequest expectedReadRowsRequest = |
| ReadRowsRequest.newBuilder().setReadStream("streamName").build(); |
| |
| List<GenericRecord> records = |
| Lists.newArrayList( |
| createRecord("A", 1, AVRO_SCHEMA), |
| createRecord("B", 2, AVRO_SCHEMA), |
| createRecord("C", 3, AVRO_SCHEMA), |
| createRecord("D", 4, AVRO_SCHEMA)); |
| |
| List<ReadRowsResponse> readRowsResponses = |
| Lists.newArrayList( |
| createResponse(AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.50), |
| createResponse(AVRO_SCHEMA, records.subList(2, 4), 0.5, 0.75)); |
| |
| StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable()); |
| when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest)) |
| .thenReturn(readSession); |
| when(fakeStorageClient.readRows(expectedReadRowsRequest, "")) |
| .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses)); |
| |
| PCollection<KV<String, Long>> output = |
| p.apply( |
| BigQueryIO.read(new ParseKeyValue()) |
| .from("foo.com:project:dataset.table") |
| .withMethod(Method.DIRECT_READ) |
| .withFormat(DataFormat.AVRO) |
| .withTestServices( |
| new FakeBigQueryServices() |
| .withDatasetService(fakeDatasetService) |
| .withStorageClient(fakeStorageClient))); |
| |
| PAssert.that(output) |
| .containsInAnyOrder( |
| ImmutableList.of(KV.of("A", 1L), KV.of("B", 2L), KV.of("C", 3L), KV.of("D", 4L))); |
| |
| p.run(); |
| } |
| |
| @Test |
| public void testReadFromBigQueryIOWithTrimmedSchema() throws Exception { |
| fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null); |
| TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table"); |
| Table table = new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA); |
| fakeDatasetService.createTable(table); |
| |
| CreateReadSessionRequest expectedCreateReadSessionRequest = |
| CreateReadSessionRequest.newBuilder() |
| .setParent("projects/project-id") |
| .setReadSession( |
| ReadSession.newBuilder() |
| .setTable("projects/foo.com:project/datasets/dataset/tables/table") |
| .setReadOptions( |
| ReadSession.TableReadOptions.newBuilder().addSelectedFields("name")) |
| .setDataFormat(DataFormat.AVRO)) |
| .setMaxStreamCount(10) |
| .build(); |
| |
| ReadSession readSession = |
| ReadSession.newBuilder() |
| .setName("readSessionName") |
| .setAvroSchema(AvroSchema.newBuilder().setSchema(TRIMMED_AVRO_SCHEMA_STRING)) |
| .addStreams(ReadStream.newBuilder().setName("streamName")) |
| .setDataFormat(DataFormat.AVRO) |
| .build(); |
| |
| ReadRowsRequest expectedReadRowsRequest = |
| ReadRowsRequest.newBuilder().setReadStream("streamName").build(); |
| |
| List<GenericRecord> records = |
| Lists.newArrayList( |
| createRecord("A", TRIMMED_AVRO_SCHEMA), |
| createRecord("B", TRIMMED_AVRO_SCHEMA), |
| createRecord("C", TRIMMED_AVRO_SCHEMA), |
| createRecord("D", TRIMMED_AVRO_SCHEMA)); |
| |
| List<ReadRowsResponse> readRowsResponses = |
| Lists.newArrayList( |
| createResponse(TRIMMED_AVRO_SCHEMA, records.subList(0, 2), 0.0, 0.50), |
| createResponse(TRIMMED_AVRO_SCHEMA, records.subList(2, 4), 0.5, 0.75)); |
| |
| StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable()); |
| when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest)) |
| .thenReturn(readSession); |
| when(fakeStorageClient.readRows(expectedReadRowsRequest, "")) |
| .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses)); |
| |
| PCollection<TableRow> output = |
| p.apply( |
| BigQueryIO.readTableRows() |
| .from("foo.com:project:dataset.table") |
| .withMethod(Method.DIRECT_READ) |
| .withSelectedFields(Lists.newArrayList("name")) |
| .withFormat(DataFormat.AVRO) |
| .withTestServices( |
| new FakeBigQueryServices() |
| .withDatasetService(fakeDatasetService) |
| .withStorageClient(fakeStorageClient))); |
| |
| PAssert.that(output) |
| .containsInAnyOrder( |
| ImmutableList.of( |
| new TableRow().set("name", "A"), |
| new TableRow().set("name", "B"), |
| new TableRow().set("name", "C"), |
| new TableRow().set("name", "D"))); |
| |
| p.run(); |
| } |
| |
| @Test |
| public void testReadFromBigQueryIOArrow() throws Exception { |
| fakeDatasetService.createDataset("foo.com:project", "dataset", "", "", null); |
| TableReference tableRef = BigQueryHelpers.parseTableSpec("foo.com:project:dataset.table"); |
| Table table = new Table().setTableReference(tableRef).setNumBytes(10L).setSchema(TABLE_SCHEMA); |
| fakeDatasetService.createTable(table); |
| |
| CreateReadSessionRequest expectedCreateReadSessionRequest = |
| CreateReadSessionRequest.newBuilder() |
| .setParent("projects/project-id") |
| .setReadSession( |
| ReadSession.newBuilder() |
| .setTable("projects/foo.com:project/datasets/dataset/tables/table") |
| .setDataFormat(DataFormat.ARROW)) |
| .setMaxStreamCount(10) |
| .build(); |
| |
| ReadSession readSession = |
| ReadSession.newBuilder() |
| .setName("readSessionName") |
| .setArrowSchema( |
| ArrowSchema.newBuilder() |
| .setSerializedSchema(serializeArrowSchema(ARROW_SCHEMA)) |
| .build()) |
| .addStreams(ReadStream.newBuilder().setName("streamName")) |
| .setDataFormat(DataFormat.ARROW) |
| .build(); |
| |
| ReadRowsRequest expectedReadRowsRequest = |
| ReadRowsRequest.newBuilder().setReadStream("streamName").build(); |
| |
| List<String> names = Arrays.asList("A", "B", "C", "D"); |
| List<Long> values = Arrays.asList(1L, 2L, 3L, 4L); |
| List<ReadRowsResponse> readRowsResponses = |
| Lists.newArrayList( |
| createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), values.subList(0, 2), 0.0, 0.50), |
| createResponseArrow( |
| ARROW_SCHEMA, names.subList(2, 4), values.subList(2, 4), 0.5, 0.75)); |
| |
| StorageClient fakeStorageClient = mock(StorageClient.class, withSettings().serializable()); |
| when(fakeStorageClient.createReadSession(expectedCreateReadSessionRequest)) |
| .thenReturn(readSession); |
| when(fakeStorageClient.readRows(expectedReadRowsRequest, "")) |
| .thenReturn(new FakeBigQueryServerStream<>(readRowsResponses)); |
| |
| PCollection<KV<String, Long>> output = |
| p.apply( |
| BigQueryIO.read(new ParseKeyValue()) |
| .from("foo.com:project:dataset.table") |
| .withMethod(Method.DIRECT_READ) |
| .withFormat(DataFormat.ARROW) |
| .withTestServices( |
| new FakeBigQueryServices() |
| .withDatasetService(fakeDatasetService) |
| .withStorageClient(fakeStorageClient))); |
| |
| PAssert.that(output) |
| .containsInAnyOrder( |
| ImmutableList.of(KV.of("A", 1L), KV.of("B", 2L), KV.of("C", 3L), KV.of("D", 4L))); |
| |
| p.run(); |
| } |
| |
| @Test |
| public void testReadFromStreamSourceArrow() throws Exception { |
| |
| ReadSession readSession = |
| ReadSession.newBuilder() |
| .setName("readSession") |
| .setArrowSchema( |
| ArrowSchema.newBuilder() |
| .setSerializedSchema(serializeArrowSchema(ARROW_SCHEMA)) |
| .build()) |
| .setDataFormat(DataFormat.ARROW) |
| .build(); |
| |
| ReadRowsRequest expectedRequest = |
| ReadRowsRequest.newBuilder().setReadStream("readStream").build(); |
| |
| List<String> names = Arrays.asList("A", "B", "C"); |
| List<Long> values = Arrays.asList(1L, 2L, 3L); |
| List<ReadRowsResponse> responses = |
| Lists.newArrayList( |
| createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), values.subList(0, 2), 0.0, 0.50), |
| createResponseArrow( |
| ARROW_SCHEMA, names.subList(2, 3), values.subList(2, 3), 0.5, 0.75)); |
| |
| StorageClient fakeStorageClient = mock(StorageClient.class); |
| when(fakeStorageClient.readRows(expectedRequest, "")) |
| .thenReturn(new FakeBigQueryServerStream<>(responses)); |
| |
| BigQueryStorageStreamSource<TableRow> streamSource = |
| BigQueryStorageStreamSource.create( |
| readSession, |
| ReadStream.newBuilder().setName("readStream").build(), |
| TABLE_SCHEMA, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices().withStorageClient(fakeStorageClient)); |
| |
| List<TableRow> rows = new ArrayList<>(); |
| BoundedReader<TableRow> reader = streamSource.createReader(options); |
| for (boolean hasNext = reader.start(); hasNext; hasNext = reader.advance()) { |
| rows.add(reader.getCurrent()); |
| } |
| |
| System.out.println("Rows: " + rows); |
| |
| assertEquals(3, rows.size()); |
| } |
| |
| @Test |
| public void testFractionConsumedArrow() throws Exception { |
| ReadSession readSession = |
| ReadSession.newBuilder() |
| .setName("readSession") |
| .setArrowSchema( |
| ArrowSchema.newBuilder() |
| .setSerializedSchema(serializeArrowSchema(ARROW_SCHEMA)) |
| .build()) |
| .setDataFormat(DataFormat.ARROW) |
| .build(); |
| |
| ReadRowsRequest expectedRequest = |
| ReadRowsRequest.newBuilder().setReadStream("readStream").build(); |
| |
| List<String> names = Arrays.asList("A", "B", "C", "D", "E", "F", "G"); |
| List<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L); |
| List<ReadRowsResponse> responses = |
| Lists.newArrayList( |
| createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), values.subList(0, 2), 0.0, 0.25), |
| createResponseArrow( |
| ARROW_SCHEMA, Lists.newArrayList(), Lists.newArrayList(), 0.25, 0.25), |
| createResponseArrow(ARROW_SCHEMA, names.subList(2, 4), values.subList(2, 4), 0.3, 0.5), |
| createResponseArrow(ARROW_SCHEMA, names.subList(4, 7), values.subList(4, 7), 0.7, 1.0)); |
| |
| StorageClient fakeStorageClient = mock(StorageClient.class); |
| when(fakeStorageClient.readRows(expectedRequest, "")) |
| .thenReturn(new FakeBigQueryServerStream<>(responses)); |
| |
| BigQueryStorageStreamSource<TableRow> streamSource = |
| BigQueryStorageStreamSource.create( |
| readSession, |
| ReadStream.newBuilder().setName("readStream").build(), |
| TABLE_SCHEMA, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices().withStorageClient(fakeStorageClient)); |
| |
| List<TableRow> rows = new ArrayList<>(); |
| BoundedReader<TableRow> reader = streamSource.createReader(options); |
| |
| // Before call to BoundedReader#start, fraction consumed must be zero. |
| assertEquals(0.0, reader.getFractionConsumed(), DELTA); |
| |
| assertTrue(reader.start()); // Reads A. |
| assertEquals(0.125, reader.getFractionConsumed(), DELTA); |
| assertTrue(reader.advance()); // Reads B. |
| assertEquals(0.25, reader.getFractionConsumed(), DELTA); |
| |
| assertTrue(reader.advance()); // Reads C. |
| assertEquals(0.4, reader.getFractionConsumed(), DELTA); |
| assertTrue(reader.advance()); // Reads D. |
| assertEquals(0.5, reader.getFractionConsumed(), DELTA); |
| |
| assertTrue(reader.advance()); // Reads E. |
| assertEquals(0.8, reader.getFractionConsumed(), DELTA); |
| assertTrue(reader.advance()); // Reads F. |
| assertEquals(0.9, reader.getFractionConsumed(), DELTA); |
| assertTrue(reader.advance()); // Reads G. |
| assertEquals(1.0, reader.getFractionConsumed(), DELTA); |
| |
| assertFalse(reader.advance()); // Reaches the end. |
| |
| // We are done with the stream, so we should report 100% consumption. |
| assertEquals(Double.valueOf(1.0), reader.getFractionConsumed()); |
| } |
| |
| @Test |
| public void testFractionConsumedWithSplitArrow() throws Exception { |
| ReadSession readSession = |
| ReadSession.newBuilder() |
| .setName("readSession") |
| .setArrowSchema( |
| ArrowSchema.newBuilder() |
| .setSerializedSchema(serializeArrowSchema(ARROW_SCHEMA)) |
| .build()) |
| .setDataFormat(DataFormat.ARROW) |
| .build(); |
| |
| ReadRowsRequest expectedRequest = |
| ReadRowsRequest.newBuilder().setReadStream("parentStream").build(); |
| |
| List<String> names = Arrays.asList("A", "B", "C", "D", "E", "F", "G"); |
| List<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L); |
| List<ReadRowsResponse> parentResponse = |
| Lists.newArrayList( |
| createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), values.subList(0, 2), 0.0, 0.25), |
| createResponseArrow(ARROW_SCHEMA, names.subList(2, 4), values.subList(2, 4), 0.3, 0.5), |
| createResponseArrow( |
| ARROW_SCHEMA, names.subList(4, 7), values.subList(4, 7), 0.7, 0.875)); |
| |
| StorageClient fakeStorageClient = mock(StorageClient.class); |
| when(fakeStorageClient.readRows(expectedRequest, "")) |
| .thenReturn(new FakeBigQueryServerStream<>(parentResponse)); |
| |
| when(fakeStorageClient.splitReadStream( |
| SplitReadStreamRequest.newBuilder().setName("parentStream").setFraction(0.5f).build())) |
| .thenReturn( |
| SplitReadStreamResponse.newBuilder() |
| .setPrimaryStream(ReadStream.newBuilder().setName("primaryStream")) |
| .setRemainderStream(ReadStream.newBuilder().setName("remainderStream")) |
| .build()); |
| List<ReadRowsResponse> primaryResponses = |
| Lists.newArrayList( |
| createResponseArrow( |
| ARROW_SCHEMA, names.subList(1, 3), values.subList(1, 3), 0.25, 0.75), |
| createResponseArrow(ARROW_SCHEMA, names.subList(3, 4), values.subList(3, 4), 0.8, 1.0)); |
| |
| when(fakeStorageClient.readRows( |
| ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(1).build(), "")) |
| .thenReturn(new FakeBigQueryServerStream<>(primaryResponses)); |
| |
| BigQueryStorageStreamSource<TableRow> streamSource = |
| BigQueryStorageStreamSource.create( |
| readSession, |
| ReadStream.newBuilder().setName("parentStream").build(), |
| TABLE_SCHEMA, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices().withStorageClient(fakeStorageClient)); |
| |
| BoundedReader<TableRow> reader = streamSource.createReader(options); |
| |
| // Before call to BoundedReader#start, fraction consumed must be zero. |
| assertEquals(0.0, reader.getFractionConsumed(), DELTA); |
| |
| assertTrue(reader.start()); // Reads A. |
| assertEquals(0.125, reader.getFractionConsumed(), DELTA); |
| |
| reader.splitAtFraction(0.5); |
| assertEquals(0.125, reader.getFractionConsumed(), DELTA); |
| |
| assertTrue(reader.advance()); // Reads B. |
| assertEquals(0.5, reader.getFractionConsumed(), DELTA); |
| |
| assertTrue(reader.advance()); // Reads C. |
| assertEquals(0.75, reader.getFractionConsumed(), DELTA); |
| |
| assertTrue(reader.advance()); // Reads D. |
| assertEquals(1.0, reader.getFractionConsumed(), DELTA); |
| |
| assertFalse(reader.advance()); |
| assertEquals(1.0, reader.getFractionConsumed(), DELTA); |
| } |
| |
| @Test |
| public void testStreamSourceSplitAtFractionSucceedsArrow() throws Exception { |
| List<String> names = Arrays.asList("A", "B", "C", "D", "E", "F", "G"); |
| List<Long> values = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L); |
| List<ReadRowsResponse> parentResponses = |
| Lists.newArrayList( |
| createResponseArrow(ARROW_SCHEMA, names.subList(0, 2), values.subList(0, 2), 0.0, 0.25), |
| createResponseArrow(ARROW_SCHEMA, names.subList(2, 3), values.subList(2, 3), 0.25, 0.5), |
| createResponseArrow( |
| ARROW_SCHEMA, names.subList(3, 5), values.subList(3, 5), 0.5, 0.75)); |
| |
| StorageClient fakeStorageClient = mock(StorageClient.class); |
| when(fakeStorageClient.readRows( |
| ReadRowsRequest.newBuilder().setReadStream("parentStream").build(), "")) |
| .thenReturn(new FakeBigQueryServerStream<>(parentResponses)); |
| |
| // Mocks the split call. |
| when(fakeStorageClient.splitReadStream( |
| SplitReadStreamRequest.newBuilder().setName("parentStream").setFraction(0.5f).build())) |
| .thenReturn( |
| SplitReadStreamResponse.newBuilder() |
| .setPrimaryStream(ReadStream.newBuilder().setName("primaryStream")) |
| .setRemainderStream(ReadStream.newBuilder().setName("remainderStream")) |
| .build()); |
| |
| // Mocks the ReadRows calls expected on the primary and residual streams. |
| when(fakeStorageClient.readRows( |
| ReadRowsRequest.newBuilder() |
| .setReadStream("primaryStream") |
| // This test will read rows 0 and 1 from the parent before calling split, |
| // so we expect the primary read to start at offset 2. |
| .setOffset(2) |
| .build(), |
| "")) |
| .thenReturn(new FakeBigQueryServerStream<>(parentResponses.subList(1, 2))); |
| |
| when(fakeStorageClient.readRows( |
| ReadRowsRequest.newBuilder().setReadStream("remainderStream").build(), "")) |
| .thenReturn( |
| new FakeBigQueryServerStream<>(parentResponses.subList(2, parentResponses.size()))); |
| |
| BigQueryStorageStreamSource<TableRow> streamSource = |
| BigQueryStorageStreamSource.create( |
| ReadSession.newBuilder() |
| .setName("readSession") |
| .setArrowSchema( |
| ArrowSchema.newBuilder() |
| .setSerializedSchema(serializeArrowSchema(ARROW_SCHEMA)) |
| .build()) |
| .setDataFormat(DataFormat.ARROW) |
| .build(), |
| ReadStream.newBuilder().setName("parentStream").build(), |
| TABLE_SCHEMA, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices().withStorageClient(fakeStorageClient)); |
| |
| // Read a few records from the parent stream and ensure that records are returned in the |
| // prescribed order. |
| BoundedReader<TableRow> parent = streamSource.createReader(options); |
| assertTrue(parent.start()); |
| assertEquals("A", parent.getCurrent().get("name")); |
| assertTrue(parent.advance()); |
| assertEquals("B", parent.getCurrent().get("name")); |
| |
| // Now split the stream, and ensure that the "parent" reader has been replaced with the |
| // primary stream and that the returned source points to the residual stream. |
| BoundedReader<TableRow> primary = parent; |
| BoundedSource<TableRow> residualSource = parent.splitAtFraction(0.5); |
| assertNotNull(residualSource); |
| BoundedReader<TableRow> residual = residualSource.createReader(options); |
| |
| assertTrue(primary.advance()); |
| assertEquals("C", primary.getCurrent().get("name")); |
| assertFalse(primary.advance()); |
| |
| assertTrue(residual.start()); |
| assertEquals("D", residual.getCurrent().get("name")); |
| assertTrue(residual.advance()); |
| assertEquals("E", residual.getCurrent().get("name")); |
| assertFalse(residual.advance()); |
| } |
| |
| /** |
| * Splits the same Arrow reader twice (at 0.83 and then at 0.75) and checks that it still |
| * returns rows A through D in order. |
| */ |
| @Test |
| public void testStreamSourceSplitAtFractionRepeatedArrow() throws Exception { |
| ReadStream firstStream = ReadStream.newBuilder().setName("stream1").build(); |
| ReadStream secondStream = ReadStream.newBuilder().setName("stream2").build(); |
| ReadStream thirdStream = ReadStream.newBuilder().setName("stream3").build(); |
| |
| StorageClient storageClient = mock(StorageClient.class); |
| |
| List<String> rowNames = Arrays.asList("A", "B", "C", "D", "E", "F"); |
| List<Long> rowValues = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L); |
| |
| // Initial ReadRows call on the first stream. |
| List<ReadRowsResponse> firstBatches = |
| Lists.newArrayList( |
| createResponseArrow( |
| ARROW_SCHEMA, rowNames.subList(0, 2), rowValues.subList(0, 2), 0.0, 0.25), |
| createResponseArrow( |
| ARROW_SCHEMA, rowNames.subList(2, 4), rowValues.subList(2, 4), 0.25, 0.5), |
| createResponseArrow( |
| ARROW_SCHEMA, rowNames.subList(4, 6), rowValues.subList(4, 6), 0.5, 0.75)); |
| ReadRowsRequest firstRead = |
| ReadRowsRequest.newBuilder().setReadStream(firstStream.getName()).build(); |
| when(storageClient.readRows(firstRead, "")) |
| .thenReturn(new FakeBigQueryServerStream<>(firstBatches)); |
| |
| // First SplitReadStream call: the first stream is replaced by the second one. |
| SplitReadStreamRequest firstSplit = |
| SplitReadStreamRequest.newBuilder() |
| .setName(firstStream.getName()) |
| .setFraction(0.83f) |
| .build(); |
| when(storageClient.splitReadStream(firstSplit)) |
| .thenReturn( |
| SplitReadStreamResponse.newBuilder() |
| .setPrimaryStream(secondStream) |
| .setRemainderStream(ReadStream.newBuilder().setName("ignored")) |
| .build()); |
| |
| // Second ReadRows call: the second stream, read from offset 1. |
| List<ReadRowsResponse> secondBatches = |
| Lists.newArrayList( |
| createResponseArrow( |
| ARROW_SCHEMA, rowNames.subList(1, 3), rowValues.subList(1, 3), 0.0, 0.50), |
| createResponseArrow( |
| ARROW_SCHEMA, rowNames.subList(3, 4), rowValues.subList(3, 4), 0.5, 0.75)); |
| ReadRowsRequest secondRead = |
| ReadRowsRequest.newBuilder().setReadStream(secondStream.getName()).setOffset(1).build(); |
| when(storageClient.readRows(secondRead, "")) |
| .thenReturn(new FakeBigQueryServerStream<>(secondBatches)); |
| |
| // Second SplitReadStream call: the second stream is replaced by the third one. |
| SplitReadStreamRequest secondSplit = |
| SplitReadStreamRequest.newBuilder() |
| .setName(secondStream.getName()) |
| .setFraction(0.75f) |
| .build(); |
| when(storageClient.splitReadStream(secondSplit)) |
| .thenReturn( |
| SplitReadStreamResponse.newBuilder() |
| .setPrimaryStream(thirdStream) |
| .setRemainderStream(ReadStream.newBuilder().setName("ignored")) |
| .build()); |
| |
| // Third ReadRows call: the third stream, read from offset 2. |
| List<ReadRowsResponse> thirdBatches = |
| Lists.newArrayList( |
| createResponseArrow( |
| ARROW_SCHEMA, rowNames.subList(2, 4), rowValues.subList(2, 4), 0.80, 0.90)); |
| ReadRowsRequest thirdRead = |
| ReadRowsRequest.newBuilder().setReadStream(thirdStream.getName()).setOffset(2).build(); |
| when(storageClient.readRows(thirdRead, "")) |
| .thenReturn(new FakeBigQueryServerStream<>(thirdBatches)); |
| |
| ReadSession session = |
| ReadSession.newBuilder() |
| .setName("readSession") |
| .setArrowSchema( |
| ArrowSchema.newBuilder() |
| .setSerializedSchema(serializeArrowSchema(ARROW_SCHEMA)) |
| .build()) |
| .setDataFormat(DataFormat.ARROW) |
| .build(); |
| BoundedSource<TableRow> source = |
| BigQueryStorageStreamSource.create( |
| session, |
| firstStream, |
| TABLE_SCHEMA, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices().withStorageClient(storageClient)); |
| |
| BoundedReader<TableRow> reader = source.createReader(options); |
| assertTrue(reader.start()); |
| assertEquals("A", reader.getCurrent().get("name")); |
| |
| // A successful split must not change the current record. |
| BoundedSource<TableRow> firstResidual = reader.splitAtFraction(0.83f); |
| assertNotNull(firstResidual); |
| assertEquals("A", reader.getCurrent().get("name")); |
| |
| assertTrue(reader.advance()); |
| assertEquals("B", reader.getCurrent().get("name")); |
| |
| BoundedSource<TableRow> secondResidual = reader.splitAtFraction(0.75f); |
| assertNotNull(secondResidual); |
| assertEquals("B", reader.getCurrent().get("name")); |
| |
| assertTrue(reader.advance()); |
| assertEquals("C", reader.getCurrent().get("name")); |
| assertTrue(reader.advance()); |
| assertEquals("D", reader.getCurrent().get("name")); |
| assertFalse(reader.advance()); |
| } |
| |
| /** |
| * Checks that {@code splitAtFraction} returns null when the stubbed SplitReadStream response is |
| * the default (empty) instance, that a repeated attempt does not call SplitReadStream again, and |
| * that the reader keeps returning the remaining rows afterwards. |
| */ |
| @Test |
| public void testStreamSourceSplitAtFractionFailsWhenSplitIsNotPossibleArrow() throws Exception { |
| List<String> rowNames = Arrays.asList("A", "B", "C", "D", "E", "F"); |
| List<Long> rowValues = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L); |
| |
| // Three batches: rows A-B, row C and rows D-E. |
| List<ReadRowsResponse> parentBatches = |
| Lists.newArrayList( |
| createResponseArrow( |
| ARROW_SCHEMA, rowNames.subList(0, 2), rowValues.subList(0, 2), 0.0, 0.25), |
| createResponseArrow( |
| ARROW_SCHEMA, rowNames.subList(2, 3), rowValues.subList(2, 3), 0.25, 0.5), |
| createResponseArrow( |
| ARROW_SCHEMA, rowNames.subList(3, 5), rowValues.subList(3, 5), 0.5, 0.75)); |
| |
| StorageClient storageClient = mock(StorageClient.class); |
| ReadRowsRequest parentRead = |
| ReadRowsRequest.newBuilder().setReadStream("parentStream").build(); |
| when(storageClient.readRows(parentRead, "")) |
| .thenReturn(new FakeBigQueryServerStream<>(parentBatches)); |
| |
| // A split response carrying neither a primary_stream nor a remainder_stream means that the |
| // split is not possible. |
| SplitReadStreamRequest splitAtHalf = |
| SplitReadStreamRequest.newBuilder().setName("parentStream").setFraction(0.5f).build(); |
| when(storageClient.splitReadStream(splitAtHalf)) |
| .thenReturn(SplitReadStreamResponse.getDefaultInstance()); |
| |
| ReadSession session = |
| ReadSession.newBuilder() |
| .setName("readSession") |
| .setArrowSchema( |
| ArrowSchema.newBuilder() |
| .setSerializedSchema(serializeArrowSchema(ARROW_SCHEMA)) |
| .build()) |
| .setDataFormat(DataFormat.ARROW) |
| .build(); |
| ReadStream parentStream = ReadStream.newBuilder().setName("parentStream").build(); |
| BigQueryStorageStreamSource<TableRow> source = |
| BigQueryStorageStreamSource.create( |
| session, |
| parentStream, |
| TABLE_SCHEMA, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices().withStorageClient(storageClient)); |
| |
| // Consume the first two rows of the parent stream and check their order. |
| BoundedReader<TableRow> reader = source.createReader(options); |
| assertTrue(reader.start()); |
| assertEquals("A", reader.getCurrent().get("name")); |
| assertTrue(reader.advance()); |
| assertEquals("B", reader.getCurrent().get("name")); |
| |
| // First attempt: exactly one SplitReadStream call, and no residual source. |
| assertNull(reader.splitAtFraction(0.5)); |
| verify(storageClient, times(1)).splitReadStream(ArgumentMatchers.any()); |
| |
| // Second attempt after the failed one: the call count must still be one, i.e. |
| // SplitReadStream is NOT invoked again. |
| assertNull(reader.splitAtFraction(0.5)); |
| verify(storageClient, times(1)).splitReadStream(ArgumentMatchers.any()); |
| |
| // The reader must keep working after the unsuccessful split attempts. |
| assertTrue(reader.advance()); |
| assertEquals("C", reader.getCurrent().get("name")); |
| assertTrue(reader.advance()); |
| assertEquals("D", reader.getCurrent().get("name")); |
| assertTrue(reader.advance()); |
| assertEquals("E", reader.getCurrent().get("name")); |
| assertFalse(reader.advance()); |
| } |
| |
| /** |
| * Checks that {@code splitAtFraction} returns null when SplitReadStream succeeds but the |
| * follow-up read of the primary stream is stubbed to throw a FailedPreconditionException, and |
| * that the reader keeps returning the remaining parent rows afterwards. |
| */ |
| @Test |
| public void testStreamSourceSplitAtFractionFailsWhenParentIsPastSplitPointArrow() |
| throws Exception { |
| List<String> rowNames = Arrays.asList("A", "B", "C", "D", "E", "F"); |
| List<Long> rowValues = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L); |
| |
| // Three batches: rows A-B, row C and rows D-E. |
| List<ReadRowsResponse> parentBatches = |
| Lists.newArrayList( |
| createResponseArrow( |
| ARROW_SCHEMA, rowNames.subList(0, 2), rowValues.subList(0, 2), 0.0, 0.25), |
| createResponseArrow( |
| ARROW_SCHEMA, rowNames.subList(2, 3), rowValues.subList(2, 3), 0.25, 0.5), |
| createResponseArrow( |
| ARROW_SCHEMA, rowNames.subList(3, 5), rowValues.subList(3, 5), 0.5, 0.75)); |
| |
| StorageClient storageClient = mock(StorageClient.class); |
| ReadRowsRequest parentRead = |
| ReadRowsRequest.newBuilder().setReadStream("parentStream").build(); |
| when(storageClient.readRows(parentRead, "")) |
| .thenReturn(new FakeBigQueryServerStream<>(parentBatches)); |
| |
| // The split call itself succeeds and returns both a primary and a remainder stream. |
| SplitReadStreamRequest splitAtHalf = |
| SplitReadStreamRequest.newBuilder().setName("parentStream").setFraction(0.5f).build(); |
| SplitReadStreamResponse splitResult = |
| SplitReadStreamResponse.newBuilder() |
| .setPrimaryStream(ReadStream.newBuilder().setName("primaryStream")) |
| .setRemainderStream(ReadStream.newBuilder().setName("remainderStream")) |
| .build(); |
| when(storageClient.splitReadStream(splitAtHalf)).thenReturn(splitResult); |
| |
| // Rows 0 and 1 are read from the parent before the split, so the primary stream is expected |
| // to be read from offset 2. That read is stubbed to fail with a non-retryable |
| // FAILED_PRECONDITION error. |
| ReadRowsRequest primaryRead = |
| ReadRowsRequest.newBuilder().setReadStream("primaryStream").setOffset(2).build(); |
| FailedPreconditionException invalidOffset = |
| new FailedPreconditionException( |
| "Given row offset is invalid for stream.", |
| new StatusRuntimeException(Status.FAILED_PRECONDITION), |
| GrpcStatusCode.of(Code.FAILED_PRECONDITION), |
| /* retryable = */ false); |
| when(storageClient.readRows(primaryRead, "")).thenThrow(invalidOffset); |
| |
| ReadSession session = |
| ReadSession.newBuilder() |
| .setName("readSession") |
| .setArrowSchema( |
| ArrowSchema.newBuilder() |
| .setSerializedSchema(serializeArrowSchema(ARROW_SCHEMA)) |
| .build()) |
| .setDataFormat(DataFormat.ARROW) |
| .build(); |
| ReadStream parentStream = ReadStream.newBuilder().setName("parentStream").build(); |
| BigQueryStorageStreamSource<TableRow> source = |
| BigQueryStorageStreamSource.create( |
| session, |
| parentStream, |
| TABLE_SCHEMA, |
| new TableRowParser(), |
| TableRowJsonCoder.of(), |
| new FakeBigQueryServices().withStorageClient(storageClient)); |
| |
| // Consume the first two rows of the parent stream and check their order. |
| BoundedReader<TableRow> reader = source.createReader(options); |
| assertTrue(reader.start()); |
| assertEquals("A", reader.getCurrent().get("name")); |
| assertTrue(reader.advance()); |
| assertEquals("B", reader.getCurrent().get("name")); |
| |
| assertNull(reader.splitAtFraction(0.5)); |
| |
| // The reader must keep working after the unsuccessful split attempt. |
| assertTrue(reader.advance()); |
| assertEquals("C", reader.getCurrent().get("name")); |
| assertTrue(reader.advance()); |
| assertEquals("D", reader.getCurrent().get("name")); |
| assertTrue(reader.advance()); |
| assertEquals("E", reader.getCurrent().get("name")); |
| assertFalse(reader.advance()); |
| } |
| |
| /** |
| * Builds an Arrow {@code Field} from a name, a nullability flag, a type and child fields. |
| * |
| * <p>The last two {@code FieldType} constructor arguments are passed as {@code null}. |
| */ |
| private static org.apache.arrow.vector.types.pojo.Field field( |
| String name, |
| boolean nullable, |
| ArrowType type, |
| org.apache.arrow.vector.types.pojo.Field... children) { |
| org.apache.arrow.vector.types.pojo.FieldType fieldType = |
| new org.apache.arrow.vector.types.pojo.FieldType(nullable, type, null, null); |
| return new org.apache.arrow.vector.types.pojo.Field(name, fieldType, asList(children)); |
| } |
| |
| static org.apache.arrow.vector.types.pojo.Field field( |
| String name, ArrowType type, org.apache.arrow.vector.types.pojo.Field... children) { |
| return field(name, false, type, children); |
| } |
| } |