#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
This is a load test that runs a set of basic microbenchmarks for the Python SDK
and the DirectRunner.
This test does not need any additional options passed to run, besides the
dataset information.
Example test run:
python -m apache_beam.testing.load_tests.microbenchmarks_test \
--test-pipeline-options="
--project=big-query-project
--input_options='{}'
--region=...
--publish_to_big_query=true
--metrics_dataset=python_load_tests
--metrics_table=microbenchmarks"
or:
./gradlew -PloadTest.args="
--publish_to_big_query=true
--project=...
--region=...
--input_options='{}'
--metrics_dataset=python_load_tests
--metrics_table=microbenchmarks
--runner=DirectRunner" \
-PloadTest.mainClass=apache_beam.testing.load_tests.microbenchmarks_test \
-Prunner=DirectRunner :sdks:python:apache_beam:testing:load_tests:run
"""
# pytype: skip-file

import logging
import time

from apache_beam.testing.load_tests.load_test import LoadTest
from apache_beam.tools import fn_api_runner_microbenchmark
from apache_beam.tools import teststream_microbenchmark
from apache_beam.transforms.util import _BatchSizeEstimator


class MicroBenchmarksLoadTest(LoadTest):
  def __init__(self):
    super(MicroBenchmarksLoadTest, self).__init__()

  def test(self):
    # Merge the metrics reported by each microbenchmark into extra_metrics.
    self.extra_metrics.update(self._run_fn_api_runner_microbenchmark())
    self.extra_metrics.update(self._run_teststream_microbenchmark())

  def _run_teststream_microbenchmark(self):
    start = time.perf_counter()
    result = teststream_microbenchmark.run_benchmark(verbose=False)
    # Fit a line cost = a + b * size to the measured (size, cost) points to
    # estimate the fixed cost (a) and the per-element cost (b).
    sizes = list(result[0].values())[0]
    costs = list(result[1].values())[0]
    a, b = _BatchSizeEstimator.linear_regression_no_numpy(sizes, costs)
    return {
        'teststream_microbenchmark_runtime_sec': time.perf_counter() - start,
        'teststream_microbenchmark_fixed_cost_ms': a * 1000,
        'teststream_microbenchmark_per_element_cost_ms': b * 1000,
    }

  def _run_fn_api_runner_microbenchmark(self):
    start = time.perf_counter()
    result = fn_api_runner_microbenchmark.run_benchmark(verbose=False)
    # Same linear fit as above: a is the fixed cost, b the per-element cost.
    sizes = list(result[0].values())[0]
    costs = list(result[1].values())[0]
    a, b = _BatchSizeEstimator.linear_regression_no_numpy(sizes, costs)
    return {
        'fn_api_runner_microbenchmark_runtime_sec': time.perf_counter() - start,
        'fn_api_runner_microbenchmark_fixed_cost_ms': a * 1000,
        'fn_api_runner_microbenchmark_per_element_cost_ms': b * 1000,
    }
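
# Illustrative sketch only (not executed by the test): the *_fixed_cost_ms and
# *_per_element_cost_ms metrics above come from fitting a linear cost model
# cost ~ a + b * size to the (size, cost) pairs returned by each
# microbenchmark. With hypothetical measurements:
#
#   sizes = [1000, 2000, 4000]
#   costs = [0.11, 0.20, 0.41]  # seconds
#   a, b = _BatchSizeEstimator.linear_regression_no_numpy(sizes, costs)
#   # a ~ fixed cost per run (seconds), b ~ marginal cost per element (seconds);
#   # they are reported above in milliseconds as a * 1000 and b * 1000.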

if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO)
  MicroBenchmarksLoadTest().run()