#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This is GroupByKey load test with Synthetic Source. Besides of the standard
input options there are additional options:
* fanout (optional) - number of GBK operations to run in parallel
* iterations (optional) - number of reiteraations over per-key-grouped
values to perform
* project (optional) - the GCP project for saving metrics in
BigQuery (required when running on the Dataflow runner),
* publish_to_big_query - whether metrics should be published to BigQuery,
* metrics_dataset (optional) - name of the BigQuery dataset where metrics
will be stored,
* metrics_table (optional) - name of the BigQuery table where metrics
will be stored,
* input_options - options for the Synthetic Source.
Example test run:
python -m apache_beam.testing.load_tests.group_by_key_test \
--test-pipeline-options="
--project=big-query-project
--region=...
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=gbk
--fanout=1
--iterations=1
--input_options='{
\"num_records\": 300,
\"key_size\": 5,
\"value_size\": 15
}'"
or:
./gradlew -PloadTest.args="
--publish_to_big_query=true
--project=...
--region=...
--metrics_dataset=python_load_tests
--metrics_table=gbk
--fanout=1
--iterations=1
--input_options='{
\"num_records\": 1,
\"key_size\": 1,
\"value_size\": 1}'
--runner=DirectRunner" \
-PloadTest.mainClass=apache_beam.testing.load_tests.group_by_key_test \
-Prunner=DirectRunner :sdks:python:apache_beam:testing:load_tests:run
"""
# pytype: skip-file

import logging

import apache_beam as beam
from apache_beam.testing.load_tests.load_test import LoadTest
from apache_beam.testing.load_tests.load_test_metrics_utils import AssignTimestamps
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureLatency
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
from apache_beam.testing.synthetic_pipeline import SyntheticSource


class GroupByKeyTest(LoadTest):
def __init__(self):
super(GroupByKeyTest, self).__init__()
self.fanout = self.get_option_or_default('fanout', 1)
self.iterations = self.get_option_or_default('iterations', 1)
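    # Both values are read from --test-pipeline-options (e.g. --fanout=4
    # --iterations=2; sample values for illustration only) and default to 1
    # when unset.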

  @staticmethod
  def ungroup_and_reiterate(element, iterations):
    # Walk the grouped values `iterations` times to force reiteration over
    # the GroupByKey result; on the last pass, return the key paired with
    # the first value seen.
    key, value = element
    for i in range(iterations):
      for v in value:
        if i == iterations - 1:
          return key, v
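
  # For example, with element ('k', [1, 2, 3]) and iterations=2 (sample
  # values, for illustration only), the first pass consumes all three
  # grouped values and the second, last pass returns ('k', 1) immediately.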

  def test(self):
    # Read synthetic records, record the start time, and assign event-time
    # timestamps before fanning out to the GroupByKey branches.
    pc = (
        self.pipeline
        | beam.io.Read(SyntheticSource(self.parse_synthetic_source_options()))
        | 'Measure time: Start' >> beam.ParDo(
            MeasureTime(self.metrics_namespace))
        | 'Assign timestamps' >> beam.ParDo(AssignTimestamps()))
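
    # Each branch applies an independent GroupByKey to the same input, so
    # the data shuffled grows roughly linearly with `fanout` while the
    # synthetic source is read only once.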
for branch in range(self.fanout):
( # pylint: disable=expression-not-assigned
pc
| 'GroupByKey %i' % branch >> beam.GroupByKey()
| 'Ungroup %i' % branch >> beam.Map(self.ungroup_and_reiterate,
self.iterations)
| 'Measure latency %i' % branch >> beam.ParDo(
MeasureLatency(self.metrics_namespace))
| 'Measure time: End %i' % branch >> beam.ParDo(
MeasureTime(self.metrics_namespace)))


if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO)
  GroupByKeyTest().run()