blob: 1b86cdaae56183429c560a59df8506a3c401ce16 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Worker utilities for representing MapTasks.
Each MapTask represents a sequence of ParallelInstruction(s): read from a
source, write to a sink, parallel do, etc.
"""
# pytype: skip-file
import collections
from apache_beam import coders
def build_worker_instruction(*args):
  """Create an object representing a ParallelInstruction protobuf.

  This will be a collections.namedtuple with a custom __str__ method.

  Alas, this wrapper is not known to pylint, which thinks it creates
  constants. You may have to put a disable=invalid-name pylint
  annotation on any use of this, depending on your names.

  Args:
    *args: The first argument is the name of the type to create. It should
      start with "Worker". The second argument is a list of the
      attributes of this object.

  Returns:
    A new class, a subclass of tuple, that represents the protobuf.
  """
  tuple_class = collections.namedtuple(*args)
  # Both str() and repr() use the same compact rendering.
  tuple_class.__str__ = worker_object_to_string
  tuple_class.__repr__ = worker_object_to_string
  return tuple_class
def worker_printable_fields(workerproto):
"""Returns the interesting fields of a Worker* object."""
# Each field that is kept is rendered as a 'name=value' string.
return [
'%s=%s' % (name, value)
# _asdict is the only way and cannot subclass this generated class
# pylint: disable=protected-access
for name,
value in workerproto._asdict().items()
# want to output value 0 but not None nor []
# NOTE(review): the tuple of excluded field names and the closing brackets of
# this expression are missing from the copy under review. They are left
# untouched here rather than guessed; restore them from the authoritative
# version of this file.
if (value or value == 0) and name not in (
def worker_object_to_string(worker_object):
  """Returns a string compactly representing a Worker* object.

  The result has the form 'ClassName(field=value, ...)', where the listed
  fields are the ones selected by worker_printable_fields().

  Args:
    worker_object: an instance of a class created by
      build_worker_instruction().

  Returns:
    The compact string form of worker_object.
  """
  # The format string has two placeholders: the class name, then the
  # comma-separated printable fields.
  return '%s(%s)' % (
      worker_object.__class__.__name__,
      ', '.join(worker_printable_fields(worker_object)))
# All the following Worker* definitions will have these lint problems:
# pylint: disable=invalid-name
# pylint: disable=pointless-string-statement
WorkerRead = build_worker_instruction('WorkerRead', ['source', 'output_coders'])
"""Worker details needed to read from a source.

Attributes:
  source: a source object.
  output_coders: 1-tuple of the coder for the output.
"""
WorkerSideInputSource = build_worker_instruction(
    'WorkerSideInputSource', ['source', 'tag'])
"""Worker details needed to read from a side input source.

Attributes:
  source: a source object.
  tag: string tag for this side input.
"""
# NOTE(review): the typename and field-list arguments of this call, and the
# closing quotes of the description string below, are missing from the copy
# under review. They are left untouched here rather than guessed; restore them
# from the authoritative version of this file.
WorkerGroupingShuffleRead = build_worker_instruction(
"""Worker details needed to read from a grouping shuffle source.
start_shuffle_position: An opaque string to be passed to the shuffle
source to indicate where to start reading.
end_shuffle_position: An opaque string to be passed to the shuffle
source to indicate where to stop reading.
shuffle_reader_config: An opaque string used to initialize the shuffle
reader. Contains things like connection endpoints for the shuffle
server appliance and various options.
coder: The KV coder used to decode shuffle entries.
output_coders: 1-tuple of the coder for the output.
# NOTE(review): the typename and field-list arguments of this call, and the
# closing quotes of the description string below, are missing from the copy
# under review. They are left untouched here rather than guessed; restore them
# from the authoritative version of this file.
WorkerUngroupedShuffleRead = build_worker_instruction(
"""Worker details needed to read from an ungrouped shuffle source.
start_shuffle_position: An opaque string to be passed to the shuffle
source to indicate where to start reading.
end_shuffle_position: An opaque string to be passed to the shuffle
source to indicate where to stop reading.
shuffle_reader_config: An opaque string used to initialize the shuffle
reader. Contains things like connection endpoints for the shuffle
server appliance and various options.
coder: The value coder used to decode shuffle entries.
WorkerWrite = build_worker_instruction(
    'WorkerWrite', ['sink', 'input', 'output_coders'])
"""Worker details needed to write to a sink.

Attributes:
  sink: a sink object.
  input: A (producer index, output index) tuple representing the
    ParallelInstruction operation whose output feeds into this operation.
    The output index is 0 except for multi-output operations (like ParDo).
  output_coders: 1-tuple, coder to use to estimate bytes written.
"""
# The typename is passed first, as in every other Worker* definition here;
# without it namedtuple() receives only the field list and raises TypeError.
WorkerInMemoryWrite = build_worker_instruction(
    'WorkerInMemoryWrite',
    ['output_buffer', 'write_windowed_values', 'input', 'output_coders'])
"""Worker details needed to write to an in-memory sink.

Used only for unit testing. It makes worker tests less cluttered with code like
"write to a file and then check file contents".

Attributes:
  output_buffer: list to which output elements will be appended
  write_windowed_values: whether to record the entire WindowedValue outputs,
    or just the raw (unwindowed) value
  input: A (producer index, output index) tuple representing the
    ParallelInstruction operation whose output feeds into this operation.
    The output index is 0 except for multi-output operations (like ParDo).
  output_coders: 1-tuple, coder to use to estimate bytes written.
"""
# The typename is passed first, as in every other Worker* definition here;
# without it namedtuple() receives only the field list and raises TypeError.
WorkerShuffleWrite = build_worker_instruction(
    'WorkerShuffleWrite',
    ['shuffle_kind', 'shuffle_writer_config', 'input', 'output_coders'])
"""Worker details needed to write to a shuffle sink.

Attributes:
  shuffle_kind: A string describing the shuffle kind. This can control the
    way the worker interacts with the shuffle sink. The possible values are:
    'ungrouped', 'group_keys', and 'group_keys_and_sort_values'.
  shuffle_writer_config: An opaque string used to initialize the shuffle
    write. Contains things like connection endpoints for the shuffle
    server appliance and various options.
  input: A (producer index, output index) tuple representing the
    ParallelInstruction operation whose output feeds into this operation.
    The output index is 0 except for multi-output operations (like ParDo).
  output_coders: 1-tuple of the coder for input elements. If the
    shuffle_kind is grouping, this is expected to be a KV coder.
"""
# The typename is passed first, as in every other Worker* definition here;
# without it namedtuple() receives only the field list and raises TypeError.
WorkerDoFn = build_worker_instruction(
    'WorkerDoFn',
    ['serialized_fn', 'output_tags', 'input', 'side_inputs', 'output_coders'])
"""Worker details needed to run a DoFn.

Attributes:
  serialized_fn: A serialized DoFn object to be run for each input element.
  output_tags: The string tags used to identify the outputs of a ParDo
    operation. The tag is present even if the ParDo has just one output
    (e.g., ['out']).
  output_coders: array of coders, one for each output.
  input: A (producer index, output index) tuple representing the
    ParallelInstruction operation whose output feeds into this operation.
    The output index is 0 except for multi-output operations (like ParDo).
  side_inputs: A list of Worker...Read instances describing sources to be
    used for getting values. The types supported right now are
    WorkerInMemoryRead and WorkerTextRead.
"""
WorkerReifyTimestampAndWindows = build_worker_instruction(
    'WorkerReifyTimestampAndWindows', ['output_tags', 'input', 'output_coders'])
"""Worker details needed to run a WindowInto.

Attributes:
  output_tags: The string tags used to identify the outputs of a ParDo
    operation. The tag is present even if the ParDo has just one output
    (e.g., ['out']).
  output_coders: array of coders, one for each output.
  input: A (producer index, output index) tuple representing the
    ParallelInstruction operation whose output feeds into this operation.
    The output index is 0 except for multi-output operations (like ParDo).
"""
# NOTE(review): the typename and field-list arguments of this call, and the
# closing quotes of the description string below, are missing from the copy
# under review. They are left untouched here rather than guessed; restore them
# from the authoritative version of this file.
WorkerMergeWindows = build_worker_instruction(
"""Worker details needed to run a MergeWindows (aka. GroupAlsoByWindows).
window_fn: A serialized Windowing object representing the windowing strategy.
combine_fn: A serialized CombineFn object to be used after executing the
GroupAlsoByWindows operation. May be None if not a combining operation.
phase: Possible values are 'all', 'add', 'merge', and 'extract'.
A runner optimizer may split the user combiner in 3 separate
phases (ADD, MERGE, and EXTRACT), on separate VMs, as it sees
fit. The phase attribute dictates which DoFn is actually running in
the worker. May be None if not a combining operation.
output_tags: The string tags used to identify the outputs of a ParDo
operation. The tag is present even if the ParDo has just one output
(e.g., ['out'].
output_coders: array of coders, one for each output.
input: A (producer index, output index) tuple representing the
ParallelInstruction operation whose output feeds into this operation.
The output index is 0 except for multi-output operations (like ParDo).
coders: A 2-tuple of coders (key, value) to encode shuffle entries.
context: The ExecutionContext object for the current work item.
WorkerCombineFn = build_worker_instruction(
    'WorkerCombineFn', ['serialized_fn', 'phase', 'input', 'output_coders'])
"""Worker details needed to run a CombineFn.

Attributes:
  serialized_fn: A serialized CombineFn object to be used.
  phase: Possible values are 'all', 'add', 'merge', and 'extract'.
    A runner optimizer may split the user combiner in 3 separate
    phases (ADD, MERGE, and EXTRACT), on separate VMs, as it sees
    fit. The phase attribute dictates which DoFn is actually running in
    the worker.
  input: A (producer index, output index) tuple representing the
    ParallelInstruction operation whose output feeds into this operation.
    The output index is 0 except for multi-output operations (like ParDo).
  output_coders: 1-tuple of the coder for the output.
"""
WorkerPartialGroupByKey = build_worker_instruction(
    'WorkerPartialGroupByKey', ['combine_fn', 'input', 'output_coders'])
"""Worker details needed to run a partial group-by-key.

Attributes:
  combine_fn: A serialized CombineFn object to be used.
  input: A (producer index, output index) tuple representing the
    ParallelInstruction operation whose output feeds into this operation.
    The output index is 0 except for multi-output operations (like ParDo).
  output_coders: 1-tuple of the coder for the output.
"""
WorkerFlatten = build_worker_instruction(
    'WorkerFlatten', ['inputs', 'output_coders'])
"""Worker details needed to run a Flatten.

Attributes:
  inputs: A list of tuples, each (producer index, output index), representing
    the ParallelInstruction operations whose output feeds into this operation.
    The output index is 0 unless the input is from a multi-output
    operation (such as ParDo).
  output_coders: 1-tuple of the coder for the output.
"""
def get_coder_from_spec(coder_spec):
  """Return a coder instance from a coder spec.

  Well-known '@type' values ('kind:pair', 'kind:stream', ...) are mapped
  directly to the matching coder class, recursing into
  'component_encodings' where the kind has components. Any other '@type' is
  handed to coders.coders.deserialize_coder().

  Args:
    coder_spec: A dict where the value of the '@type' key is a pickled instance
      of a Coder instance.

  Returns:
    A coder instance (has encode/decode methods).

  Raises:
    AssertionError: if coder_spec is None, or if a well-known kind carries an
      unexpected number of component encodings.
  """
  assert coder_spec is not None

  # Ignore the wrappers in these encodings.
  # NOTE(review): the wrapper name was missing from the copy under review; the
  # entry below is restored from the upstream version of this file and should
  # be confirmed. It is a real tuple so that `in` is an exact-match membership
  # test rather than a substring test.
  ignored_wrappers = (
      'com.google.cloud.dataflow.sdk.util.TimerOrElement$TimerOrElementCoder',
  )
  if coder_spec['@type'] in ignored_wrappers:
    assert len(coder_spec['component_encodings']) == 1
    coder_spec = coder_spec['component_encodings'][0]
    return get_coder_from_spec(coder_spec)

  # Handle a few well known types of coders.
  if coder_spec['@type'] == 'kind:pair':
    assert len(coder_spec['component_encodings']) == 2
    component_coders = [
        get_coder_from_spec(c) for c in coder_spec['component_encodings']
    ]
    return coders.TupleCoder(component_coders)
  elif coder_spec['@type'] == 'kind:stream':
    assert len(coder_spec['component_encodings']) == 1
    return coders.IterableCoder(
        get_coder_from_spec(coder_spec['component_encodings'][0]))
  elif coder_spec['@type'] == 'kind:windowed_value':
    assert len(coder_spec['component_encodings']) == 2
    value_coder, window_coder = [
        get_coder_from_spec(c) for c in coder_spec['component_encodings']
    ]
    return coders.coders.WindowedValueCoder(
        value_coder, window_coder=window_coder)
  elif coder_spec['@type'] == 'kind:interval_window':
    assert (
        'component_encodings' not in coder_spec or
        not coder_spec['component_encodings'])
    return coders.coders.IntervalWindowCoder()
  elif coder_spec['@type'] == 'kind:global_window':
    assert (
        'component_encodings' not in coder_spec or
        not coder_spec['component_encodings'])
    return coders.coders.GlobalWindowCoder()
  elif coder_spec['@type'] == 'kind:varint':
    # Emptiness check; the previous len(x == 0) took len() of a bool and
    # raised TypeError whenever 'component_encodings' was present.
    assert (
        'component_encodings' not in coder_spec or
        not coder_spec['component_encodings'])
    return coders.coders.VarIntCoder()
  elif coder_spec['@type'] == 'kind:length_prefix':
    assert len(coder_spec['component_encodings']) == 1
    return coders.coders.LengthPrefixCoder(
        get_coder_from_spec(coder_spec['component_encodings'][0]))
  elif coder_spec['@type'] == 'kind:bytes':
    # Same emptiness check as for 'kind:varint' above.
    assert (
        'component_encodings' not in coder_spec or
        not coder_spec['component_encodings'])
    return coders.BytesCoder()

  # We pass coders in the form "<coder_name>$<pickled_data>" to make the job
  # description JSON more readable.
  return coders.coders.deserialize_coder(coder_spec['@type'].encode('ascii'))