#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
A performance test for reading data from a Spanner database table.
Besides of the standard options, there are options with special meaning:
* spanner_instance - Spanner Instance ID.
* spanner_database - Spanner Database ID.
The table will be created and populated with data from Synthetic Source if it
does not exist.
* input_options - options for Synthetic Source:
num_records - number of rows to be inserted,
value_size - the length of a single row,
key_size - required option, but its value has no meaning.
Example test run on DataflowRunner:

python -m apache_beam.io.gcp.experimental.spannerio_read_perf_test \
    --test-pipeline-options="
    --runner=TestDataflowRunner
    --project='...'
    --region='...'
    --temp_location='gs://...'
    --sdk_location=build/apache-beam.tar.gz
    --publish_to_big_query=true
    --metrics_dataset='...'
    --metrics_table='...'
    --spanner_instance='...'
    --spanner_database='...'
    --input_options='{
    \"num_records\": 10,
    \"key_size\": 1,
    \"value_size\": 1024
    }'"

This setup will result in a table of roughly 10 KB (num_records * value_size
of raw data; the base64 encoding applied before insert adds some overhead).
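
A minimal local run might look like this (a sketch: it assumes the runner
defaults to DirectRunner and that the BigQuery metrics flags are optional,
so both are omitted):

python -m apache_beam.io.gcp.experimental.spannerio_read_perf_test \
    --test-pipeline-options="
    --project='...'
    --spanner_instance='...'
    --spanner_database='...'
    --input_options='{
    \"num_records\": 10,
    \"key_size\": 1,
    \"value_size\": 1024
    }'"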
"""

from __future__ import absolute_import

import logging

from apache_beam import FlatMap
from apache_beam import Map
from apache_beam import ParDo
from apache_beam.io import Read
from apache_beam.io.gcp.experimental.spannerio import ReadFromSpanner
from apache_beam.io.gcp.experimental.spannerio import WriteToSpanner
from apache_beam.testing.load_tests.load_test import LoadTest
from apache_beam.testing.load_tests.load_test_metrics_utils import CountMessages
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
from apache_beam.testing.synthetic_pipeline import SyntheticSource
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.combiners import Count

# pylint: disable=wrong-import-order, wrong-import-position
try:
  from google.api_core.exceptions import AlreadyExists
  from google.cloud import spanner
except ImportError:
  spanner = None
  AlreadyExists = None
# pylint: enable=wrong-import-order, wrong-import-position


class SpannerReadPerfTest(LoadTest):
  def __init__(self):
    super(SpannerReadPerfTest, self).__init__()
    self.project = self.pipeline.get_option('project')
    self.spanner_instance = self.pipeline.get_option('spanner_instance')
    self.spanner_database = self.pipeline.get_option('spanner_database')
    self._init_setup()

  def _create_database(self):
    """Creates a Spanner database with an empty test_data table."""
    spanner_client = spanner.Client()
    instance = spanner_client.instance(self.spanner_instance)
    database = instance.database(
        self.spanner_database,
        ddl_statements=[
            """CREATE TABLE test_data (
              id STRING(99) NOT NULL,
              data BYTES(MAX) NOT NULL
            ) PRIMARY KEY (id)""",
        ])
    database.create()
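
    # For reference, a roughly equivalent manual setup with the Cloud SDK
    # (a sketch; <spanner_instance> and <spanner_database> are placeholders):
    #
    #   gcloud spanner databases create <spanner_database> \
    #       --instance=<spanner_instance> \
    #       --ddl='CREATE TABLE test_data (id STRING(99) NOT NULL,
    #           data BYTES(MAX) NOT NULL) PRIMARY KEY (id)'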

  def _init_setup(self):
    """Creates the database and test data, unless it already exists."""
    try:
      self._create_database()
      self._create_input_data()
    except AlreadyExists:
      # Pass if the database (and therefore the test data) already exists.
      pass

  def _create_input_data(self):
    """
    Runs an additional pipeline which creates test data and waits for its
    completion.
    """
    def format_record(record):
      import base64
      return base64.b64encode(record[1])
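
    # For reference: SyntheticSource emits (key, value) tuples of bytes, and
    # format_record keeps only the base64-encoded value, e.g.
    #   format_record((b'key', b'\xff\xff\xff\xff')) == b'/////w=='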

    def make_insert_mutations(element):
      import uuid
      from apache_beam.io.gcp.experimental.spannerio import WriteMutation
      ins_mutation = WriteMutation.insert(
          table='test_data',
          columns=('id', 'data'),
          values=[(str(uuid.uuid1()), element)])
      return [ins_mutation]
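
    # Each element becomes one INSERT mutation against test_data, keyed by a
    # UUID string so that rows never collide on the primary key. (Using
    # non-sequential keys also avoids hotspotting a single Spanner split,
    # although that is presumably incidental for a table this small.)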

    with TestPipeline() as p:
      (  # pylint: disable=expression-not-assigned
          p
          | 'Produce rows' >> Read(
              SyntheticSource(self.parse_synthetic_source_options()))
          | 'Format' >> Map(format_record)
          | 'Make mutations' >> FlatMap(make_insert_mutations)
          | 'Write to Spanner' >> WriteToSpanner(
              project_id=self.project,
              instance_id=self.spanner_instance,
              database_id=self.spanner_database,
              max_batch_size_bytes=5120))
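
    # Note: max_batch_size_bytes=5120 caps each Spanner commit batch at
    # about 5 KB, so the seed data is written in many small batches; this
    # only affects the setup pipeline above, not the measured read path.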

  def test(self):
    output = (
        self.pipeline
        | 'Read from Spanner' >> ReadFromSpanner(
            self.project,
            self.spanner_instance,
            self.spanner_database,
            sql="select data from test_data")
        | 'Count messages' >> ParDo(CountMessages(self.metrics_namespace))
        | 'Measure time' >> ParDo(MeasureTime(self.metrics_namespace))
        | 'Count' >> Count.Globally())
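    # Count.Globally() reduces the PCollection to a single element holding
    # the total row count, which must match the number of generated records.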
    assert_that(output, equal_to([self.input_options['num_records']]))


if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO)
  SpannerReadPerfTest().run()