#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Performance PubsubIO streaming test for Write/Read operations.

Caution: only test runners (e.g. TestDataflowRunner) support matchers.

Example for TestDataflowRunner:

python -m apache_beam.io.gcp.pubsub_io_perf_test \
--test-pipeline-options="
--runner=TestDataflowRunner
--sdk_location=.../dist/apache-beam-x.x.x.dev0.tar.gz
--project=<GCP_PROJECT_ID>
--temp_location=gs://<BUCKET_NAME>/tmp
--staging_location=gs://<BUCKET_NAME>/staging
--wait_until_finish_duration=<TIME_IN_MS>
--pubsub_namespace_prefix=<PUBSUB_NAMESPACE_PREFIX>
--publish_to_big_query=<OPTIONAL><true/false>
--metrics_dataset=<OPTIONAL>
--metrics_table=<OPTIONAL>
--dataflow_worker_jar=<OPTIONAL>
    --input_options='{
    \"num_records\": <SIZE_OF_INPUT>,
    \"key_size\": 1,
    \"value_size\": <SIZE_OF_EACH_MESSAGE>
    }'"
"""
# pytype: skip-file

import logging
import sys

from hamcrest import all_of

import apache_beam as beam
from apache_beam.io import Read
from apache_beam.io import ReadFromPubSub
from apache_beam.io.gcp.tests.pubsub_matcher import PubSubMessageMatcher
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.load_tests.load_test import LoadTest
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
from apache_beam.testing.synthetic_pipeline import SyntheticSource
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms import trigger
from apache_beam.transforms import window

# pylint: disable=wrong-import-order, wrong-import-position
try:
from google.cloud import pubsub
except ImportError:
pubsub = None
# pylint: enable=wrong-import-order, wrong-import-position

WRITE_METRICS_NAMESPACE = 'pubsub_io_perf_write'
READ_METRICS_NAMESPACE = 'pubsub_io_perf_read'
MATCHER_TIMEOUT = 60 * 15  # seconds
MATCHER_PULL_TIMEOUT = 60 * 5  # seconds


class PubsubIOPerfTest(LoadTest):
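  """Shared setup for the Pub/Sub write and read performance tests.

  Validates the required pipeline options and derives the names of the
  Pub/Sub topics and subscriptions used by both tests.
  """
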
def _setup_env(self):
if not self.pipeline.get_option('pubsub_namespace_prefix'):
logging.error('--pubsub_namespace_prefix argument is required.')
sys.exit(1)
if not self.pipeline.get_option('wait_until_finish_duration'):
logging.error('--wait_until_finish_duration argument is required.')
sys.exit(1)
self.num_of_messages = int(self.input_options.get('num_records'))
pubsub_namespace_prefix = self.pipeline.get_option(
'pubsub_namespace_prefix')
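    # unique_id is defined at module level in the __main__ block below.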
self.pubsub_namespace = pubsub_namespace_prefix + unique_id

  def _setup_pubsub(self):
self.pub_client = pubsub.PublisherClient()
self.topic_name = self.pub_client.topic_path(
self.project_id, self.pubsub_namespace)
self.matcher_topic_name = self.pub_client.topic_path(
self.project_id, self.pubsub_namespace + '_matcher')
self.sub_client = pubsub.SubscriberClient()
self.read_sub_name = self.sub_client.subscription_path(
self.project_id,
self.pubsub_namespace + '_read',
)
self.read_matcher_sub_name = self.sub_client.subscription_path(
self.project_id,
self.pubsub_namespace + '_read_matcher',
)


class PubsubWritePerfTest(PubsubIOPerfTest):
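  """Publishes synthetic messages to the test topic while measuring time.

  Each message carries a unique 'id' attribute that the read test uses for
  deduplication.
  """
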
def __init__(self):
    super().__init__(WRITE_METRICS_NAMESPACE)
self._setup_env()
self._setup_pubsub()
self._setup_pipeline()

  def test(self):
def to_pubsub_message(element):
import uuid
from apache_beam.io import PubsubMessage
return PubsubMessage(
data=element[1],
attributes={'id': str(uuid.uuid1()).encode('utf-8')},
)

    _ = (
self.pipeline
| 'Create input' >> Read(
SyntheticSource(self.parse_synthetic_source_options()))
| 'Format to pubsub message in bytes' >> beam.Map(to_pubsub_message)
| 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace))
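        # id_label marks the 'id' attribute set above as each message's
        # unique identifier, so readers can deduplicate redeliveries on it.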
| 'Write to Pubsub' >> beam.io.WriteToPubSub(
self.topic_name,
with_attributes=True,
id_label='id',
))

  def _setup_pipeline(self):
options = PipelineOptions(self.pipeline.get_full_options_as_args())
options.view_as(SetupOptions).save_main_session = True
options.view_as(StandardOptions).streaming = True
self.pipeline = TestPipeline(options=options)

  def _setup_pubsub(self):
    super()._setup_pubsub()
_ = self.pub_client.create_topic(self.topic_name)
_ = self.sub_client.create_subscription(
self.read_sub_name,
self.topic_name,
)


class PubsubReadPerfTest(PubsubIOPerfTest):
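  """Reads the published messages back, counts them, and forwards the
  count to a matcher topic for verification.
  """
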
def __init__(self):
    super().__init__(READ_METRICS_NAMESPACE)
self._setup_env()
self._setup_pubsub()
self._setup_pipeline()

  def test(self):
_ = (
self.pipeline
| 'Read from pubsub' >> ReadFromPubSub(
subscription=self.read_sub_name,
with_attributes=True,
id_label='id',
)
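        # Replace every payload with a single placeholder byte; only the
        # message count matters from here on.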
| beam.Map(lambda x: bytes(1)).with_output_types(bytes)
| 'Measure time' >> beam.ParDo(MeasureTime(self.metrics_namespace))
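        # Fire a pane once the expected number of messages has arrived, so
        # the global count emitted below covers the whole input.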
| 'Window' >> beam.WindowInto(
window.GlobalWindows(),
trigger=trigger.Repeatedly(
trigger.AfterCount(self.num_of_messages)),
accumulation_mode=trigger.AccumulationMode.DISCARDING)
| 'Count messages' >> beam.CombineGlobally(
beam.combiners.CountCombineFn()).without_defaults().
with_output_types(int)
| 'Convert to bytes' >>
beam.Map(lambda count: str(count).encode('utf-8'))
| 'Write to Pubsub' >> beam.io.WriteToPubSub(self.matcher_topic_name))

  def _setup_pubsub(self):
    super()._setup_pubsub()
_ = self.pub_client.create_topic(self.matcher_topic_name)
_ = self.sub_client.create_subscription(
self.read_matcher_sub_name,
self.matcher_topic_name,
)

  def _setup_pipeline(self):
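    # The matcher pulls from the matcher subscription until it sees the
    # published count message, failing the test on timeout.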
pubsub_msg_verifier = PubSubMessageMatcher(
self.project_id,
self.read_matcher_sub_name,
expected_msg=[str(self.num_of_messages).encode('utf-8')],
timeout=MATCHER_TIMEOUT,
pull_timeout=MATCHER_PULL_TIMEOUT,
)
extra_opts = {
'on_success_matcher': all_of(pubsub_msg_verifier),
'streaming': True,
'save_main_session': True
}
args = self.pipeline.get_full_options_as_args(**extra_opts)
self.pipeline = TestPipeline(options=PipelineOptions(args))

  def cleanup(self):
self.sub_client.delete_subscription(self.read_sub_name)
self.sub_client.delete_subscription(self.read_matcher_sub_name)
self.pub_client.delete_topic(self.topic_name)
self.pub_client.delete_topic(self.matcher_topic_name)


if __name__ == '__main__':
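  # unique_id suffixes the Pub/Sub namespace; _setup_env reads it at module
  # scope, so these tests are meant to be run as a script.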
import uuid
unique_id = str(uuid.uuid4())
logging.basicConfig(level=logging.INFO)
PubsubWritePerfTest().run()
PubsubReadPerfTest().run()