[BEAM-10029] Spanner IO read and write performance tests (#13857)

diff --git a/.test-infra/jenkins/README.md b/.test-infra/jenkins/README.md
index 0df980c..e934d5c 100644
--- a/.test-infra/jenkins/README.md
+++ b/.test-infra/jenkins/README.md
@@ -113,6 +113,8 @@
 | beam_PerformanceTests_ParquetIOIT | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_ParquetIOIT/), [hdfs_cron](https://ci-beam.apache.org/job/beam_PerformanceTests_ParquetIOIT_HDFS/) | `Run Java ParquetIO Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_ParquetIOIT/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_ParquetIOIT) [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_ParquetIOIT_HDFS/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_ParquetIOIT_HDFS) |
 | beam_PerformanceTests_PubsubIOIT_Python_Streaming | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_PubsubIOIT_Python_Streaming/) | `Run PubsubIO Performance Test Python` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_PubsubIOIT_Python_Streaming/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_PubsubIOIT_Python_Streaming) |
 | beam_PerformanceTests_Spark | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_Spark/) | `Run Spark Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_Spark/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_Spark) |
+| beam_PerformanceTests_SpannerIO_Read_2GB_Python | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_SpannerIO_Read_2GB_Python_Batch/) | `Run SpannerIO Read 2GB Performance Test Python Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_SpannerIO_Read_2GB_Python_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_SpannerIO_Read_2GB_Python_Batch) |
+| beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch/) | `Run SpannerIO Write 2GB Performance Test Python Batch` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch) |
 | beam_PerformanceTests_TFRecordIOIT | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_TFRecordIOIT/) | `Run Java TFRecordIO Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_TFRecordIOIT/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_TFRecordIOIT) |
 | beam_PerformanceTests_TextIOIT | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT/), [hdfs_cron](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT_HDFS/) | `Run Java TextIO Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT) [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT_HDFS/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_TextIOIT_HDFS) |
 | beam_PerformanceTests_WordCountIT_Py37 | [cron](https://ci-beam.apache.org/job/beam_PerformanceTests_WordCountIT_Py37/) | `Run Python37 WordCountIT Performance Test` | [![Build Status](https://ci-beam.apache.org/job/beam_PerformanceTests_WordCountIT_Py37/badge/icon)](https://ci-beam.apache.org/job/beam_PerformanceTests_WordCountIT_Py37) |
diff --git a/.test-infra/jenkins/job_PerformanceTests_SpannerIO_Python.groovy b/.test-infra/jenkins/job_PerformanceTests_SpannerIO_Python.groovy
new file mode 100644
index 0000000..4b7b9c9
--- /dev/null
+++ b/.test-infra/jenkins/job_PerformanceTests_SpannerIO_Python.groovy
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import CommonJobProperties as commonJobProperties
+import LoadTestsBuilder as loadTestsBuilder
+import PhraseTriggeringPostCommitBuilder
+import InfluxDBCredentialsHelper
+
+def now = new Date().format("MMddHHmmss", TimeZone.getTimeZone('UTC'))
+
+def spannerio_read_test_2gb = [
+  title          : 'SpannerIO Read Performance Test Python 2 GB',
+  test           : 'apache_beam.io.gcp.experimental.spannerio_read_perf_test',
+  runner         : CommonTestProperties.Runner.DATAFLOW,
+  pipelineOptions: [
+    job_name             : 'performance-tests-spanner-read-python-2gb' + now,
+    project              : 'apache-beam-testing',
+    region               : 'us-central1',
+    temp_location        : 'gs://temp-storage-for-perf-tests/loadtests',
+    spanner_instance     : 'beam-test',
+    spanner_database     : 'pyspanner_read_2gb',
+    publish_to_big_query : true,
+    metrics_dataset      : 'beam_performance',
+    metrics_table        : 'pyspanner_read_2GB_results',
+    influx_measurement   : 'python_spannerio_read_2GB_results',
+    influx_db_name       : InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+    influx_hostname      : InfluxDBCredentialsHelper.InfluxDBHostUrl,
+    input_options        : '\'{' +
+    '"num_records": 2097152,' +
+    '"key_size": 1,' +
+    '"value_size": 1024}\'',
+    num_workers          : 5,
+    autoscaling_algorithm: 'NONE',  // Disable autoscale the worker pool.
+  ]
+]
+
+def spannerio_write_test_2gb = [
+  title          : 'SpannerIO Write Performance Test Python Batch 2 GB',
+  test           : 'apache_beam.io.gcp.experimental.spannerio_write_perf_test',
+  runner         : CommonTestProperties.Runner.DATAFLOW,
+  pipelineOptions: [
+    job_name             : 'performance-tests-spannerio-write-python-batch-2gb' + now,
+    project              : 'apache-beam-testing',
+    region               : 'us-central1',
+    temp_location        : 'gs://temp-storage-for-perf-tests/loadtests',
+    spanner_instance     : 'beam-test',
+    spanner_database     : 'pyspanner_write_2gb',
+    publish_to_big_query : true,
+    metrics_dataset      : 'beam_performance',
+    metrics_table        : 'pyspanner_write_2GB_results',
+    influx_measurement   : 'python_spanner_write_2GB_results',
+    influx_db_name       : InfluxDBCredentialsHelper.InfluxDBDatabaseName,
+    influx_hostname      : InfluxDBCredentialsHelper.InfluxDBHostUrl,
+    input_options        : '\'{' +
+    '"num_records": 2097152,' +
+    '"key_size": 1,' +
+    '"value_size": 1024}\'',
+    num_workers          : 5,
+    autoscaling_algorithm: 'NONE',  // Disable autoscale the worker pool.
+  ]
+]
+
+def executeJob = { scope, testConfig ->
+  commonJobProperties.setTopLevelMainJobProperties(scope, 'master', 480)
+
+  loadTestsBuilder.loadTest(scope, testConfig.title, testConfig.runner, CommonTestProperties.SDK.PYTHON, testConfig.pipelineOptions, testConfig.test)
+}
+
+PhraseTriggeringPostCommitBuilder.postCommitJob(
+    'beam_PerformanceTests_SpannerIO_Read_2GB_Python',
+    'Run SpannerIO Read 2GB Performance Test Python',
+    'SpannerIO Read 2GB Performance Test Python',
+    this
+    ) {
+      executeJob(delegate, spannerio_read_test_2gb)
+    }
+
+CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Read_2GB_Python', 'H 15 * * *', this) {
+  executeJob(delegate, spannerio_read_test_2gb)
+}
+
+PhraseTriggeringPostCommitBuilder.postCommitJob(
+    'beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch',
+    'Run SpannerIO Write 2GB Performance Test Python Batch',
+    'SpannerIO Write 2GB Performance Test Python Batch',
+    this
+    ) {
+      executeJob(delegate, spannerio_write_test_2gb)
+    }
+
+CronJobBuilder.cronJob('beam_PerformanceTests_SpannerIO_Write_2GB_Python_Batch', 'H 15 * * *', this) {
+  executeJob(delegate, spannerio_write_test_2gb)
+}
diff --git a/CHANGES.md b/CHANGES.md
index 489a9c6..9b57772 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -64,6 +64,7 @@
 ## New Features / Improvements
 
 * X feature added (Java/Python) ([BEAM-X](https://issues.apache.org/jira/browse/BEAM-X)).
+* Added Spanner IO Performance tests for read and write. (Python) ([BEAM-10029](https://issues.apache.org/jira/browse/BEAM-10029))
 
 ## Breaking Changes
 
diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_perf_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_perf_test.py
new file mode 100644
index 0000000..ae7c1ea
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_read_perf_test.py
@@ -0,0 +1,158 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A performance test for reading data from a Spanner database table.
+Besides of the standard options, there are options with special meaning:
+* spanner_instance - Spanner Instance ID.
+* spanner_database - Spanner Database ID.
+The table will be created and populated with data from Synthetic Source if it
+does not exist.
+* input_options - options for Synthetic Source:
+num_records - number of rows to be inserted,
+value_size - the length of a single row,
+key_size - required option, but its value has no meaning.
+
+Example test run on DataflowRunner:
+python -m apache_beam.io.gcp.experimental.spannerio_read_perf_test \
+  --test-pipeline-options="
+  --runner=TestDataflowRunner
+  --project='...'
+  --region='...'
+  --temp_location='gs://...'
+  --sdk_location=build/apache-beam.tar.gz
+  --publish_to_big_query=true
+  --metrics_dataset='...'
+  --metrics_table='...'
+  --spanner_instance='...'
+  --spanner_database='...'
+  --input_options='{
+    \"num_records\": 10,
+    \"key_size\": 1,
+    \"value_size\": 1024
+    }'"
+
+This setup will result in a table of 1MB size.
+"""
+
+from __future__ import absolute_import
+
+import logging
+
+from apache_beam import FlatMap
+from apache_beam import Map
+from apache_beam import ParDo
+from apache_beam.io import Read
+from apache_beam.io.gcp.experimental.spannerio import ReadFromSpanner
+from apache_beam.io.gcp.experimental.spannerio import WriteToSpanner
+from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.load_tests.load_test_metrics_utils import CountMessages
+from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
+from apache_beam.testing.synthetic_pipeline import SyntheticSource
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+from apache_beam.transforms.combiners import Count
+
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.api_core.exceptions import AlreadyExists
+  from google.cloud import spanner
+except ImportError:
+  spanner = None
+  AlreadyExists = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+class SpannerReadPerfTest(LoadTest):
+  def __init__(self):
+    super(SpannerReadPerfTest, self).__init__()
+    self.project = self.pipeline.get_option('project')
+    self.spanner_instance = self.pipeline.get_option('spanner_instance')
+    self.spanner_database = self.pipeline.get_option('spanner_database')
+    self._init_setup()
+
+  def _create_database(self):
+    spanner_client = spanner.Client()
+    instance = spanner_client.instance(self.spanner_instance)
+    database = instance.database(
+        self.spanner_database,
+        ddl_statements=[
+            """CREATE TABLE test_data (
+            id      STRING(99) NOT NULL,
+            data    BYTES(MAX) NOT NULL
+         ) PRIMARY KEY (id)""",
+        ])
+    database.create()
+
+  def _init_setup(self):
+    """Checks if a spanner database exists and creates it if not."""
+    try:
+      self._create_database()
+      self._create_input_data()
+    except AlreadyExists:
+      # pass if the database already exists
+      pass
+
+  def _create_input_data(self):
+    """
+    Runs an additional pipeline which creates test data and waits for its
+    completion.
+    """
+    def format_record(record):
+      import base64
+      return base64.b64encode(record[1])
+
+    def make_insert_mutations(element):
+      import uuid
+      from apache_beam.io.gcp.experimental.spannerio import WriteMutation
+      ins_mutation = WriteMutation.insert(
+          table='test_data',
+          columns=('id', 'data'),
+          values=[(str(uuid.uuid1()), element)])
+      return [ins_mutation]
+
+    with TestPipeline() as p:
+      (  # pylint: disable=expression-not-assigned
+          p
+          | 'Produce rows' >> Read(
+              SyntheticSource(self.parse_synthetic_source_options()))
+          | 'Format' >> Map(format_record)
+          | 'Make mutations' >> FlatMap(make_insert_mutations)
+          | 'Write to Spanner' >> WriteToSpanner(
+            project_id=self.project,
+            instance_id=self.spanner_instance,
+            database_id=self.spanner_database,
+            max_batch_size_bytes=5120))
+
+  def test(self):
+    output = (
+        self.pipeline
+        | 'Read from Spanner' >> ReadFromSpanner(
+            self.project,
+            self.spanner_instance,
+            self.spanner_database,
+            sql="select data from test_data")
+        | 'Count messages' >> ParDo(CountMessages(self.metrics_namespace))
+        | 'Measure time' >> ParDo(MeasureTime(self.metrics_namespace))
+        | 'Count' >> Count.Globally())
+    assert_that(output, equal_to([self.input_options['num_records']]))
+
+
+if __name__ == '__main__':
+  logging.basicConfig(level=logging.INFO)
+  SpannerReadPerfTest().run()
diff --git a/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_perf_test.py b/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_perf_test.py
new file mode 100644
index 0000000..707db81
--- /dev/null
+++ b/sdks/python/apache_beam/io/gcp/experimental/spannerio_write_perf_test.py
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A pipeline that writes data from Synthetic Source to a Spanner.
+Besides of the standard options, there are options with special meaning:
+* spanner_instance - Spanner Instance ID.
+* spanner_database - Spanner Database ID.
+* input_options - options for Synthetic Source:
+num_records - number of rows to be inserted,
+value_size - the length of a single row,
+key_size - required option, but its value has no meaning.
+
+Example test run on DataflowRunner:
+
+python -m apache_beam.io.gcp.experimental.spannerio_write_perf_test \
+  --test-pipeline-options="
+  --runner=TestDataflowRunner
+  --project='...'
+  --region='...'
+  --temp_location='gs://...'
+  --sdk_location=build/apache-beam.tar.gz
+  --publish_to_big_query=true
+  --metrics_dataset='...'
+  --metrics_table='...'
+  --spanner_instance='...'
+  --spanner_database='...'
+  --input_options='{
+    \"num_records\": 10,
+    \"key_size\": 1,
+    \"value_size\": 1024
+    }'"
+
+This setup will result in a table of 1MB size.
+"""
+
+from __future__ import absolute_import
+
+import logging
+import random
+import uuid
+
+from apache_beam import FlatMap
+from apache_beam import Map
+from apache_beam import ParDo
+from apache_beam.io import Read
+from apache_beam.io.gcp.experimental.spannerio import WriteToSpanner
+from apache_beam.testing.load_tests.load_test import LoadTest
+from apache_beam.testing.load_tests.load_test_metrics_utils import CountMessages
+from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
+from apache_beam.testing.synthetic_pipeline import SyntheticSource
+
+# pylint: disable=wrong-import-order, wrong-import-position
+try:
+  from google.cloud import spanner
+except ImportError:
+  spanner = None
+# pylint: enable=wrong-import-order, wrong-import-position
+
+
+class SpannerWritePerfTest(LoadTest):
+  TEST_DATABASE = None
+
+  def __init__(self):
+    super(SpannerWritePerfTest, self).__init__()
+    self.project = self.pipeline.get_option('project')
+    self.spanner_instance = self.pipeline.get_option('spanner_instance')
+    self.spanner_database = self.pipeline.get_option('spanner_database')
+    self._init_setup()
+
+  def _generate_table_name(self):
+    self.TEST_DATABASE = "{}_{}".format(
+        self.spanner_database, ''.join(random.sample(uuid.uuid4().hex, 4)))
+    return self.TEST_DATABASE
+
+  def _create_database(self):
+    spanner_client = spanner.Client()
+    instance = self._SPANNER_INSTANCE = spanner_client.instance(
+        self.spanner_instance)
+    database = instance.database(
+        self.TEST_DATABASE,
+        ddl_statements=[
+            """CREATE TABLE test (
+            id      STRING(99) NOT NULL,
+            data    BYTES(MAX) NOT NULL
+         ) PRIMARY KEY (id)""",
+        ])
+    database.create()
+
+  def _init_setup(self):
+    """Create database."""
+    self._generate_table_name()
+    self._create_database()
+
+  def test(self):
+    def format_record(record):
+      import base64
+      return base64.b64encode(record[1])
+
+    def make_insert_mutations(element):
+      import uuid  # pylint: disable=reimported
+      from apache_beam.io.gcp.experimental.spannerio import WriteMutation
+      ins_mutation = WriteMutation.insert(
+          table='test',
+          columns=('id', 'data'),
+          values=[(str(uuid.uuid1()), element)])
+      return [ins_mutation]
+
+    (  # pylint: disable=expression-not-assigned
+        self.pipeline
+        | 'Produce rows' >> Read(
+            SyntheticSource(self.parse_synthetic_source_options()))
+        | 'Count messages' >> ParDo(CountMessages(self.metrics_namespace))
+        | 'Format' >> Map(format_record)
+        | 'Make mutations' >> FlatMap(make_insert_mutations)
+        | 'Measure time' >> ParDo(MeasureTime(self.metrics_namespace))
+        | 'Write to Spanner' >> WriteToSpanner(
+          project_id=self.project,
+          instance_id=self.spanner_instance,
+          database_id=self.TEST_DATABASE,
+          max_batch_size_bytes=5120)
+    )
+
+  def cleanup(self):
+    """Removes test database."""
+    database = self._SPANNER_INSTANCE.database(self.TEST_DATABASE)
+    database.drop()
+
+
+if __name__ == '__main__':
+  logging.basicConfig(level=logging.INFO)
+  SpannerWritePerfTest().run()