/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.bigqueryioperftests;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.Timestamp;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.UUID;
import java.util.function.Function;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource;
import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
* A performance test of {@link org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO}.
*
* <p>Usage:
*
* <pre>
* ./gradlew integrationTest -p sdks/java/io/bigquery-io-perf-tests -DintegrationTestPipelineOptions='[ \
* "--testBigQueryDataset=test_dataset", \
* "--testBigQueryTable=test_table", \
* "--metricsBigQueryDataset=metrics_dataset", \
* "--metricsBigQueryTable=metrics_table", \
* "--writeMethod=FILE_LOADS", \
* "--writeFormat=AVRO", \
* "--sourceOptions={\"numRecords\":\"1000\", \"keySizeBytes\":\"1\", \"valueSizeBytes\":\"1024\"}" \
* ]' \
* --tests org.apache.beam.sdk.bigqueryioperftests.BigQueryIOIT \
* -DintegrationTestRunner=direct
* </pre>
*/
@RunWith(JUnit4.class)
public class BigQueryIOIT {
private static final String NAMESPACE = BigQueryIOIT.class.getName();
private static final String TEST_ID = UUID.randomUUID().toString();
private static final String TEST_TIMESTAMP = Timestamp.now().toString();
private static final String READ_TIME_METRIC_NAME = "read_time";
private static final String WRITE_TIME_METRIC_NAME = "write_time";
private static final String AVRO_WRITE_TIME_METRIC_NAME = "avro_write_time";
private static String metricsBigQueryTable;
private static String metricsBigQueryDataset;
private static String testBigQueryDataset;
private static String testBigQueryTable;
private static SyntheticSourceOptions sourceOptions;
private static String tableQualifier;
private static String tempRoot;
private static BigQueryPerfTestOptions options;
private static WriteFormat writeFormat;
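
  /** Reads the pipeline options and resolves the fully qualified name of the test table. */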
@BeforeClass
public static void setup() throws IOException {
options = IOITHelper.readIOTestPipelineOptions(BigQueryPerfTestOptions.class);
tempRoot = options.getTempRoot();
sourceOptions =
SyntheticOptions.fromJsonString(options.getSourceOptions(), SyntheticSourceOptions.class);
metricsBigQueryDataset = options.getMetricsBigQueryDataset();
metricsBigQueryTable = options.getMetricsBigQueryTable();
testBigQueryDataset = options.getTestBigQueryDataset();
testBigQueryTable = options.getTestBigQueryTable();
writeFormat = WriteFormat.valueOf(options.getWriteFormat());
BigQueryOptions bigQueryOptions = BigQueryOptions.newBuilder().build();
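    // BigQueryIO expects a fully qualified table reference of the form "project:dataset.table".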
tableQualifier =
String.format(
"%s:%s.%s", bigQueryOptions.getProjectId(), testBigQueryDataset, testBigQueryTable);
}
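
  /** Deletes the test table once all tests have run. */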
@AfterClass
public static void tearDown() {
    BigQueryOptions bigQueryOptions = BigQueryOptions.newBuilder().build();
    BigQuery client = bigQueryOptions.getService();
    TableId tableId =
        TableId.of(bigQueryOptions.getProjectId(), testBigQueryDataset, testBigQueryTable);
    client.delete(tableId);
}
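
  /**
   * Writes synthetic data to BigQuery in the configured format, then reads it back, publishing a
   * timing metric for each phase.
   */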
@Test
public void testWriteThenRead() {
switch (writeFormat) {
case AVRO:
testAvroWrite();
break;
case JSON:
testJsonWrite();
break;
}
testRead();
}
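
  /** Writes rows as JSON {@link TableRow}s and records timing under {@code write_time}. */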
private void testJsonWrite() {
BigQueryIO.Write<byte[]> writeIO =
BigQueryIO.<byte[]>write()
.withFormatFunction(
input -> {
TableRow tableRow = new TableRow();
tableRow.set("data", input);
return tableRow;
});
testWrite(writeIO, WRITE_TIME_METRIC_NAME);
}
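
  /**
   * Writes rows as Avro {@link GenericRecord}s, truncating any existing table, and records timing
   * under {@code avro_write_time}.
   */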
private void testAvroWrite() {
BigQueryIO.Write<byte[]> writeIO =
BigQueryIO.<byte[]>write()
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withAvroFormatFunction(
writeRequest -> {
byte[] data = writeRequest.getElement();
GenericRecord record = new GenericData.Record(writeRequest.getSchema());
record.put("data", ByteBuffer.wrap(data));
return record;
});
testWrite(writeIO, AVRO_WRITE_TIME_METRIC_NAME);
}
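
  /**
   * Runs a pipeline that generates synthetic records, timestamps them, and writes them to
   * BigQuery, then publishes the measured write time under the given metric name.
   */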
private void testWrite(BigQueryIO.Write<byte[]> writeIO, String metricName) {
Pipeline pipeline = Pipeline.create(options);
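    // The write method (e.g. FILE_LOADS or STREAMING_INSERTS) is supplied via pipeline options.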
BigQueryIO.Write.Method method = BigQueryIO.Write.Method.valueOf(options.getWriteMethod());
pipeline
.apply("Read from source", Read.from(new SyntheticBoundedSource(sourceOptions)))
.apply("Gather time", ParDo.of(new TimeMonitor<>(NAMESPACE, WRITE_TIME_METRIC_NAME)))
.apply("Map records", ParDo.of(new MapKVToV()))
.apply(
"Write to BQ",
writeIO
.to(tableQualifier)
.withCustomGcsTempLocation(ValueProvider.StaticValueProvider.of(tempRoot))
.withMethod(method)
.withSchema(
new TableSchema()
.setFields(
Collections.singletonList(
new TableFieldSchema().setName("data").setType("BYTES")))));
PipelineResult pipelineResult = pipeline.run();
pipelineResult.waitUntilFinish();
extractAndPublishTime(pipelineResult, metricName);
}
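
  /** Reads the test rows back from BigQuery and publishes the measured read time. */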
private void testRead() {
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply("Read from BQ", BigQueryIO.readTableRows().from(tableQualifier))
.apply("Gather time", ParDo.of(new TimeMonitor<>(NAMESPACE, READ_TIME_METRIC_NAME)));
PipelineResult result = pipeline.run();
result.waitUntilFinish();
extractAndPublishTime(result, READ_TIME_METRIC_NAME);
}
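
  /** Extracts the runtime for the given metric from the pipeline result and publishes it. */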
  private void extractAndPublishTime(PipelineResult pipelineResult, String metricName) {
    NamedTestResult metricResult =
        getMetricSupplier(metricName).apply(new MetricsReader(pipelineResult, NAMESPACE));
IOITMetrics.publish(
TEST_ID,
TEST_TIMESTAMP,
metricsBigQueryDataset,
metricsBigQueryTable,
Collections.singletonList(metricResult));
}
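
  /**
   * Builds a function that turns the millisecond timestamps recorded by {@link TimeMonitor} into a
   * {@link NamedTestResult} expressed in seconds.
   */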
private static Function<MetricsReader, NamedTestResult> getMetricSupplier(String metricName) {
return reader -> {
long startTime = reader.getStartTimeMetric(metricName);
long endTime = reader.getEndTimeMetric(metricName);
return NamedTestResult.create(
TEST_ID, TEST_TIMESTAMP, metricName, (endTime - startTime) / 1e3);
};
}

  /** Options for this IO performance test. */
  public interface BigQueryPerfTestOptions extends IOTestPipelineOptions {
    @Description("Synthetic source options")
    @Validation.Required
    String getSourceOptions();

    void setSourceOptions(String value);

    @Description("BQ dataset for the test data")
    String getTestBigQueryDataset();

    void setTestBigQueryDataset(String dataset);

    @Description("BQ table for test data")
    String getTestBigQueryTable();

    void setTestBigQueryTable(String table);

    @Description("BQ dataset for the metrics data")
    String getMetricsBigQueryDataset();

    void setMetricsBigQueryDataset(String dataset);

    @Description("BQ table for metrics data")
    String getMetricsBigQueryTable();

    void setMetricsBigQueryTable(String table);

    @Description("Should the test use streaming writes or batch loads to write to BQ")
    String getWriteMethod();

    void setWriteMethod(String value);

    @Description("Write Avro or JSON to BQ")
    String getWriteFormat();

    void setWriteFormat(String value);
  }
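
  /** Drops the synthetic key and keeps only the value bytes. */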
private static class MapKVToV extends DoFn<KV<byte[], byte[]>, byte[]> {
@ProcessElement
public void process(ProcessContext context) {
context.output(context.element().getValue());
}
}
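
  /** Serialization formats supported by the write phase of this test. */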
private enum WriteFormat {
AVRO,
JSON
}
}