/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.bigquery;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createJobIdToken;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers.createTempTableReference;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThat;
import com.google.api.services.bigquery.model.JobStatistics;
import com.google.api.services.bigquery.model.JobStatistics2;
import com.google.api.services.bigquery.model.Streamingbuffer;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.bigtable.v2.Mutation;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.io.Serializable;
import java.math.BigInteger;
import java.util.List;
import java.util.Set;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.extensions.protobuf.ByteStringCoder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.QueryPriority;
import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.model.Statement;
/** Tests for {@link BigQueryIO#read}. */
@RunWith(JUnit4.class)
public class BigQueryIOReadTest implements Serializable {
private transient PipelineOptions options;
private transient TemporaryFolder testFolder = new TemporaryFolder();
private transient TestPipeline p;
@Rule
public final transient TestRule folderThenPipeline =
new TestRule() {
@Override
public Statement apply(final Statement base, final Description description) {
// The temporary folder must be created before the TestPipeline, because the pipeline's
// temp location depends on the chosen folder. Since JUnit evaluates rules in an
// unspecified order, unrelated to field order and separate from construction, the two
// rules are chained manually in this TestRule.
Statement withPipeline =
new Statement() {
@Override
public void evaluate() throws Throwable {
options = TestPipeline.testingPipelineOptions();
options.as(BigQueryOptions.class).setProject("project-id");
options
.as(BigQueryOptions.class)
.setTempLocation(testFolder.getRoot().getAbsolutePath());
p = TestPipeline.fromOptions(options);
p.apply(base, description).evaluate();
}
};
return testFolder.apply(withPipeline, description);
}
};
@Rule public transient ExpectedException thrown = ExpectedException.none();
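// Fake implementations of the BigQuery services; tests run entirely against in-memory state.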
private FakeDatasetService fakeDatasetService = new FakeDatasetService();
private FakeJobService fakeJobService = new FakeJobService();
private FakeBigQueryServices fakeBqServices =
new FakeBigQueryServices()
.withDatasetService(fakeDatasetService)
.withJobService(fakeJobService);
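// Assertion helpers that verify the configuration captured by a constructed Read/TypedRead.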
private void checkReadTableObject(
BigQueryIO.Read read, String project, String dataset, String table) {
checkReadTableObjectWithValidate(read, project, dataset, table, true);
}
private void checkReadQueryObject(BigQueryIO.Read read, String query) {
checkReadQueryObjectWithValidate(read, query, true);
}
private void checkTypedReadQueryObject(BigQueryIO.TypedRead read, String query, String kmsKey) {
checkTypedReadQueryObjectWithValidate(read, query, kmsKey, true);
}
private void checkReadTableObjectWithValidate(
BigQueryIO.Read read, String project, String dataset, String table, boolean validate) {
assertEquals(project, read.getTable().getProjectId());
assertEquals(dataset, read.getTable().getDatasetId());
assertEquals(table, read.getTable().getTableId());
assertNull(read.getQuery());
assertEquals(validate, read.getValidate());
}
private void checkReadQueryObjectWithValidate(
BigQueryIO.Read read, String query, boolean validate) {
assertNull(read.getTable());
assertEquals(query, read.getQuery().get());
assertEquals(validate, read.getValidate());
}
private void checkTypedReadQueryObjectWithValidate(
BigQueryIO.TypedRead read, String query, String kmsKey, boolean validate) {
assertNull(read.getTable());
assertEquals(query, read.getQuery().get());
assertEquals(kmsKey, read.getKmsKey());
assertEquals(validate, read.getValidate());
}
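/** Resets the fakes' shared static state and creates the dataset most tests read from. */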
@Before
public void setUp() throws IOException, InterruptedException {
FakeDatasetService.setUp();
BigQueryIO.clearCreatedTables();
fakeDatasetService.createDataset("project-id", "dataset-id", "", "", null);
}
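/** {@code from()} should parse a legacy "domain:project:dataset.table" spec into its parts. */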
@Test
public void testBuildTableBasedSource() {
BigQueryIO.Read read = BigQueryIO.read().from("foo.com:project:somedataset.sometable");
checkReadTableObject(read, "foo.com:project", "somedataset", "sometable");
}
@Test
public void testBuildQueryBasedSource() {
BigQueryIO.Read read = BigQueryIO.read().fromQuery("foo_query");
checkReadQueryObject(read, "foo_query");
}
@Test
public void testBuildTableBasedSourceWithoutValidation() {
// This test just checks that using withoutValidation will not trigger object
// construction errors.
BigQueryIO.Read read =
BigQueryIO.read().from("foo.com:project:somedataset.sometable").withoutValidation();
checkReadTableObjectWithValidate(read, "foo.com:project", "somedataset", "sometable", false);
}
@Test
public void testBuildQueryBasedSourceWithoutValidation() {
// This test just checks that using withoutValidation will not trigger object
// construction errors.
BigQueryIO.Read read = BigQueryIO.read().fromQuery("some_query").withoutValidation();
checkReadQueryObjectWithValidate(read, "some_query", false);
}
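/** A table spec with no project should leave the project unset (null) on the transform. */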
@Test
public void testBuildTableBasedSourceWithDefaultProject() {
BigQueryIO.Read read = BigQueryIO.read().from("somedataset.sometable");
checkReadTableObject(read, null, "somedataset", "sometable");
}
@Test
public void testBuildSourceWithTableReference() {
TableReference table =
new TableReference()
.setProjectId("foo.com:project")
.setDatasetId("somedataset")
.setTableId("sometable");
BigQueryIO.Read read = BigQueryIO.read().from(table);
checkReadTableObject(read, "foo.com:project", "somedataset", "sometable");
}
@Test
public void testBuildQueryBasedTypedReadSource() {
BigQueryIO.TypedRead read =
BigQueryIO.readTableRows().fromQuery("foo_query").withKmsKey("kms_key");
checkTypedReadQueryObject(read, "foo_query", "kms_key");
}
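/** Reading a table with no project set should fall back to the pipeline's default project. */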
@Test
public void testValidateReadSetsDefaultProject() throws Exception {
String tableId = "sometable";
TableReference tableReference =
new TableReference()
.setProjectId("project-id")
.setDatasetId("dataset-id")
.setTableId(tableId);
fakeDatasetService.createTable(
new Table()
.setTableReference(tableReference)
.setSchema(
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("name").setType("STRING"),
new TableFieldSchema().setName("number").setType("INTEGER")))));
FakeBigQueryServices fakeBqServices =
new FakeBigQueryServices()
.withJobService(new FakeJobService())
.withDatasetService(fakeDatasetService);
List<TableRow> expected =
ImmutableList.of(
new TableRow().set("name", "a").set("number", 1L),
new TableRow().set("name", "b").set("number", 2L),
new TableRow().set("name", "c").set("number", 3L),
new TableRow().set("name", "d").set("number", 4L),
new TableRow().set("name", "e").set("number", 5L),
new TableRow().set("name", "f").set("number", 6L));
fakeDatasetService.insertAll(tableReference, expected, null);
TableReference tableRef = new TableReference().setDatasetId("dataset-id").setTableId(tableId);
PCollection<KV<String, Long>> output =
p.apply(BigQueryIO.read().from(tableRef).withTestServices(fakeBqServices))
.apply(
ParDo.of(
new DoFn<TableRow, KV<String, Long>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(
KV.of(
(String) c.element().get("name"),
Long.valueOf((String) c.element().get("number"))));
}
}));
PAssert.that(output)
.containsInAnyOrder(
ImmutableList.of(
KV.of("a", 1L),
KV.of("b", 2L),
KV.of("c", 3L),
KV.of("d", 4L),
KV.of("e", 5L),
KV.of("f", 6L)));
p.run();
}
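// Result-flattening and SQL-dialect preferences apply only to queries; the next three tests
// check that combining them with a table read fails at pipeline construction.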
@Test
public void testBuildSourceWithTableAndFlatten() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
"Invalid BigQueryIO.Read: Specifies a table with a result flattening preference,"
+ " which only applies to queries");
p.apply(
"ReadMyTable",
BigQueryIO.read().from("foo.com:project:somedataset.sometable").withoutResultFlattening());
p.run();
}
@Test
public void testBuildSourceWithTableAndFlattenWithoutValidation() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
"Invalid BigQueryIO.Read: Specifies a table with a result flattening preference,"
+ " which only applies to queries");
p.apply(
BigQueryIO.read()
.from("foo.com:project:somedataset.sometable")
.withoutValidation()
.withoutResultFlattening());
p.run();
}
@Test
public void testBuildSourceWithTableAndSqlDialect() {
thrown.expect(IllegalArgumentException.class);
thrown.expectMessage(
"Invalid BigQueryIO.Read: Specifies a table with a SQL dialect preference,"
+ " which only applies to queries");
p.apply(BigQueryIO.read().from("foo.com:project:somedataset.sometable").usingStandardSql());
p.run();
}
@Test
public void testReadFromTableWithoutTemplateCompatibility()
throws IOException, InterruptedException {
testReadFromTable(false, false);
}
@Test
public void testReadFromTableWithTemplateCompatibility()
throws IOException, InterruptedException {
testReadFromTable(true, false);
}
@Test
public void testReadTableRowsFromTableWithoutTemplateCompatibility()
throws IOException, InterruptedException {
testReadFromTable(false, true);
}
@Test
public void testReadTableRowsFromTableWithTemplateCompatibility()
throws IOException, InterruptedException {
testReadFromTable(true, true);
}
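/**
* Shared body for the four tests above: reads the fake table through either the Read or the
* TypedRead API, optionally with template compatibility, and checks the resulting rows.
*/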
private void testReadFromTable(boolean useTemplateCompatibility, boolean useReadTableRows)
throws IOException, InterruptedException {
Table sometable = new Table();
sometable.setSchema(
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("name").setType("STRING"),
new TableFieldSchema().setName("number").setType("INTEGER"))));
sometable.setTableReference(
new TableReference()
.setProjectId("non-executing-project")
.setDatasetId("somedataset")
.setTableId("sometable"));
sometable.setNumBytes(1024L * 1024L);
FakeDatasetService fakeDatasetService = new FakeDatasetService();
fakeDatasetService.createDataset("non-executing-project", "somedataset", "", "", null);
fakeDatasetService.createTable(sometable);
List<TableRow> records =
Lists.newArrayList(
new TableRow().set("name", "a").set("number", 1L),
new TableRow().set("name", "b").set("number", 2L),
new TableRow().set("name", "c").set("number", 3L));
fakeDatasetService.insertAll(sometable.getTableReference(), records, null);
FakeBigQueryServices fakeBqServices =
new FakeBigQueryServices()
.withJobService(new FakeJobService())
.withDatasetService(fakeDatasetService);
PTransform<PBegin, PCollection<TableRow>> readTransform;
if (useReadTableRows) {
BigQueryIO.TypedRead<TableRow> read =
BigQueryIO.readTableRows()
.from("non-executing-project:somedataset.sometable")
.withTestServices(fakeBqServices)
.withoutValidation();
readTransform = useTemplateCompatibility ? read.withTemplateCompatibility() : read;
} else {
BigQueryIO.Read read =
BigQueryIO.read()
.from("non-executing-project:somedataset.sometable")
.withTestServices(fakeBqServices)
.withoutValidation();
readTransform = useTemplateCompatibility ? read.withTemplateCompatibility() : read;
}
PCollection<KV<String, Long>> output =
p.apply(readTransform)
.apply(
ParDo.of(
new DoFn<TableRow, KV<String, Long>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
c.output(
KV.of(
(String) c.element().get("name"),
Long.valueOf((String) c.element().get("number"))));
}
}));
PAssert.that(output)
.containsInAnyOrder(ImmutableList.of(KV.of("a", 1L), KV.of("b", 2L), KV.of("c", 3L)));
p.run();
}
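/** readTableRowsWithSchema() should expose the table schema as a Beam Schema on the output. */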
@Test
public void testReadTableWithSchema() throws IOException, InterruptedException {
// setup
Table someTable = new Table();
someTable.setSchema(
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("name").setType("STRING"),
new TableFieldSchema().setName("number").setType("INTEGER"))));
someTable.setTableReference(
new TableReference()
.setProjectId("non-executing-project")
.setDatasetId("schema_dataset")
.setTableId("schema_table"));
someTable.setNumBytes(1024L * 1024L);
FakeDatasetService fakeDatasetService = new FakeDatasetService();
fakeDatasetService.createDataset("non-executing-project", "schema_dataset", "", "", null);
fakeDatasetService.createTable(someTable);
List<TableRow> records =
Lists.newArrayList(
new TableRow().set("name", "a").set("number", 1L),
new TableRow().set("name", "b").set("number", 2L),
new TableRow().set("name", "c").set("number", 3L));
fakeDatasetService.insertAll(someTable.getTableReference(), records, null);
FakeBigQueryServices fakeBqServices =
new FakeBigQueryServices()
.withJobService(new FakeJobService())
.withDatasetService(fakeDatasetService);
// test
BigQueryIO.TypedRead<TableRow> read =
BigQueryIO.readTableRowsWithSchema()
.from("non-executing-project:schema_dataset.schema_table")
.withTestServices(fakeBqServices)
.withoutValidation();
PCollection<TableRow> bqRows = p.apply(read);
Schema expectedSchema =
Schema.of(
Schema.Field.of("name", Schema.FieldType.STRING).withNullable(true),
Schema.Field.of("number", Schema.FieldType.INT64).withNullable(true));
assertEquals(expectedSchema, bqRows.getSchema());
PCollection<Row> output = bqRows.apply(Select.fieldNames("name", "number"));
PAssert.that(output)
.containsInAnyOrder(
ImmutableList.of(
Row.withSchema(expectedSchema).addValues("a", 1L).build(),
Row.withSchema(expectedSchema).addValues("b", 2L).build(),
Row.withSchema(expectedSchema).addValues("c", 3L).build()));
p.run();
}
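// Display-data tests: the transform should report its table/query and configured flags.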
@Test
public void testBuildSourceDisplayDataTable() {
String tableSpec = "project:dataset.tableid";
BigQueryIO.Read read =
BigQueryIO.read()
.from(tableSpec)
.withoutResultFlattening()
.usingStandardSql()
.withoutValidation();
DisplayData displayData = DisplayData.from(read);
assertThat(displayData, hasDisplayItem("table", tableSpec));
assertThat(displayData, hasDisplayItem("flattenResults", false));
assertThat(displayData, hasDisplayItem("useLegacySql", false));
assertThat(displayData, hasDisplayItem("validation", false));
}
@Test
public void testBuildSourceDisplayDataQuery() {
BigQueryIO.Read read =
BigQueryIO.read()
.fromQuery("myQuery")
.withoutResultFlattening()
.usingStandardSql()
.withoutValidation();
DisplayData displayData = DisplayData.from(read);
assertThat(displayData, hasDisplayItem("query", "myQuery"));
assertThat(displayData, hasDisplayItem("flattenResults", false));
assertThat(displayData, hasDisplayItem("useLegacySql", false));
assertThat(displayData, hasDisplayItem("validation", false));
}
@Test
public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
BigQueryIO.Read read =
BigQueryIO.read()
.from("project:dataset.tableId")
.withTestServices(
new FakeBigQueryServices()
.withDatasetService(new FakeDatasetService())
.withJobService(new FakeJobService()))
.withoutValidation();
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
assertThat(
"BigQueryIO.Read should include the table spec in its primitive display data",
displayData,
hasItem(hasDisplayItem("table")));
}
@Test
public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException {
DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
BigQueryIO.Read read =
BigQueryIO.read()
.fromQuery("foobar")
.withTestServices(
new FakeBigQueryServices()
.withDatasetService(new FakeDatasetService())
.withJobService(new FakeJobService()))
.withoutValidation();
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
assertThat(
"BigQueryIO.Read should include the query in its primitive display data",
displayData,
hasItem(hasDisplayItem("query")));
}
@Test
public void testBigQueryIOGetName() {
assertEquals("BigQueryIO.Read", BigQueryIO.read().from("somedataset.sometable").getName());
}
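/**
* Splitting a table source should trigger exactly one extract job, even when split() is
* called repeatedly.
*/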
@Test
public void testBigQueryTableSourceInitSplit() throws Exception {
List<TableRow> expected =
ImmutableList.of(
new TableRow().set("name", "a").set("number", 1L),
new TableRow().set("name", "b").set("number", 2L),
new TableRow().set("name", "c").set("number", 3L),
new TableRow().set("name", "d").set("number", 4L),
new TableRow().set("name", "e").set("number", 5L),
new TableRow().set("name", "f").set("number", 6L));
TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
fakeDatasetService.createDataset("project", "data_set", "", "", null);
fakeDatasetService.createTable(
new Table()
.setTableReference(table)
.setSchema(
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("name").setType("STRING"),
new TableFieldSchema().setName("number").setType("INTEGER")))));
fakeDatasetService.insertAll(table, expected, null);
String stepUuid = "testStepUuid";
BoundedSource<TableRow> bqSource =
BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
.toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
PipelineOptions options = PipelineOptionsFactory.create();
options.setTempLocation(testFolder.getRoot().getAbsolutePath());
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
bqOptions.setProject("project");
List<TableRow> read =
convertStringsToLong(
SourceTestUtils.readFromSplitsOfSource(bqSource, 0L /* ignored */, options));
assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class)));
List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
assertEquals(2, sources.size());
// Simulate a repeated call to split(), as a Dataflow worker sometimes does.
sources = bqSource.split(200, options);
assertEquals(2, sources.size());
// A repeated call to split() should not have caused a duplicate extract job.
assertEquals(1, fakeJobService.getNumExtractJobCalls());
}
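// With no streaming buffer, the estimated size is just the stored table data: the six rows
// inserted here encode to 108 bytes in the fake services.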
@Test
public void testEstimatedSizeWithoutStreamingBuffer() throws Exception {
List<TableRow> data =
ImmutableList.of(
new TableRow().set("name", "a").set("number", 1L),
new TableRow().set("name", "b").set("number", 2L),
new TableRow().set("name", "c").set("number", 3L),
new TableRow().set("name", "d").set("number", 4L),
new TableRow().set("name", "e").set("number", 5L),
new TableRow().set("name", "f").set("number", 6L));
TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
fakeDatasetService.createDataset("project", "data_set", "", "", null);
fakeDatasetService.createTable(
new Table()
.setTableReference(table)
.setSchema(
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("name").setType("STRING"),
new TableFieldSchema().setName("number").setType("INTEGER")))));
fakeDatasetService.insertAll(table, data, null);
String stepUuid = "testStepUuid";
BoundedSource<TableRow> bqSource =
BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
.toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
PipelineOptions options = PipelineOptionsFactory.create();
assertEquals(108, bqSource.getEstimatedSizeBytes(options));
}
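// The streaming buffer's estimated bytes (10 here) are added to the 108-byte table data.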
@Test
public void testEstimatedSizeWithStreamingBuffer() throws Exception {
List<TableRow> data =
ImmutableList.of(
new TableRow().set("name", "a").set("number", 1L),
new TableRow().set("name", "b").set("number", 2L),
new TableRow().set("name", "c").set("number", 3L),
new TableRow().set("name", "d").set("number", 4L),
new TableRow().set("name", "e").set("number", 5L),
new TableRow().set("name", "f").set("number", 6L));
TableReference table = BigQueryHelpers.parseTableSpec("project:data_set.table_name");
fakeDatasetService.createDataset("project", "data_set", "", "", null);
fakeDatasetService.createTable(
new Table()
.setTableReference(table)
.setSchema(
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("name").setType("STRING"),
new TableFieldSchema().setName("number").setType("INTEGER"))))
.setStreamingBuffer(new Streamingbuffer().setEstimatedBytes(BigInteger.valueOf(10))));
fakeDatasetService.insertAll(table, data, null);
String stepUuid = "testStepUuid";
BoundedSource<TableRow> bqSource =
BigQueryTableSourceDef.create(fakeBqServices, ValueProvider.StaticValueProvider.of(table))
.toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
PipelineOptions options = PipelineOptionsFactory.create();
assertEquals(118, bqSource.getEstimatedSizeBytes(options));
}
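/** A query source's estimated size should come from the dry-run job's totalBytesProcessed. */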
@Test
public void testBigQueryQuerySourceEstimatedSize() throws Exception {
String queryString = "fake query string";
PipelineOptions options = PipelineOptionsFactory.create();
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
bqOptions.setProject("project");
String stepUuid = "testStepUuid";
BigQuerySourceBase<TableRow> bqSource =
BigQueryQuerySourceDef.create(
fakeBqServices,
ValueProvider.StaticValueProvider.of(queryString),
true /* flattenResults */,
true /* useLegacySql */,
QueryPriority.BATCH,
null,
null)
.toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
fakeJobService.expectDryRunQuery(
bqOptions.getProject(),
queryString,
new JobStatistics().setQuery(new JobStatistics2().setTotalBytesProcessed(100L)));
assertEquals(100, bqSource.getEstimatedSizeBytes(bqOptions));
}
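// Splitting a query source: the query is dry-run to find referenced tables, executed into a
// temporary table, and the result is then extracted and split like a table source.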
@Test
public void testBigQueryQuerySourceInitSplit() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
bqOptions.setProject("project");
TableReference sourceTableRef = BigQueryHelpers.parseTableSpec("project:dataset.table");
fakeDatasetService.createDataset(
sourceTableRef.getProjectId(),
sourceTableRef.getDatasetId(),
"asia-northeast1",
"Fake plastic tree^H^H^H^Htables",
null);
fakeDatasetService.createTable(
new Table().setTableReference(sourceTableRef).setLocation("asia-northeast1"));
Table queryResultTable =
new Table()
.setSchema(
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("name").setType("STRING"),
new TableFieldSchema().setName("number").setType("INTEGER"))));
List<TableRow> expected =
ImmutableList.of(
new TableRow().set("name", "a").set("number", 1L),
new TableRow().set("name", "b").set("number", 2L),
new TableRow().set("name", "c").set("number", 3L),
new TableRow().set("name", "d").set("number", 4L),
new TableRow().set("name", "e").set("number", 5L),
new TableRow().set("name", "f").set("number", 6L));
String encodedQuery = FakeBigQueryServices.encodeQueryResult(queryResultTable, expected);
String stepUuid = "testStepUuid";
TableReference tempTableReference =
createTempTableReference(
bqOptions.getProject(), createJobIdToken(options.getJobName(), stepUuid));
fakeJobService.expectDryRunQuery(
bqOptions.getProject(),
encodedQuery,
new JobStatistics()
.setQuery(
new JobStatistics2()
.setTotalBytesProcessed(100L)
.setReferencedTables(ImmutableList.of(sourceTableRef, tempTableReference))));
BoundedSource<TableRow> bqSource =
BigQueryQuerySourceDef.create(
fakeBqServices,
ValueProvider.StaticValueProvider.of(encodedQuery),
true /* flattenResults */,
true /* useLegacySql */,
QueryPriority.BATCH,
null,
null)
.toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
options.setTempLocation(testFolder.getRoot().getAbsolutePath());
List<TableRow> read =
convertStringsToLong(
SourceTestUtils.readFromSplitsOfSource(bqSource, 0L /* ignored */, options));
assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class)));
List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
assertEquals(2, sources.size());
}
/**
* This test simulates the scenario where the SQL text which is executed by the query job doesn't
* by itself refer to any tables (e.g. "SELECT 17 AS value"), and thus there are no referenced
* tables when the dry run of the query is performed.
*/
@Test
public void testBigQueryQuerySourceInitSplit_NoReferencedTables() throws Exception {
PipelineOptions options = PipelineOptionsFactory.create();
BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
bqOptions.setProject("project");
Table queryResultTable =
new Table()
.setSchema(
new TableSchema()
.setFields(
ImmutableList.of(
new TableFieldSchema().setName("name").setType("STRING"),
new TableFieldSchema().setName("number").setType("INTEGER"))));
List<TableRow> expected =
ImmutableList.of(
new TableRow().set("name", "a").set("number", 1L),
new TableRow().set("name", "b").set("number", 2L),
new TableRow().set("name", "c").set("number", 3L),
new TableRow().set("name", "d").set("number", 4L),
new TableRow().set("name", "e").set("number", 5L),
new TableRow().set("name", "f").set("number", 6L));
String encodedQuery = FakeBigQueryServices.encodeQueryResult(queryResultTable, expected);
String stepUuid = "testStepUuid";
fakeJobService.expectDryRunQuery(
bqOptions.getProject(),
encodedQuery,
new JobStatistics()
.setQuery(
new JobStatistics2()
.setTotalBytesProcessed(100L)
.setReferencedTables(ImmutableList.of())));
BoundedSource<TableRow> bqSource =
BigQueryQuerySourceDef.create(
fakeBqServices,
ValueProvider.StaticValueProvider.of(encodedQuery),
true /* flattenResults */,
true /* useLegacySql */,
QueryPriority.BATCH,
null,
null)
.toSource(stepUuid, TableRowJsonCoder.of(), BigQueryIO.TableRowParser.INSTANCE);
options.setTempLocation(testFolder.getRoot().getAbsolutePath());
List<TableRow> read =
convertStringsToLong(
SourceTestUtils.readFromSplitsOfSource(bqSource, 0L /* ignored */, options));
assertThat(read, containsInAnyOrder(Iterables.toArray(expected, TableRow.class)));
List<? extends BoundedSource<TableRow>> sources = bqSource.split(100, options);
assertEquals(2, sources.size());
}
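/** PassThroughThenCleanup should emit its input unchanged. */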
@Test
public void testPassThroughThenCleanup() throws Exception {
PCollection<Integer> output =
p.apply(Create.of(1, 2, 3))
.apply(
new PassThroughThenCleanup<>(
new PassThroughThenCleanup.CleanupOperation() {
@Override
void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
// no-op
}
},
p.apply("Create1", Create.of("")).apply(View.asSingleton())));
PAssert.that(output).containsInAnyOrder(1, 2, 3);
p.run();
}
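/** The cleanup operation must actually run; this one throws so its execution is observable. */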
@Test
public void testPassThroughThenCleanupExecuted() throws Exception {
p.apply(Create.empty(VarIntCoder.of()))
.apply(
new PassThroughThenCleanup<>(
new PassThroughThenCleanup.CleanupOperation() {
@Override
void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
throw new RuntimeException("cleanup executed");
}
},
p.apply("Create1", Create.of("")).apply(View.asSingleton())));
thrown.expect(RuntimeException.class);
thrown.expectMessage("cleanup executed");
p.run();
}
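// Runtime ValueProviders must not be accessed when building display data at construction
// time; the next two tests verify this for table and query inputs.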
@Test
public void testRuntimeOptionsNotCalledInApplyInputTable() {
BigQueryIO.Read read = BigQueryIO.read().from(p.newProvider("")).withoutValidation();
// Test that this doesn't throw.
DisplayData.from(read);
}
@Test
public void testRuntimeOptionsNotCalledInApplyInputQuery() {
BigQueryIO.Read read = BigQueryIO.read().fromQuery(p.newProvider("")).withoutValidation();
// Test that this doesn't throw.
DisplayData.from(read);
}
List<TableRow> convertStringsToLong(List<TableRow> toConvert) {
// The numbers come back as String after JSON serialization. Change them back to
// longs so that we can assert the output.
List<TableRow> converted = Lists.newArrayList();
for (TableRow entry : toConvert) {
TableRow convertedEntry = entry.clone();
convertedEntry.set("number", Long.parseLong((String) convertedEntry.get("number")));
converted.add(convertedEntry);
}
return converted;
}
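/**
* Coder inference for read(parseFn) should derive the coder from the parse function's output
* type: here a KvCoder of ByteStringCoder and ProtoCoder.
*/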
@Test
public void testCoderInference() {
// Lambdas erase too much type information; use an anonymous class here.
SerializableFunction<SchemaAndRecord, KV<ByteString, Mutation>> parseFn =
new SerializableFunction<SchemaAndRecord, KV<ByteString, Mutation>>() {
@Override
public KV<ByteString, Mutation> apply(SchemaAndRecord input) {
return null;
}
};
assertEquals(
KvCoder.of(ByteStringCoder.of(), ProtoCoder.of(Mutation.class)),
BigQueryIO.read(parseFn).inferCoder(CoderRegistry.createDefault()));
}
}