blob: 6ba01b9a517279627392959e322fd9fd50273fc2 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A microbenchmark for measuring the overhead of runtime_type_check vs
performance_runtime_type_check vs a pipeline with no runtime type check.
This runs a sequence of trivial DoFn's over a set of inputs to simulate
a real-world pipeline that processes lots of data.
Run as
python -m apache_beam.tools.runtime_type_check_microbenchmark
"""
# pytype: skip-file
import logging
from collections import defaultdict
from time import time
from typing import Iterable
from typing import Tuple
from typing import Union
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.tools import utils
@beam.typehints.with_input_types(Tuple[int, ...])
class SimpleInput(beam.DoFn):
def process(self, element, *args, **kwargs):
yield element
@beam.typehints.with_output_types(Tuple[int, ...])
class SimpleOutput(beam.DoFn):
def process(self, element, *args, **kwargs):
yield element
@beam.typehints.with_input_types(
Tuple[int, str, Tuple[float, ...], Iterable[int], Union[str, int]])
class NestedInput(beam.DoFn):
def process(self, element, *args, **kwargs):
yield element
@beam.typehints.with_output_types(
Tuple[int, str, Tuple[float, ...], Iterable[int], Union[str, int]])
class NestedOutput(beam.DoFn):
def process(self, element, *args, **kwargs):
yield element
def run_benchmark(
num_dofns=100, num_runs=10, num_elements_step=2000, num_for_averaging=4):
options_map = {
'No Type Check': PipelineOptions(),
'Runtime Type Check': PipelineOptions(runtime_type_check=True),
'Performance Runtime Type Check': PipelineOptions(
performance_runtime_type_check=True)
}
for run in range(num_runs):
num_elements = num_elements_step * run + 1
simple_elements = [
tuple(i for i in range(200)) for _ in range(num_elements)
]
nested_elements = [(
1,
'2',
tuple(float(i) for i in range(100)), [i for i in range(100)],
'5') for _ in range(num_elements)]
timings = defaultdict(lambda: defaultdict(lambda: defaultdict(float)))
# Do each run num_for_averaging times to get an average with reduced noise
for _ in range(num_for_averaging):
for option_name, options in options_map.items():
# Run a Pipeline using DoFn's with simple typehints
start = time()
with beam.Pipeline(options=options) as p:
pc = p | beam.Create(simple_elements)
for ix in range(num_dofns):
pc = (
pc | 'SimpleOutput %i' % ix >> beam.ParDo(SimpleOutput())
| 'SimpleInput %i' % ix >> beam.ParDo(SimpleInput()))
timings[num_elements]['Simple Types'][option_name] += time() - start
# Run a pipeline using DoFn's with nested typehints
start = time()
with beam.Pipeline(options=options) as p:
pc = p | beam.Create(nested_elements)
for ix in range(num_dofns):
pc = (
pc | 'NestedOutput %i' % ix >> beam.ParDo(NestedOutput())
| 'NestedInput %i' % ix >> beam.ParDo(NestedInput()))
timings[num_elements]['Nested Types'][option_name] += time() - start
for num_elements, element_type_map in timings.items():
print("%d Element%s" % (num_elements, " " if num_elements == 1 else "s"))
for element_type, option_name_map in element_type_map.items():
print("-- %s" % element_type)
for option_name, time_elapsed in option_name_map.items():
print(
"---- %.2f sec (%s)" %
(time_elapsed / num_for_averaging, option_name))
print('\n')
if __name__ == '__main__':
logging.basicConfig()
utils.check_compiled('apache_beam.runners.common')
run_benchmark()