#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
Load test for operations involving side inputs.
The purpose of this test is to measure the cost of materialization and
accessing side inputs. The test uses synthetic source which can be
parametrized to generate records with various sizes of keys and values,
impose delays in the pipeline and simulate other performance challenges.
This test can accept the following parameters:
* side_input_type (str) - Required. Specifies how the side input will be
materialized in ParDo operation. Choose from (dict, iter, list).
* window_count (int) - The number of fixed sized windows to subdivide the
side input into. By default, no windows will be used.
* side_input_size (int) - The size of the side input. Must be equal to or
lower than the size of the main input. If lower, the side input will be
created by applying a :class:`beam.combiners.Sample
<apache_beam.transforms.combiners.Sample>` transform.
* access_percentage (int) - Specifies the percentage of elements in the side
input to be accessed. By default, all elements will be accessed.
Example test run:
python -m apache_beam.testing.load_tests.sideinput_test \
--test-pipeline-options="
--side_input_type=iter
--input_options='{
\"num_records\": 300,
\"key_size\": 5,
\"value_size\": 15
}'"
or:
./gradlew -PloadTest.args="
--side_input_type=iter
--input_options='{
\"num_records\": 300,
\"key_size\": 5,
\"value_size\": 15}'" \
-PloadTest.mainClass=apache_beam.testing.load_tests.sideinput_test \
-Prunner=DirectRunner :sdks:python:apache_beam:testing:load_tests:run
"""
# pytype: skip-file

from __future__ import absolute_import
from __future__ import division

import logging

import apache_beam as beam
from apache_beam.testing.load_tests.load_test import LoadTest
from apache_beam.testing.load_tests.load_test_metrics_utils import MeasureTime
from apache_beam.testing.synthetic_pipeline import SyntheticSource


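# SideInputTest builds a pipeline that reads key-value records from a
# synthetic source, materializes (a sample of) them as a side input, and
# accesses that side input from a ParDo, with MeasureTime collecting start
# and end time metrics.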
class SideInputTest(LoadTest):
  # Maps the side_input_type option value to the corresponding side input
  # view.
  SIDE_INPUT_TYPES = {
      'iter': beam.pvalue.AsIter,
      'list': beam.pvalue.AsList,
      'dict': beam.pvalue.AsDict,
  }

  def __init__(self):
    super(SideInputTest, self).__init__()
    self.windows = self.get_option_or_default('window_count', default=0)
    self.access_percentage = self.get_option_or_default(
        'access_percentage', default=100)
    if self.access_percentage < 0 or self.access_percentage > 100:
      raise ValueError(
          'access_percentage: Invalid value. Should be in range '
          'from 0 to 100, got {} instead'.format(self.access_percentage))

    self.side_input_size = self.get_option_or_default(
        'side_input_size', default=0)
    if self.side_input_size == 0:
      self.side_input_size = self.input_options.get('num_records')

    self.side_input_type = self.pipeline.get_option('side_input_type')
    if self.side_input_type is None:
      raise ValueError(
          'side_input_type is required. Please provide one of '
          'these: {}'.format(list(self.SIDE_INPUT_TYPES.keys())))

  def materialize_as(self):
    """Returns the side input view class for the requested side_input_type."""
    try:
      return self.SIDE_INPUT_TYPES[self.side_input_type]
    except KeyError:
      raise ValueError(
          'Unknown side input type. Please provide one of '
          'these: {}'.format(list(self.SIDE_INPUT_TYPES.keys())))
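
  # test() builds the load test pipeline: read key-value records from the
  # synthetic source, optionally sample and window them to form the side
  # input, then access a configurable fraction of that side input from a
  # ParDo over the main input, with MeasureTime collecting metrics at the
  # start and end of the pipeline.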
  def test(self):
    class SequenceSideInputTestDoFn(beam.DoFn):
      """Iterates over the first n elements of the side input."""
      def __init__(self, first_n):
        self._first_n = first_n

      def process(self, unused_element, side_input):
        i = 0
        it = iter(side_input)
        while i < self._first_n:
          i += 1
          try:
            # No-op. We only make sure that the element is accessed.
            next(it)
          except StopIteration:
            return

    class MappingSideInputTestDoFn(beam.DoFn):
      """Takes a sequence of keys as an additional side input and, for each
      key in the sequence, looks up the value for that key in the dictionary
      side input."""
      def process(self, unused_element, dict_side_input, keys_to_check):
        for key in keys_to_check:
          # No-op. We only make sure that the element is accessed.
          dict_side_input[key]

    class GetRandomKeys(beam.DoFn):
      """Emits a random sample of n keys from the dictionary side input."""
      def __init__(self, n):
        self._n = n

      def process(self, unused_element, dict_side_input):
        import random
        n = min(self._n, len(dict_side_input))
        # random.sample requires a sequence, so materialize the keys first.
        return random.sample(list(dict_side_input.keys()), n)

    class AddEventTimestamps(beam.DoFn):
      """Assigns a consecutive timestamp to each element of the PCollection."""
      def setup(self):
        self._timestamp = 0

      def process(self, element):
        from apache_beam.transforms.window import TimestampedValue
        yield TimestampedValue(element, self._timestamp)
        self._timestamp += 1
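
    # Main input: records read from the synthetic source, with start-time
    # metrics collected as soon as elements are produced.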
    input_pc = (
        self.pipeline
        | 'Read synthetic' >> beam.io.Read(
            SyntheticSource(self.parse_synthetic_source_options()))
        | 'Collect start time metrics' >> beam.ParDo(
            MeasureTime(self.metrics_namespace)))
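
    # If a smaller side input was requested, downsample the main input with
    # Sample.FixedSizeGlobally (which yields a single list) and flatten it
    # back into a PCollection of elements; otherwise reuse the main input as
    # the side input.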
    if self.side_input_size != self.input_options.get('num_records'):
      side_input = (
          input_pc
          | 'Sample {} elements'.format(self.side_input_size) >>
          beam.combiners.Sample.FixedSizeGlobally(self.side_input_size)
          | 'Flatten a sequence' >> beam.FlatMap(lambda x: x))
    else:
      side_input = input_pc
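
    # When window_count is set, assign consecutive integer timestamps to the
    # side input elements and split them into fixed windows of
    # side_input_size / window_count seconds, which yields roughly
    # window_count windows over the side input.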
    if self.windows > 0:
      window_size = self.side_input_size / self.windows
      logging.info('Fixed windows of %s seconds will be applied', window_size)
      side_input = (
          side_input
          | 'Add event timestamps' >> beam.ParDo(AddEventTimestamps())
          | 'Apply windows' >> beam.WindowInto(
              beam.window.FixedWindows(window_size)))

    side_input_type = self.materialize_as()
    elements_to_access = self.side_input_size * self.access_percentage // 100
    logging.info(
        '%s out of %s total elements in the side input will be accessed.',
        elements_to_access,
        self.side_input_size)
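
    # For the dict view, pick a random subset of keys (seeded by a single
    # Impulse element) and have the test DoFn look each of them up in the
    # dictionary side input. For the iter and list views, iterate over the
    # first elements_to_access elements of the side input.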
    if side_input_type is beam.pvalue.AsDict:
      random_keys = (
          self.pipeline
          | beam.Impulse()
          | 'Get random keys' >> beam.ParDo(
              GetRandomKeys(elements_to_access),
              beam.pvalue.AsDict(side_input)))
      pc = input_pc | beam.ParDo(
          MappingSideInputTestDoFn(),
          side_input_type(side_input),
          beam.pvalue.AsList(random_keys))
    else:
      pc = input_pc | beam.ParDo(
          SequenceSideInputTestDoFn(elements_to_access),
          side_input_type(side_input))

    _ = pc | 'Collect end time metrics' >> beam.ParDo(
        MeasureTime(self.metrics_namespace))


if __name__ == '__main__':
  logging.basicConfig(level=logging.INFO)
  SideInputTest().run()