#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""A set of utilities to write pipelines for performance tests.
This module offers a way to create pipelines using synthetic sources and steps.
Exact shape of the pipeline and the behaviour of sources and steps can be
controlled through arguments. Please see function 'parse_args()' for more
details about the arguments.
Shape of the pipeline is primarily controlled through two arguments. Argument
'steps' can be used to define a list of steps as a JSON string. Argument
'barrier' describes how these steps are separated from each other; it can be
used to build a pipeline as a series of steps or as a tree of steps with a
fanin or a fanout of size 2.
Other arguments describe what gets generated by synthetic sources that produce
data for the pipeline.
"""
# pytype: skip-file
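# Example invocation (a sketch, not part of the original module; the flag names
# come from parse_args() below and the JSON keys from _parse_steps() and
# SyntheticSource, while the script name, values and output path are
# illustrative only):
#
#   python synthetic_pipeline.py \
#       --steps='[{"per_bundle_delay_sec": 1}, {"output_records_per_input_record": 2}]' \
#       --input='{"numRecords": 1000, "keySizeBytes": 10, "valueSizeBytes": 90}' \
#       --barrier=shuffle \
#       --output=/tmp/synthetic_output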
import argparse
import json
import logging
import math
import os
import sys
import time
from random import Random
from typing import Tuple
import apache_beam as beam
from apache_beam import pvalue
from apache_beam import typehints
from apache_beam.io import WriteToText
from apache_beam.io import iobase
from apache_beam.io import range_trackers
from apache_beam.io import restriction_trackers
from apache_beam.io.restriction_trackers import OffsetRange
from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.transforms import userstate
from apache_beam.transforms.core import RestrictionProvider
try:
import numpy as np
except ImportError:
np = None
class _Random(Random):
"""A subclass of `random.Random` from the Python Standard Library that
provides a method returning random bytes of arbitrary length.
"""
# `numpy.random.RandomState` does not provide a `random()` method; we keep
# this alias for compatibility reasons.
random_sample = Random.random
def bytes(self, length):
"""Returns random bytes.
Args:
length (int): Number of random bytes.
"""
return self.getrandbits(length * 8).to_bytes(length, sys.byteorder)
Generator = _Random
def parse_byte_size(s):
suffixes = 'BKMGTP'
if s[-1] in suffixes:
return int(float(s[:-1]) * 1024**suffixes.index(s[-1]))
return int(s)
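# Worked examples for parse_byte_size() (added for clarity; the values follow
# directly from the suffix table above):
#   parse_byte_size('100') == 100
#   parse_byte_size('2K') == 2048
#   parse_byte_size('1G') == 1024**3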
def div_round_up(a, b):
"""Return ceil(a/b)."""
return int(math.ceil(float(a) / b))
def rotate_key(element):
"""Returns a new key-value pair of the same size but with a different key."""
(key, value) = element
return key[-1:] + key[:-1], value
def initial_splitting_zipf(
start_position,
stop_position,
desired_num_bundles,
distribution_parameter,
num_total_records=None):
"""Split the given range (defined by start_position, stop_position) into
desired_num_bundles using zipf with the given distribution_parameter.
"""
if not num_total_records:
num_total_records = stop_position - start_position
samples = np.random.zipf(distribution_parameter, desired_num_bundles)
total = sum(samples)
relative_bundle_sizes = [(float(sample) / total) for sample in samples]
bundle_ranges = []
start = start_position
index = 0
while start < stop_position:
if index == desired_num_bundles - 1:
bundle_ranges.append((start, stop_position))
break
stop = start + int(num_total_records * relative_bundle_sizes[index])
bundle_ranges.append((start, stop))
start = stop
index += 1
return bundle_ranges
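# Illustrative example (not from the original module; the exact boundaries are
# random because they are drawn from np.random.zipf):
#   initial_splitting_zipf(0, 1000, 4, 3.0)
# might return something like [(0, 640), (640, 850), (850, 940), (940, 1000)],
# i.e. up to desired_num_bundles contiguous ranges covering [0, 1000) with
# Zipf-distributed relative sizes.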
class SyntheticStep(beam.DoFn):
"""A DoFn of which behavior can be controlled through prespecified parameters.
"""
def __init__(
self,
per_element_delay_sec=0,
per_bundle_delay_sec=0,
output_records_per_input_record=1,
output_filter_ratio=0):
if per_element_delay_sec and per_element_delay_sec < 1e-3:
raise ValueError(
'Per element sleep time must be at least 1e-3. '
'Received: %r' % per_element_delay_sec)
self._per_element_delay_sec = per_element_delay_sec
self._per_bundle_delay_sec = per_bundle_delay_sec
self._output_records_per_input_record = output_records_per_input_record
self._output_filter_ratio = output_filter_ratio
def start_bundle(self):
self._start_time = time.time()
def finish_bundle(self):
# The target is for the enclosing stage to take as close as possible to the
# given number of seconds, so we only sleep enough to make up for overheads
# not incurred elsewhere.
to_sleep = self._per_bundle_delay_sec - (time.time() - self._start_time)
# Ignoring sub-millisecond sleep times.
if to_sleep >= 1e-3:
time.sleep(to_sleep)
def process(self, element):
if self._per_element_delay_sec >= 1e-3:
time.sleep(self._per_element_delay_sec)
filter_element = False
if self._output_filter_ratio > 0:
if np.random.random() < self._output_filter_ratio:
filter_element = True
if not filter_element:
for _ in range(self._output_records_per_input_record):
yield element
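# Example usage of SyntheticStep (a sketch; run() below wires this up from the
# --steps flag, and the parameter values here are illustrative only):
#   pcoll | beam.ParDo(
#       SyntheticStep(
#           per_element_delay_sec=0.01,
#           per_bundle_delay_sec=1,
#           output_records_per_input_record=2,
#           output_filter_ratio=0.1))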
class NonLiquidShardingOffsetRangeTracker(OffsetRestrictionTracker):
"""An OffsetRangeTracker that doesn't allow splitting. """
def try_split(self, split_offset):
pass # Don't split.
def checkpoint(self):
pass # Don't split.
class SyntheticSDFStepRestrictionProvider(RestrictionProvider):
"""A `RestrictionProvider` for SyntheticSDFStep.
An initial_restriction and split that operate on num_records and ignore the
source description (element). Splits into initial_splitting_num_bundles.
Returns size_estimate_override as restriction size, if set. Otherwise uses
element size.
If initial_splitting_uneven_chunks, produces uneven chunks.
"""
def __init__(
self,
num_records,
initial_splitting_num_bundles,
initial_splitting_uneven_chunks,
disable_liquid_sharding,
size_estimate_override):
self._num_records = num_records
self._initial_splitting_num_bundles = initial_splitting_num_bundles
self._initial_splitting_uneven_chunks = initial_splitting_uneven_chunks
self._disable_liquid_sharding = disable_liquid_sharding
self._size_estimate_override = size_estimate_override
def initial_restriction(self, element):
return OffsetRange(0, self._num_records)
def create_tracker(self, restriction):
if self._disable_liquid_sharding:
return NonLiquidShardingOffsetRangeTracker(restriction)
else:
return OffsetRestrictionTracker(restriction)
def split(self, element, restriction):
elems = restriction.size()
if (self._initial_splitting_uneven_chunks and
self._initial_splitting_num_bundles > 1 and elems > 1):
bundle_ranges = initial_splitting_zipf(
restriction.start,
restriction.stop,
self._initial_splitting_num_bundles,
3.0)
for start, stop in bundle_ranges:
yield OffsetRange(start, stop)
else:
offsets_per_split = max(1, (elems // self._initial_splitting_num_bundles))
for split in restriction.split(offsets_per_split, offsets_per_split // 2):
yield split
def restriction_size(self, element, restriction):
if self._size_estimate_override is not None:
return self._size_estimate_override
element_size = len(element) if isinstance(element, str) else 1
return restriction.size() * element_size
def get_synthetic_sdf_step(
per_element_delay_sec=0,
per_bundle_delay_sec=0,
output_records_per_input_record=1,
output_filter_ratio=0,
initial_splitting_num_bundles=8,
initial_splitting_uneven_chunks=False,
disable_liquid_sharding=False,
size_estimate_override=None,
):
"""A function which returns a SyntheticSDFStep with given parameters. """
class SyntheticSDFStep(beam.DoFn):
"""A SplittableDoFn of which behavior can be controlled through prespecified
parameters.
"""
def __init__(
self,
per_element_delay_sec_arg,
per_bundle_delay_sec_arg,
output_filter_ratio_arg,
output_records_per_input_record_arg):
if per_element_delay_sec_arg:
per_element_delay_sec_arg = (
per_element_delay_sec_arg // output_records_per_input_record_arg)
if per_element_delay_sec_arg < 1e-3:
raise ValueError(
'Per element sleep time must be at least 1e-3 after being '
'divided among output elements.')
self._per_element_delay_sec = per_element_delay_sec_arg
self._per_bundle_delay_sec = per_bundle_delay_sec_arg
self._output_filter_ratio = output_filter_ratio_arg
def start_bundle(self):
self._start_time = time.time()
def finish_bundle(self):
# The target is for the enclosing stage to take as close as possible to the
# given number of seconds, so we only sleep enough to make up for overheads
# not incurred elsewhere.
to_sleep = self._per_bundle_delay_sec - (time.time() - self._start_time)
# Ignoring sub-millisecond sleep times.
if to_sleep >= 1e-3:
time.sleep(to_sleep)
def process(
self,
element,
restriction_tracker=beam.DoFn.RestrictionParam(
SyntheticSDFStepRestrictionProvider(
output_records_per_input_record,
initial_splitting_num_bundles,
initial_splitting_uneven_chunks,
disable_liquid_sharding,
size_estimate_override))):
filter_element = False
if self._output_filter_ratio > 0:
if np.random.random() < self._output_filter_ratio:
filter_element = True
current_restriction = restriction_tracker.current_restriction()
for cur in range(current_restriction.start, current_restriction.stop):
if not restriction_tracker.try_claim(cur):
return
if self._per_element_delay_sec:
time.sleep(self._per_element_delay_sec)
if not filter_element:
yield element
cur += 1
return SyntheticSDFStep(
per_element_delay_sec,
per_bundle_delay_sec,
output_filter_ratio,
output_records_per_input_record)
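# Example usage of get_synthetic_sdf_step (a sketch; run() below builds this
# from the --steps flag when a step is marked "splittable"; values are
# illustrative). Note that per_element_delay_sec is divided among the output
# records, so after that division it must still be at least 1e-3:
#   sdf_step = get_synthetic_sdf_step(
#       per_bundle_delay_sec=1, output_records_per_input_record=2)
#   pcoll | beam.ParDo(sdf_step)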
class SyntheticSource(iobase.BoundedSource):
"""A custom source of a specified size.
"""
def __init__(self, input_spec):
"""Initiates a synthetic source.
Args:
input_spec: Input specification of the source. See corresponding option in
function 'parse_args()' below for more details.
Raises:
ValueError: if input parameters are invalid.
"""
def maybe_parse_byte_size(s):
return parse_byte_size(s) if isinstance(s, str) else int(s)
self._num_records = input_spec['numRecords']
self._key_size = maybe_parse_byte_size(input_spec.get('keySizeBytes', 1))
self._hot_key_fraction = input_spec.get('hotKeyFraction', 0)
self._num_hot_keys = input_spec.get('numHotKeys', 0)
self._value_size = maybe_parse_byte_size(
input_spec.get('valueSizeBytes', 1))
self._total_size = self.element_size * self._num_records
self._initial_splitting = (
input_spec['bundleSizeDistribution']['type']
if 'bundleSizeDistribution' in input_spec else 'const')
if self._initial_splitting != 'const' and self._initial_splitting != 'zipf':
raise ValueError(
'Only const and zipf distributions are supported for determining '
'sizes of bundles produced by initial splitting. Received: %s' %
self._initial_splitting)
self._initial_splitting_num_bundles = (
input_spec['forceNumInitialBundles']
if 'forceNumInitialBundles' in input_spec else 0)
if self._initial_splitting == 'zipf':
self._initial_splitting_distribution_parameter = (
input_spec['bundleSizeDistribution']['param'])
if self._initial_splitting_distribution_parameter < 1:
raise ValueError(
'Parameter for a Zipf distribution must be larger than 1. '
'Received %r.' % self._initial_splitting_distribution_parameter)
else:
self._initial_splitting_distribution_parameter = 0
self._dynamic_splitting = (
'none' if (
'splitPointFrequencyRecords' in input_spec and
input_spec['splitPointFrequencyRecords'] == 0) else 'perfect')
if 'delayDistribution' in input_spec:
if input_spec['delayDistribution']['type'] != 'const':
raise ValueError(
'SyntheticSource currently only supports delay '
'distributions of type \'const\'. Received %s.' %
input_spec['delayDistribution']['type'])
self._sleep_per_input_record_sec = (
float(input_spec['delayDistribution']['const']) / 1000)
if (self._sleep_per_input_record_sec and
self._sleep_per_input_record_sec < 1e-3):
raise ValueError(
'Sleep time per input record must be at least 1e-3. '
'Received: %r' % self._sleep_per_input_record_sec)
else:
self._sleep_per_input_record_sec = 0
@property
def element_size(self):
return self._key_size + self._value_size
def estimate_size(self):
return self._total_size
def split(self, desired_bundle_size, start_position=0, stop_position=None):
# Performs initial splitting of SyntheticSource.
#
# Exact sizes and distribution of initial splits generated here depends on
# the input specification of the SyntheticSource.
if stop_position is None:
stop_position = self._num_records
if self._initial_splitting == 'zipf':
desired_num_bundles = self._initial_splitting_num_bundles or math.ceil(
float(self.estimate_size()) / desired_bundle_size)
bundle_ranges = initial_splitting_zipf(
start_position,
stop_position,
desired_num_bundles,
self._initial_splitting_distribution_parameter,
self._num_records)
else:
if self._initial_splitting_num_bundles:
bundle_size_in_elements = max(
1, int(self._num_records / self._initial_splitting_num_bundles))
else:
bundle_size_in_elements = (
max(
div_round_up(desired_bundle_size, self.element_size),
int(math.floor(math.sqrt(self._num_records)))))
bundle_ranges = []
for start in range(start_position, stop_position,
bundle_size_in_elements):
stop = min(start + bundle_size_in_elements, stop_position)
bundle_ranges.append((start, stop))
for start, stop in bundle_ranges:
yield iobase.SourceBundle(stop - start, self, start, stop)
def get_range_tracker(self, start_position, stop_position):
if start_position is None:
start_position = 0
if stop_position is None:
stop_position = self._num_records
tracker = range_trackers.OffsetRangeTracker(start_position, stop_position)
if self._dynamic_splitting == 'none':
tracker = range_trackers.UnsplittableRangeTracker(tracker)
return tracker
def _gen_kv_pair(self, generator, index):
generator.seed(index)
rand = generator.random_sample()
# Determine whether to generate a hot key.
if rand < self._hot_key_fraction:
# Generate a hot key. The hot key is chosen deterministically as
# index % numHotKeys, which spreads records roughly evenly across the
# hot keys.
generator_hot = Generator(index % self._num_hot_keys)
bytes_ = generator_hot.bytes(self._key_size), generator.bytes(
self._value_size)
else:
bytes_ = generator.bytes(self.element_size)
bytes_ = bytes_[:self._key_size], bytes_[self._key_size:]
return bytes_
def read(self, range_tracker):
index = range_tracker.start_position()
generator = Generator()
while range_tracker.try_claim(index):
time.sleep(self._sleep_per_input_record_sec)
yield self._gen_kv_pair(generator, index)
index += 1
def default_output_coder(self):
return beam.coders.TupleCoder(
[beam.coders.BytesCoder(), beam.coders.BytesCoder()])
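# Example input specification for SyntheticSource (a sketch; the recognized
# keys are the ones read in __init__ above, and the values are illustrative):
#   SyntheticSource({
#       'numRecords': 1000,
#       'keySizeBytes': 10,
#       'valueSizeBytes': 90,
#       'bundleSizeDistribution': {'type': 'const'},
#   })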
class SyntheticSDFSourceRestrictionProvider(RestrictionProvider):
"""A `RestrictionProvider` for SyntheticSDFAsSource.
In initial_restriction(element) and split(element), element means source
description.
A typical element looks like:
{
'key_size': 1,
'value_size': 1,
'initial_splitting_num_bundles': 8,
'initial_splitting_desired_bundle_size': 2,
'sleep_per_input_record_sec': 0,
'initial_splitting' : 'const'
}
"""
def initial_restriction(self, element):
return OffsetRange(0, element['num_records'])
def create_tracker(self, restriction):
return restriction_trackers.OffsetRestrictionTracker(restriction)
def split(self, element, restriction):
bundle_ranges = []
start_position = restriction.start
stop_position = restriction.stop
element_size = element['key_size'] + element['value_size']
estimate_size = element_size * element['num_records']
if element['initial_splitting'] == 'zipf':
desired_num_bundles = (
element['initial_splitting_num_bundles'] or div_round_up(
estimate_size, element['initial_splitting_desired_bundle_size']))
samples = np.random.zipf(
element['initial_splitting_distribution_parameter'],
desired_num_bundles)
total = sum(samples)
relative_bundle_sizes = [(float(sample) / total) for sample in samples]
start = start_position
index = 0
while start < stop_position:
if index == desired_num_bundles - 1:
bundle_ranges.append(OffsetRange(start, stop_position))
break
stop = start + int(
element['num_records'] * relative_bundle_sizes[index])
bundle_ranges.append(OffsetRange(start, stop))
start = stop
index += 1
else:
if element['initial_splitting_num_bundles']:
bundle_size_in_elements = max(
1,
int(
element['num_records'] /
element['initial_splitting_num_bundles']))
else:
bundle_size_in_elements = (
max(
div_round_up(
element['initial_splitting_desired_bundle_size'],
element_size),
int(math.floor(math.sqrt(element['num_records'])))))
for start in range(start_position, stop_position,
bundle_size_in_elements):
stop = min(start + bundle_size_in_elements, stop_position)
bundle_ranges.append(OffsetRange(start, stop))
return bundle_ranges
def restriction_size(self, element, restriction):
return (element['key_size'] + element['value_size']) * restriction.size()
class SyntheticSDFAsSource(beam.DoFn):
"""A SDF that generates records like a source.
This SDF accepts a PCollection of record-based source description.
A typical description is like:
{
'key_size': 1,
'value_size': 1,
'initial_splitting_num_bundles': 8,
'initial_splitting_desired_bundle_size': 2,
'sleep_per_input_record_sec': 0,
'initial_splitting' : 'const'
}
A simple pipeline taking this SDF as a source looks like:
p
| beam.Create([description1, description2,...])
| beam.ParDo(SyntheticSDFAsSource())
NOTE:
The restriction_tracker parameter of SDF.process() holds different content at
DoFn definition time and at runtime.
When defining SDF.process, restriction_tracker defaults to a
`RestrictionParam` wrapping a `RestrictionProvider`.
During runtime, DoFnRunner.process_with_sized_restriction() will feed a
`RestrictionTracker` based on a restriction to SDF.process().
"""
def process(
self,
element,
restriction_tracker=beam.DoFn.RestrictionParam(
SyntheticSDFSourceRestrictionProvider())):
cur = restriction_tracker.current_restriction().start
while restriction_tracker.try_claim(cur):
r = Generator()
r.seed(cur)
time.sleep(element['sleep_per_input_record_sec'])
yield r.bytes(element['key_size']), r.bytes(element['value_size'])
cur += 1
class ShuffleBarrier(beam.PTransform):
def expand(self, pc):
return (
pc
| beam.Map(rotate_key)
| beam.GroupByKey()
| 'Ungroup' >> beam.FlatMap(lambda elm: [(elm[0], v) for v in elm[1]]))
class SideInputBarrier(beam.PTransform):
def expand(self, pc):
return (
pc
| beam.Map(rotate_key)
| beam.Map(
lambda elem,
ignored: elem,
beam.pvalue.AsIter(pc | beam.FlatMap(lambda elem: None))))
def merge_using_gbk(name, pc1, pc2):
"""Merges two given PCollections using a CoGroupByKey."""
pc1_with_key = pc1 | (name + 'AttachKey1') >> beam.Map(lambda x: (x, x))
pc2_with_key = pc2 | (name + 'AttachKey2') >> beam.Map(lambda x: (x, x))
grouped = ({
'pc1': pc1_with_key, 'pc2': pc2_with_key
} | (name + 'Group') >> beam.CoGroupByKey())
return (
grouped | (name + 'DeDup') >> beam.Map(lambda elm: elm[0])
) # Ignoring values
def merge_using_side_input(name, pc1, pc2):
"""Merges two given PCollections using side inputs."""
def join_fn(val, _): # Ignoring side input
return val
return pc1 | name >> beam.core.Map(join_fn, beam.pvalue.AsIter(pc2))
def expand_using_gbk(name, pc):
"""Expands a given PCollection into two copies using GroupByKey."""
ret = []
ret.append((pc | ('%s.a' % name) >> ShuffleBarrier()))
ret.append((pc | ('%s.b' % name) >> ShuffleBarrier()))
return ret
def expand_using_second_output(name, pc):
"""Expands a given PCollection into two copies using side outputs."""
class ExpandFn(beam.DoFn):
def process(self, element):
yield beam.pvalue.TaggedOutput('second_out', element)
yield element
pc1, pc2 = (pc | name >> beam.ParDo(
ExpandFn()).with_outputs('second_out', main='main_out'))
return [pc1, pc2]
def _parse_steps(json_str):
"""Converts the JSON step description into Python objects.
See the 'steps' argument in parse_args() for more details about the JSON step
description.
Args:
json_str: a JSON string that describes the steps.
Returns:
Information about steps as a list of dictionaries. Each dictionary may have
the following properties:
(1) per_element_delay - amount of delay for each element in seconds.
(2) per_bundle_delay - minimum amount of delay for a given step in seconds.
(3) output_records_per_input_record - number of output elements generated
for each input element to a step.
(4) output_filter_ratio - the probability at which a step may filter out a
given element by not producing any output for that element.
(5) splittable - whether the step should be splittable.
(6) initial_splitting_num_bundles - number of bundles for the initial split,
if the step is splittable.
(7) initial_splitting_uneven_chunks - whether the bundles should be
unevenly sized.
(8) disable_liquid_sharding - whether liquid sharding should be disabled.
(9) size_estimate_override - the size estimate to report, or None to use the
default.
"""
all_steps = []
json_data = json.loads(json_str)
for val in json_data:
steps = {}
steps['per_element_delay'] = ((float(val['per_element_delay_msec']) / 1000)
if 'per_element_delay_msec' in val else 0)
steps['per_bundle_delay'] = (
float(val['per_bundle_delay_sec'])
if 'per_bundle_delay_sec' in val else 0)
steps['output_records_per_input_record'] = (
int(val['output_records_per_input_record'])
if 'output_records_per_input_record' in val else 1)
steps['output_filter_ratio'] = (
float(val['output_filter_ratio'])
if 'output_filter_ratio' in val else 0)
steps['splittable'] = (
bool(val['splittable']) if 'splittable' in val else False)
steps['initial_splitting_num_bundles'] = (
int(val['initial_splitting_num_bundles'])
if 'initial_splitting_num_bundles' in val else 8)
steps['initial_splitting_uneven_chunks'] = (
bool(val['initial_splitting_uneven_chunks'])
if 'initial_splitting_uneven_chunks' in val else False)
steps['disable_liquid_sharding'] = (
bool(val['disable_liquid_sharding'])
if 'disable_liquid_sharding' in val else False)
steps['size_estimate_override'] = (
int(val['size_estimate_override'])
if 'size_estimate_override' in val else None)
all_steps.append(steps)
return all_steps
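# Example (illustrative; the defaults follow from the code above):
#   _parse_steps('[{"per_bundle_delay_sec": 1, "splittable": true}]')
# returns
#   [{'per_element_delay': 0, 'per_bundle_delay': 1.0,
#     'output_records_per_input_record': 1, 'output_filter_ratio': 0,
#     'splittable': True, 'initial_splitting_num_bundles': 8,
#     'initial_splitting_uneven_chunks': False,
#     'disable_liquid_sharding': False, 'size_estimate_override': None}]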
def parse_args(args):
"""Parses a given set of arguments.
Args:
args: set of arguments to be passed.
Returns:
a tuple where first item gives the set of arguments defined and parsed
within this method and second item gives the set of unknown arguments.
"""
parser = argparse.ArgumentParser()
parser.add_argument(
'--steps',
dest='steps',
type=_parse_steps,
help='A JSON string that gives a list where each entry of the list is '
'configuration information for a step. Configuration for each step '
'consists of '
'(1) A float "per_bundle_delay_sec" (in seconds). Defaults to 0.'
'(2) A float "per_element_delay_msec" (in milli seconds). '
' Defaults to 0.'
'(3) An integer "output_records_per_input_record". Defaults to 1.'
'(4) A float "output_filter_ratio" in the range [0, 1] . '
' Defaults to 0.'
'(5) A bool "splittable" that defaults to false.'
'(6) An integer "initial_splitting_num_bundles". Defaults to 8.')
parser.add_argument(
'--input',
dest='input',
type=json.loads,
help='A JSON string that describes the properties of the SyntheticSource '
'used by the pipeline. Configuration is similar to Java '
'SyntheticBoundedInput.'
'Currently supports the following properties. '
'(1) An integer "numRecords". '
'(2) An integer "keySizeBytes". '
'(3) An integer "valueSizeBytes". '
'(4) A tuple "bundleSizeDistribution" with the following values. '
' A string "type". Allowed values are "const" and "zipf". '
' A float "param". Only used if "type"=="zipf". Must be '
' larger than 1. '
'(5) An integer "forceNumInitialBundles". '
'(6) An integer "splitPointFrequencyRecords". '
'(7) A tuple "delayDistribution" with the following values. '
' A string "type". Only allowed value is "const". '
' An integer "const". ')
parser.add_argument(
'--barrier',
dest='barrier',
default='shuffle',
choices=[
'shuffle',
'side-input',
'expand-gbk',
'expand-second-output',
'merge-gbk',
'merge-side-input'
],
help='The type of barrier used to separate consecutive steps, e.g. a '
'shuffle (GroupByKey) or a side input. Defaults to "shuffle".')
parser.add_argument(
'--output',
dest='output',
default='',
help='Destination to write output.')
return parser.parse_known_args(args)
def run(argv=None, save_main_session=True):
"""Runs the workflow."""
known_args, pipeline_args = parse_args(argv)
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
input_info = known_args.input
with TestPipeline(options=pipeline_options) as p:
source = SyntheticSource(input_info)
# pylint: disable=expression-not-assigned
barrier = known_args.barrier
pc_list = []
num_roots = 2**(len(known_args.steps) - 1) if (
barrier == 'merge-gbk' or barrier == 'merge-side-input') else 1
for read_no in range(num_roots):
pc_list.append((p | ('Read %d' % read_no) >> beam.io.Read(source)))
for step_no, steps in enumerate(known_args.steps):
if step_no != 0:
new_pc_list = []
for pc_no, pc in enumerate(pc_list):
if barrier == 'shuffle':
new_pc_list.append(
(pc | ('shuffle %d.%d' % (step_no, pc_no)) >> ShuffleBarrier()))
elif barrier == 'side-input':
new_pc_list.append((
pc | ('side-input %d.%d' %
(step_no, pc_no)) >> SideInputBarrier()))
elif barrier == 'expand-gbk':
new_pc_list.extend(
expand_using_gbk(('expand-gbk %d.%d' % (step_no, pc_no)), pc))
elif barrier == 'expand-second-output':
new_pc_list.extend(
expand_using_second_output(
('expand-second-output %d.%d' % (step_no, pc_no)), pc))
elif barrier == 'merge-gbk':
if pc_no % 2 == 0:
new_pc_list.append(
merge_using_gbk(('merge-gbk %d.%d' % (step_no, pc_no)),
pc,
pc_list[pc_no + 1]))
else:
continue
elif barrier == 'merge-side-input':
if pc_no % 2 == 0:
new_pc_list.append(
merge_using_side_input(
('merge-side-input %d.%d' % (step_no, pc_no)),
pc,
pc_list[pc_no + 1]))
else:
continue
pc_list = new_pc_list
new_pc_list = []
for pc_no, pc in enumerate(pc_list):
if steps['splittable']:
step = get_synthetic_sdf_step(
per_element_delay_sec=steps['per_element_delay'],
per_bundle_delay_sec=steps['per_bundle_delay'],
output_records_per_input_record=steps[
'output_records_per_input_record'],
output_filter_ratio=steps['output_filter_ratio'],
initial_splitting_num_bundles=steps[
'initial_splitting_num_bundles'],
initial_splitting_uneven_chunks=steps[
'initial_splitting_uneven_chunks'],
disable_liquid_sharding=steps['disable_liquid_sharding'],
size_estimate_override=steps['size_estimate_override'])
else:
step = SyntheticStep(
per_element_delay_sec=steps['per_element_delay'],
per_bundle_delay_sec=steps['per_bundle_delay'],
output_records_per_input_record=steps[
'output_records_per_input_record'],
output_filter_ratio=steps['output_filter_ratio'])
new_pc = pc | 'SyntheticStep %d.%d' % (step_no,
pc_no) >> beam.ParDo(step)
new_pc_list.append(new_pc)
pc_list = new_pc_list
if known_args.output:
# If an output location is provided we format and write output.
if len(pc_list) == 1:
(
pc_list[0]
| 'FormatOutput' >> beam.Map(lambda elm: (elm[0] + elm[1]))
| 'WriteOutput' >> WriteToText(known_args.output))
logging.info('Pipeline run completed.')
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
class StatefulLoadGenerator(beam.PTransform):
"""A PTransform for generating random data using Timers API."""
def __init__(self, input_options, num_keys=100):
self.num_records = input_options['num_records']
self.key_size = input_options['key_size']
self.value_size = input_options['value_size']
self.num_keys = num_keys
@typehints.with_output_types(Tuple[bytes, bytes])
class GenerateKeys(beam.DoFn):
def __init__(self, num_keys, key_size):
self.num_keys = num_keys
self.key_size = key_size
def process(self, impulse):
for _ in range(self.num_keys):
key = os.urandom(self.key_size)
yield key, b''
class GenerateLoad(beam.DoFn):
state_spec = userstate.CombiningValueStateSpec(
'bundles_remaining', combine_fn=sum)
timer_spec = userstate.TimerSpec('timer', userstate.TimeDomain.WATERMARK)
def __init__(self, num_records_per_key, value_size, bundle_size=1000):
self.num_records_per_key = num_records_per_key
self.payload = os.urandom(value_size)
self.bundle_size = bundle_size
self.time_fn = time.time
def process(
self,
_element,
records_remaining=beam.DoFn.StateParam(state_spec),
timer=beam.DoFn.TimerParam(timer_spec)):
records_remaining.add(self.num_records_per_key)
timer.set(0)
@userstate.on_timer(timer_spec)
def process_timer(
self,
key=beam.DoFn.KeyParam,
records_remaining=beam.DoFn.StateParam(state_spec),
timer=beam.DoFn.TimerParam(timer_spec)):
cur_bundle_size = min(self.bundle_size, records_remaining.read())
for _ in range(cur_bundle_size):
records_remaining.add(-1)
yield key, self.payload
if records_remaining.read() > 0:
timer.set(0)
def expand(self, pbegin):
assert isinstance(pbegin, pvalue.PBegin), (
'Input to transform must be a PBegin but found %s' % pbegin)
return (
pbegin
| 'Impulse' >> beam.Impulse()
| 'GenerateKeys' >> beam.ParDo(
StatefulLoadGenerator.GenerateKeys(self.num_keys, self.key_size))
| 'GenerateLoad' >> beam.ParDo(
StatefulLoadGenerator.GenerateLoad(
self.num_records // self.num_keys, self.value_size)))
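# Example usage of StatefulLoadGenerator (a sketch; the input_options keys are
# the ones read in __init__ above and the values are illustrative):
#   p | StatefulLoadGenerator(
#       {'num_records': 1000, 'key_size': 10, 'value_size': 90}, num_keys=10)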