Merge pull request #10294: [BEAM-8895] Add BigQuery table name sanitization to BigQueryIOIT
diff --git a/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Java.groovy b/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Java.groovy
index c9bdc5c..6f5abf3 100644
--- a/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Java.groovy
+++ b/.test-infra/jenkins/job_PerformanceTests_BigQueryIO_Java.groovy
@@ -33,8 +33,9 @@
tempLocation : 'gs://temp-storage-for-perf-tests/loadtests',
tempRoot : 'gs://temp-storage-for-perf-tests/loadtests',
writeMethod : 'STREAMING_INSERTS',
+ writeFormat : 'JSON',
testBigQueryDataset : 'beam_performance',
- testBigQueryTable : 'bqio_write_10GB_java',
+ testBigQueryTable : 'bqio_write_10GB_java_stream_' + now,
metricsBigQueryDataset: 'beam_performance',
metricsBigQueryTable : 'bqio_10GB_results_java_stream',
sourceOptions : """
@@ -51,19 +52,48 @@
]
],
[
- title : 'BigQueryIO Batch Performance Test Java 10 GB',
- triggerPhrase: 'Run BigQueryIO Batch Performance Test Java',
- name : 'beam_BiqQueryIO_Batch_Performance_Test_Java',
+ title : 'BigQueryIO Batch Performance Test Java 10 GB JSON',
+ triggerPhrase: 'Run BigQueryIO Batch Performance Test Java Json',
+ name : 'beam_BiqQueryIO_Batch_Performance_Test_Java_Json',
itClass : 'org.apache.beam.sdk.bigqueryioperftests.BigQueryIOIT',
properties: [
project : 'apache-beam-testing',
tempLocation : 'gs://temp-storage-for-perf-tests/loadtests',
tempRoot : 'gs://temp-storage-for-perf-tests/loadtests',
writeMethod : 'FILE_LOADS',
+ writeFormat : 'JSON',
testBigQueryDataset : 'beam_performance',
- testBigQueryTable : 'bqio_write_10GB_java',
+ testBigQueryTable : 'bqio_write_10GB_java_json_' + now,
metricsBigQueryDataset: 'beam_performance',
- metricsBigQueryTable : 'bqio_10GB_results_java_batch',
+ metricsBigQueryTable : 'bqio_10GB_results_java_batch_json',
+ sourceOptions : """
+ {
+ "numRecords": "10485760",
+ "keySizeBytes": "1",
+ "valueSizeBytes": "1024"
+ }
+ """.trim().replaceAll("\\s", ""),
+ runner : "DataflowRunner",
+ maxNumWorkers : '5',
+ numWorkers : '5',
+ autoscalingAlgorithm : 'NONE',
+ ]
+ ],
+ [
+ title : 'BigQueryIO Batch Performance Test Java 10 GB AVRO',
+ triggerPhrase: 'Run BigQueryIO Batch Performance Test Java Avro',
+ name : 'beam_BiqQueryIO_Batch_Performance_Test_Java_Avro',
+ itClass : 'org.apache.beam.sdk.bigqueryioperftests.BigQueryIOIT',
+ properties: [
+ project : 'apache-beam-testing',
+ tempLocation : 'gs://temp-storage-for-perf-tests/loadtests',
+ tempRoot : 'gs://temp-storage-for-perf-tests/loadtests',
+ writeMethod : 'FILE_LOADS',
+ writeFormat : 'AVRO',
+ testBigQueryDataset : 'beam_performance',
+ testBigQueryTable : 'bqio_write_10GB_java_avro_' + now,
+ metricsBigQueryDataset: 'beam_performance',
+ metricsBigQueryTable : 'bqio_10GB_results_java_batch_avro',
sourceOptions : """
{
"numRecords": "10485760",
diff --git a/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java b/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
index fff2eac..843ba78 100644
--- a/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
+++ b/sdks/java/io/bigquery-io-perf-tests/src/test/java/org/apache/beam/sdk/bigqueryioperftests/BigQueryIOIT.java
@@ -68,6 +68,7 @@
* "--metricsBigQueryDataset=metrics_dataset", \
* "--metricsBigQueryTable=metrics_table", \
* "--writeMethod=FILE_LOADS", \
+ * "--writeFormat=AVRO", \
* "--sourceOptions={\"numRecords\":\"1000\", \"keySizeBytes\":\"1\", \"valueSizeBytes\":\"1024\"}" \
* ]' \
* --tests org.apache.beam.sdk.bigqueryioperftests.BigQueryIOIT \
@@ -90,6 +91,7 @@
private static String tableQualifier;
private static String tempRoot;
private static BigQueryPerfTestOptions options;
+ private static WriteFormat writeFormat;
@BeforeClass
public static void setup() throws IOException {
@@ -100,7 +102,8 @@
metricsBigQueryDataset = options.getMetricsBigQueryDataset();
metricsBigQueryTable = options.getMetricsBigQueryTable();
testBigQueryDataset = options.getTestBigQueryDataset();
- testBigQueryTable = String.format("%s_%s", options.getTestBigQueryTable(), TEST_ID);
+ testBigQueryTable = options.getTestBigQueryTable();
+ writeFormat = WriteFormat.valueOf(options.getWriteFormat());
BigQueryOptions bigQueryOptions = BigQueryOptions.newBuilder().build();
tableQualifier =
String.format(
@@ -117,8 +120,14 @@
@Test
public void testWriteThenRead() {
- testJsonWrite();
- testAvroWrite();
+ switch (writeFormat) { // run only the write path selected by the writeFormat option parsed in setup()
+ case AVRO:
+ testAvroWrite();
+ break;
+ case JSON:
+ testJsonWrite();
+ break;
+ } // no default branch: AVRO and JSON are the only WriteFormat constants
testRead();
}
@@ -235,6 +244,11 @@
String getWriteMethod();
void setWriteMethod(String value);
+
+ @Description("Write Avro or JSON to BQ") // on the getter: Beam reads option descriptions from getters for --help
+ String getWriteFormat();
+
+ void setWriteFormat(String value);
}
private static class MapKVToV extends DoFn<KV<byte[], byte[]>, byte[]> {
@@ -243,4 +257,9 @@
context.output(context.element().getValue());
}
}
+
+ private enum WriteFormat { // values accepted by the writeFormat pipeline option (parsed with valueOf)
+ AVRO, // testWriteThenRead() runs testAvroWrite()
+ JSON // testWriteThenRead() runs testJsonWrite()
+ }
}