# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A microbenchmark for measuring performance of coders.
This runs a sequence of encode-decode operations on random inputs
to collect performance of various coders.
To evaluate coders performance we approximate the behavior
how the coders are used in PCollections: we encode and decode
a list of elements. An element can be a string, a list of integers,
a windowed value, or any other object we want a coder to process.
Run as:
python -m apache_beam.tools.coders_microbenchmark
"""
# pytype: skip-file
import argparse
import logging
import random
import re
import string
import sys
import apache_beam as beam
from apache_beam.coders import proto2_coder_test_messages_pb2 as test_message
from apache_beam.coders import coder_impl
from apache_beam.coders import coders
from apache_beam.coders import row_coder
from apache_beam.coders import typecoders
from apache_beam.tools import utils
from apache_beam.transforms import window
from apache_beam.typehints import trivial_inference
from apache_beam.typehints.pandas_type_compatibility import DataFrameBatchConverterDropIndex
from apache_beam.utils import windowed_value
def coder_benchmark_factory(coder, generate_fn):
"""Creates a benchmark that encodes and decodes a list of elements.
Args:
coder: coder to use to encode an element.
generate_fn: a callable that generates an element.
"""
class CoderBenchmark(object):
def __init__(self, num_elements_per_benchmark):
self._coder = coders.IterableCoder(coder)
self._list = [generate_fn() for _ in range(num_elements_per_benchmark)]
def __call__(self):
      # Calling coder operations on a single element at a time may incur
      # unrelated per-call overhead. To amortize it, we encode and decode a
      # list of elements in a single call.
_ = self._coder.decode(self._coder.encode(self._list))
CoderBenchmark.__name__ = "%s, %s" % (generate_fn.__name__, str(coder))
return CoderBenchmark
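
# A usage sketch of the factory above. VarIntCoder is only an illustrative
# choice here; the suite in run_coder_benchmarks() below selects its own
# coder/generator pairs.
#
#   benchmark_cls = coder_benchmark_factory(coders.VarIntCoder(), small_int)
#   benchmark = benchmark_cls(1000)  # generates 1000 elements up front
#   benchmark()  # a single encode/decode round trip over the whole list
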
def batch_row_coder_benchmark_factory(generate_fn, use_batch):
"""Creates a benchmark that encodes and decodes a list of elements.
Args:
coder: coder to use to encode an element.
generate_fn: a callable that generates an element.
"""
class CoderBenchmark(object):
def __init__(self, num_elements_per_benchmark):
self._use_batch = use_batch
row_instance = generate_fn()
row_type = trivial_inference.instance_to_type(row_instance)
self._row_coder = get_row_coder(row_instance)
self._batch_converter = DataFrameBatchConverterDropIndex(row_type)
self._seq_coder = coders.IterableCoder(self._row_coder)
self._data = self._batch_converter.produce_batch(
[generate_fn() for _ in range(num_elements_per_benchmark)])
def __call__(self):
if self._use_batch:
impl = self._row_coder.get_impl()
columnar = {
col: self._data[col].to_numpy()
for col in self._data.columns
}
output_stream = coder_impl.create_OutputStream()
impl.encode_batch_to_stream(columnar, output_stream)
impl.decode_batch_from_stream(
columnar, coder_impl.create_InputStream(output_stream.get()))
else:
        # Calling coder operations on a single element at a time may incur
        # unrelated per-call overhead. To amortize it, we encode and decode a
        # list of elements in a single call.
self._batch_converter.produce_batch(
self._seq_coder.decode(
self._seq_coder.encode(
self._batch_converter.explode_batch(self._data))))
CoderBenchmark.__name__ = "%s, BatchRowCoder%s" % (
generate_fn.__name__, use_batch)
return CoderBenchmark
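
# With use_batch=True the factory above feeds a columnar dict of numpy arrays
# to the RowCoder implementation's encode_batch_to_stream /
# decode_batch_from_stream; with use_batch=False it explodes the batch into
# individual rows and round-trips them through an IterableCoder. For example:
#
#   batch_row_coder_benchmark_factory(tiny_row, use_batch=True)
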
def small_int():
return random.randint(0, 127)
def large_int():
return random.randint(sys.maxsize >> 2, sys.maxsize)
def random_string(length):
return ''.join(
random.choice(string.ascii_letters + string.digits)
for _ in range(length))
def small_string():
return random_string(4)
def large_string():
return random_string(100)
def list_int(size):
return [small_int() for _ in range(size)]
def dict_int_int(size):
return {i: i for i in list_int(size)}
def small_list():
return list_int(10)
def large_list():
  # bool is the last type FastPrimitivesCoder checks for before falling back
  # to pickling.
return [bool(k) for k in list_int(1000)]
def small_tuple():
# Benchmark a common case of 2-element tuples.
return tuple(list_int(2))
def large_tuple():
return tuple(large_list())
def small_dict():
return {i: i for i in small_list()}
def large_dict():
return {i: i for i in large_list()}
def large_iterable():
  # The first element alone fills SequenceCoderImpl's default buffer, so
  # encoding this generator exercises the coder's buffered, chunked writing
  # path.
  yield 'a' * coders.coder_impl.SequenceCoderImpl._DEFAULT_BUFFER_SIZE
for k in range(1000):
yield k
def random_message_with_map(size):
message = test_message.MessageWithMap()
keys = list_int(size)
random.shuffle(keys)
for key in keys:
message.field1[str(key)].field1 = small_string()
return message
def small_message_with_map():
return random_message_with_map(5)
def large_message_with_map():
return random_message_with_map(20)
def globally_windowed_value():
return windowed_value.WindowedValue(
value=small_int(), timestamp=12345678, windows=(window.GlobalWindow(), ))
def random_windowed_value(num_windows):
return windowed_value.WindowedValue(
value=small_int(),
timestamp=12345678,
windows=tuple(
window.IntervalWindow(i * 10, i * 10 + small_int())
for i in range(num_windows)))
def wv_with_one_window():
return random_windowed_value(num_windows=1)
def wv_with_multiple_windows():
return random_windowed_value(num_windows=32)
def tiny_row():
return beam.Row(int_value=1)
def large_row():
return beam.Row(**{f'int_{ix}': ix for ix in range(20)})
def nullable_row():
return beam.Row(**{f'int_{ix}': ix if ix % 2 else None for ix in range(20)})
def diverse_row():
return beam.Row(
int_value=1,
float_value=3.14159,
str_value='beam',
row_value=beam.Row(int_value=2, float_value=2.718281828))
def get_row_coder(row_instance):
coder = typecoders.registry.get_coder(
trivial_inference.instance_to_type(row_instance))
assert isinstance(coder, row_coder.RowCoder)
return coder
def row_coder_benchmark_factory(generate_fn):
return coder_benchmark_factory(get_row_coder(generate_fn()), generate_fn)
def run_coder_benchmarks(
num_runs, input_size, seed, verbose, filter_regex='.*'):
random.seed(seed)
# TODO(https://github.com/apache/beam/issues/18788): Pick coders using type
# hints, for example:
# tuple_coder = typecoders.registry.get_coder(typing.Tuple[int, ...])
benchmarks = [
coder_benchmark_factory(coders.FastPrimitivesCoder(), small_int),
coder_benchmark_factory(coders.FastPrimitivesCoder(), large_int),
coder_benchmark_factory(coders.FastPrimitivesCoder(), small_string),
coder_benchmark_factory(coders.FastPrimitivesCoder(), large_string),
coder_benchmark_factory(coders.FastPrimitivesCoder(), small_list),
coder_benchmark_factory(
coders.IterableCoder(coders.FastPrimitivesCoder()), small_list),
coder_benchmark_factory(coders.FastPrimitivesCoder(), large_list),
coder_benchmark_factory(
coders.IterableCoder(coders.FastPrimitivesCoder()), large_list),
coder_benchmark_factory(
coders.IterableCoder(coders.FastPrimitivesCoder()), large_iterable),
coder_benchmark_factory(coders.FastPrimitivesCoder(), small_tuple),
coder_benchmark_factory(coders.FastPrimitivesCoder(), large_tuple),
coder_benchmark_factory(coders.FastPrimitivesCoder(), small_dict),
coder_benchmark_factory(coders.FastPrimitivesCoder(), large_dict),
coder_benchmark_factory(
coders.ProtoCoder(test_message.MessageWithMap),
small_message_with_map),
coder_benchmark_factory(
coders.ProtoCoder(test_message.MessageWithMap),
large_message_with_map),
coder_benchmark_factory(
coders.DeterministicProtoCoder(test_message.MessageWithMap),
small_message_with_map),
coder_benchmark_factory(
coders.DeterministicProtoCoder(test_message.MessageWithMap),
large_message_with_map),
coder_benchmark_factory(
coders.WindowedValueCoder(coders.FastPrimitivesCoder()),
wv_with_one_window),
coder_benchmark_factory(
coders.WindowedValueCoder(
coders.FastPrimitivesCoder(), coders.IntervalWindowCoder()),
wv_with_multiple_windows),
coder_benchmark_factory(
coders.WindowedValueCoder(
coders.FastPrimitivesCoder(), coders.GlobalWindowCoder()),
globally_windowed_value),
coder_benchmark_factory(
coders.LengthPrefixCoder(coders.FastPrimitivesCoder()), small_int),
row_coder_benchmark_factory(tiny_row),
row_coder_benchmark_factory(large_row),
row_coder_benchmark_factory(nullable_row),
row_coder_benchmark_factory(diverse_row),
batch_row_coder_benchmark_factory(tiny_row, False),
batch_row_coder_benchmark_factory(tiny_row, True),
batch_row_coder_benchmark_factory(large_row, False),
batch_row_coder_benchmark_factory(large_row, True),
batch_row_coder_benchmark_factory(nullable_row, False),
batch_row_coder_benchmark_factory(nullable_row, True),
batch_row_coder_benchmark_factory(diverse_row, False),
batch_row_coder_benchmark_factory(diverse_row, True),
]
suite = [
utils.BenchmarkConfig(b, input_size, num_runs) for b in benchmarks
if re.search(filter_regex, b.__name__, flags=re.I)
]
utils.run_benchmarks(suite, verbose=verbose)
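
# A minimal programmatic usage sketch (the values here are illustrative, not
# the command-line defaults):
#
#   run_coder_benchmarks(
#       num_runs=5, input_size=100, seed=0, verbose=True, filter_regex='Row')
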
if __name__ == "__main__":
logging.basicConfig()
parser = argparse.ArgumentParser()
parser.add_argument('--filter', default='.*')
parser.add_argument('--num_runs', default=20, type=int)
parser.add_argument('--num_elements_per_benchmark', default=1000, type=int)
  # A fixed default seed keeps runs consistent.
  parser.add_argument('--seed', default=42, type=int)
options = parser.parse_args()
utils.check_compiled("apache_beam.coders.coder_impl")
run_coder_benchmarks(
options.num_runs,
options.num_elements_per_benchmark,
options.seed,
verbose=True,
filter_regex=options.filter)