[BEAM-10029] Spanner IO read and write performance tests (#13857)
diff --git a/.test-infra/jenkins/README.md b/.test-infra/jenkins/README.md
index 0df980c..e934d5c 100644
--- a/.test-infra/jenkins/README.md
+++ b/.test-infra/jenkins/README.md
@@ -113,6 +113,8 @@
| beam_PerformanceTests_ParquetIOIT | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_ParquetIOIT/), [hdfs_cron](https://ci-beam.apache.org/job/beam_PerformanceTests_ParquetIOIT_HDFS/) | `Run Java ParquetIO Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_ParquetIOIT/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_ParquetIOIT) [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_ParquetIOIT_HDFS/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_ParquetIOIT_HDFS) |
| beam_PerformanceTests_PubsubIOIT_Python_Streaming | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_PubsubIOIT_Python_Streaming/) | `Run PubsubIO Performance Test Python` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_PubsubIOIT_Python_Streaming/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_PubsubIOIT_Python_Streaming) |
| beam_PerformanceTests_Spark | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_Spark/) | `Run Spark Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_Spark/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_Spark) |
+| beam_PerformanceTests_SpannerIO_Read_2GB_Python | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_SpannerIO_Read_2GB_Python_Batch/) | `Run SpannerIO Read 2GB Performance Test Python Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_SpannerIO_Read_2GB_Python_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_SpannerIO_Read_2GB_Python_Batch) |
+| beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch/) | `Run SpannerIO Write 2GB Performance Test Python Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch) |
| beam_PerformanceTests_TFRecordIOIT | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_TFRecordIOIT/) | `Run Java TFRecordIO Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_TFRecordIOIT/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_TFRecordIOIT) |
| beam_PerformanceTests_TextIOIT | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT/), [hdfs_cron](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT_HDFS/) | `Run Java TextIO Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT) [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT_HDFS/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT_HDFS) |
| beam_PerformanceTests_WordCountIT_Py37 | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_WordCountIT_Py37/) | `Run Python37 WordCountIT Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_WordCountIT_Py37/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_WordCountIT_Py37) |
diff --git a/.test-infra/jenkins/job_PerformanceTests_SpannerIO_Python.groovy b/.test-infra/jenkins/job_PerformanceTests_SpannerIO_Python.groovy
new file mode 100644
index 0000000..4b7b9c9
--- /dev/null
+++ b/.test-infra/jenkins/job_PerformanceTests_SpannerIO_Python.groovy
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as commonJobProperties
+import LoadTestsBuilder as loadTestsBuilder
+import PhraseTriggeringPostCommitBuilder
+import InfluxDBCredentialsHelper
+
+def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
+
+def spannerio_read_test_2gb = [
+ title : 'SpannerIO Read Performance Test Python 2 GB',
+ test : 'apache_beam.io.gcp.experimental.spannerio_read_perf_test',
+ runner : CommonTestProperties.Runner.DATAFLOW,
+ pipelineOptions: [
+ job_name : 'performance-tests-spanner-read-python-2gb' + now,
+ project : 'apache-beam-testing',
+ region : 'us-central1',
+ temp_location : 'gs://temp-storage-for-perf-tests/loadtests',
+ spanner_instance : 'beam-test',
+ spanner_database : 'pyspanner_read_2gb',
+ publish_to_big_query : true,
+ metrics_dataset : 'beam_performance',
+ metrics_table : 'pyspanner_read_2GB_results',
+ influx_measurement : 'python_spannerio_read_2GB_results',
+ influx_db_name : InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+ influx_hostname : InfluxDBCredentialsHelper.InfluxDBHostUrl,
+ input_options : '\'{' +
+ '"num_records": 2097152,' +
+ '"key_size": 1,' +
+ '"value_size": 1024}\'',
+ num_workers : 5,
+ autoscaling_algorithm: 'NONE', // Disable autoscale the worker pool.
+ ]
+]
+
+def spannerio_write_test_2gb = [
+ title : 'SpannerIO Write Performance Test Python Batch 2 GB',
+ test : 'apache_beam.io.gcp.experimental.spannerio_write_perf_test',
+ runner : CommonTestProperties.Runner.DATAFLOW,
+ pipelineOptions: [
+ job_name : 'performance-tests-spannerio-write-python-batch-2gb' + now,
+ project : 'apache-beam-testing',
+ region : 'us-central1',
+ temp_location : 'gs://temp-storage-for-perf-tests/loadtests',
+ spanner_instance : 'beam-test',
+ spanner_database : 'pyspanner_write_2gb',
+ publish_to_big_query : true,
+ metrics_dataset : 'beam_performance',
+ metrics_table : 'pyspanner_write_2GB_results',
+ influx_measurement : 'python_spanner_write_2GB_results',
+ influx_db_name : InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+ influx_hostname : InfluxDBCredentialsHelper.InfluxDBHostUrl,
+ input_options : '\'{' +
+ '"num_records": 2097152,' +
+ '"key_size": 1,' +
+ '"value_size": 1024}\'',
+ num_workers : 5,
+ autoscaling_algorithm: 'NONE', // Disable autoscale the worker pool.
+ ]
+]
+
+def executeJob = { scope, testConfig ->
+ commonJobProperties.setTopLevelMainJobProperties(scope, 'master', 480)
+
+ loadTestsBuilder.loadTest(scope, testConfig.title, testConfig.runner, CommonTestProperties.SDK.PYTHON, testConfig.pipelineOptions, testConfig.test)
+}
+
+PhraseTriggeringPostCommitBuilder.postCommitJob(
+ 'beam_PerformanceTests_SpannerIO_Read_2GB_Python',
+ 'Run SpannerIO Read 2GB Performance Test Python',
+ 'SpannerIO Read 2GB Performance Test Python',
+ this
+ ) {
+ executeJob(delegate, spannerio_read_test_2gb)
+ }
+
+CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Read_2GB_Python', 'H 15 * * *', this) {
+ executeJob(delegate, spannerio_read_test_2gb)
+}
+
+PhraseTriggeringPostCommitBuilder.postCommitJob(
+ 'beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch',
+ 'Run SpannerIO Write 2GB Performance Test Python Batch',
+ 'SpannerIO Write 2GB Performance Test Python Batch',
+ this
+ ) {
+ executeJob(delegate, spannerio_write_test_2gb)
+ }
+
+CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch', 'H 15 * * *', this) {
+ executeJob(delegate, spannerio_write_test_2gb)
+}
diff --git a/CHANGES.md b/CHANGES.md
index 489a9c6..9b57772 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -64,6 +64,7 @@
## New Features / Improvements
* X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* Added Spanner IO Performance tests for read and write. (Python) ([BEAM-10029](https://issues.apache.org/jira/browse/BEAM-10029))
## Breaking Changes
diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_perf_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_perf_test.py
new file mode 100644
index 0000000..ae7c1ea
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_perf_test.py
@@ -0,0 +1,158 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A performance test for reading data from a Spanner database table.
+Besides of the standard options, there are options with special meaning:
+* spanner_instance - Spanner Instance ID.
+* spanner_database - Spanner Database ID.
+The table will be created and populated with data from Synthetic Source if it
+does not exist.
+* input_options - options for Synthetic Source:
+num_records - number of rows to be inserted,
+value_size - the length of a single row,
+key_size - required option, but its value has no meaning.
+
+Example test run on DataflowRunner:
+python -m apache_beam.io.gcp.experimental.spannerio_read_perf_test \
+ --test-pipeline-options="
+ --runner=TestDataflowRunner
+ --project='...'
+ --region='...'
+ --temp_location='gs://...'
+ --sdk_location=build/apache-beam.tar.gz
+ --publish_to_big_query=true
+ --metrics_dataset='...'
+ --metrics_table='...'
+ --spanner_instance='...'
+ --spanner_database='...'
+ --input_options='{
+ \"num_records\": 10,
+ \"key_size\": 1,
+ \"value_size\": 1024
+ }'"
+
+This setup will result in a table of 1MB size.
+"""
+
+from __future__ import absolute_import
+
+import logging
+
+from apache_beam import FlatMap
+from apache_beam import Map
+from apache_beam import ParDo
+from apache_beam.io import Read
+from apache_beam.io.gcp.experimental.spannerio import ReadFromSpanner
+from apache_beam.io.gcp.experimental.spannerio import WriteToSpanner
+from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.load_tests.load_test_metrics_utils import CountMessages
+from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
+from apache_beam.testing.synthetic_pipeline import SyntheticSource
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms.combiners import Count
+
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+ from google.api_core.exceptions import AlreadyExists
+ from google.cloud import spanner
+except ImportError:
+ spanner = None
+ AlreadyExists = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+class SpannerReadPerfTest(LoadTest):
+ def __init__(self):
+ super(SpannerReadPerfTest, self).__init__()
+ self.project = self.pipeline.get_option('project')
+ self.spanner_instance = self.pipeline.get_option('spanner_instance')
+ self.spanner_database = self.pipeline.get_option('spanner_database')
+ self._init_setup()
+
+ def _create_database(self):
+ spanner_client = spanner.Client()
+ instance = spanner_client.instance(self.spanner_instance)
+ database = instance.database(
+ self.spanner_database,
+ ddl_statements=[
+ """CREATE TABLE test_data (
+ id STRING(99) NOT NULL,
+ data BYTES(MAX) NOT NULL
+ ) PRIMARY KEY (id)""",
+ ])
+ database.create()
+
+ def _init_setup(self):
+ """Checks if a spanner database exists and creates it if not."""
+ try:
+ self._create_database()
+ self._create_input_data()
+ except AlreadyExists:
+ # pass if the database already exists
+ pass
+
+ def _create_input_data(self):
+ """
+ Runs an additional pipeline which creates test data and waits for its
+ completion.
+ """
+ def format_record(record):
+ import base64
+ return base64.b64encode(record[1])
+
+ def make_insert_mutations(element):
+ import uuid
+ from apache_beam.io.gcp.experimental.spannerio import WriteMutation
+ ins_mutation = WriteMutation.insert(
+ table='test_data',
+ columns=('id', 'data'),
+ values=[(str(uuid.uuid1()), element)])
+ return [ins_mutation]
+
+ with TestPipeline() as p:
+ ( # pylint: disable=expression-not-assigned
+ p
+ | 'Produce rows' >> Read(
+ SyntheticSource(self.parse_synthetic_source_options()))
+ | 'Format' >> Map(format_record)
+ | 'Make mutations' >> FlatMap(make_insert_mutations)
+ | 'Write to Spanner' >> WriteToSpanner(
+ project_id=self.project,
+ instance_id=self.spanner_instance,
+ database_id=self.spanner_database,
+ max_batch_size_bytes=5120))
+
+ def test(self):
+ output = (
+ self.pipeline
+ | 'Read from Spanner' >> ReadFromSpanner(
+ self.project,
+ self.spanner_instance,
+ self.spanner_database,
+ sql="select data from test_data")
+ | 'Count messages' >> ParDo(CountMessages(self.metrics_namespace))
+ | 'Measure time' >> ParDo(MeasureTime(self.metrics_namespace))
+ | 'Count' >> Count.Globally())
+ assert_that(output, equal_to([self.input_options['num_records']]))
+
+
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.INFO)
+ SpannerReadPerfTest().run()
diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_perf_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_perf_test.py
new file mode 100644
index 0000000..707db81
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_perf_test.py
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A pipeline that writes data from Synthetic Source to a Spanner.
+Besides of the standard options, there are options with special meaning:
+* spanner_instance - Spanner Instance ID.
+* spanner_database - Spanner Database ID.
+* input_options - options for Synthetic Source:
+num_records - number of rows to be inserted,
+value_size - the length of a single row,
+key_size - required option, but its value has no meaning.
+
+Example test run on DataflowRunner:
+
+python -m apache_beam.io.gcp.experimental.spannerio_write_perf_test \
+ --test-pipeline-options="
+ --runner=TestDataflowRunner
+ --project='...'
+ --region='...'
+ --temp_location='gs://...'
+ --sdk_location=build/apache-beam.tar.gz
+ --publish_to_big_query=true
+ --metrics_dataset='...'
+ --metrics_table='...'
+ --spanner_instance='...'
+ --spanner_database='...'
+ --input_options='{
+ \"num_records\": 10,
+ \"key_size\": 1,
+ \"value_size\": 1024
+ }'"
+
+This setup will result in a table of 1MB size.
+"""
+
+from __future__ import absolute_import
+
+import logging
+import random
+import uuid
+
+from apache_beam import FlatMap
+from apache_beam import Map
+from apache_beam import ParDo
+from apache_beam.io import Read
+from apache_beam.io.gcp.experimental.spannerio import WriteToSpanner
+from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.load_tests.load_test_metrics_utils import CountMessages
+from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
+from apache_beam.testing.synthetic_pipeline import SyntheticSource
+
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+ from google.cloud import spanner
+except ImportError:
+ spanner = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+class SpannerWritePerfTest(LoadTest):
+ TEST_DATABASE = None
+
+ def __init__(self):
+ super(SpannerWritePerfTest, self).__init__()
+ self.project = self.pipeline.get_option('project')
+ self.spanner_instance = self.pipeline.get_option('spanner_instance')
+ self.spanner_database = self.pipeline.get_option('spanner_database')
+ self._init_setup()
+
+ def _generate_table_name(self):
+ self.TEST_DATABASE = "{}_{}".format(
+ self.spanner_database, ''.join(random.sample(uuid.uuid4().hex, 4)))
+ return self.TEST_DATABASE
+
+ def _create_database(self):
+ spanner_client = spanner.Client()
+ instance = self._SPANNER_INSTANCE = spanner_client.instance(
+ self.spanner_instance)
+ database = instance.database(
+ self.TEST_DATABASE,
+ ddl_statements=[
+ """CREATE TABLE test (
+ id STRING(99) NOT NULL,
+ data BYTES(MAX) NOT NULL
+ ) PRIMARY KEY (id)""",
+ ])
+ database.create()
+
+ def _init_setup(self):
+ """Create database."""
+ self._generate_table_name()
+ self._create_database()
+
+ def test(self):
+ def format_record(record):
+ import base64
+ return base64.b64encode(record[1])
+
+ def make_insert_mutations(element):
+ import uuid # pylint: disable=reimported
+ from apache_beam.io.gcp.experimental.spannerio import WriteMutation
+ ins_mutation = WriteMutation.insert(
+ table='test',
+ columns=('id', 'data'),
+ values=[(str(uuid.uuid1()), element)])
+ return [ins_mutation]
+
+ ( # pylint: disable=expression-not-assigned
+ self.pipeline
+ | 'Produce rows' >> Read(
+ SyntheticSource(self.parse_synthetic_source_options()))
+ | 'Count messages' >> ParDo(CountMessages(self.metrics_namespace))
+ | 'Format' >> Map(format_record)
+ | 'Make mutations' >> FlatMap(make_insert_mutations)
+ | 'Measure time' >> ParDo(MeasureTime(self.metrics_namespace))
+ | 'Write to Spanner' >> WriteToSpanner(
+ project_id=self.project,
+ instance_id=self.spanner_instance,
+ database_id=self.TEST_DATABASE,
+ max_batch_size_bytes=5120)
+ )
+
+ def cleanup(self):
+ """Removes test database."""
+ database = self._SPANNER_INSTANCE.database(self.TEST_DATABASE)
+ database.drop()
+
+
+if __name__ == '__main__':
+ logging.basicConfig(level=logging.INFO)
+ SpannerWritePerfTest().run()