Prompt: What are additional outputs in Apache Beam?
Response: In Apache Beam, additional outputs, also known as tagged or side outputs, are one or more extra PCollection objects that a single ParDo transform produces in addition to its main output PCollection. A ParDo transform can produce any number of additional output PCollection objects and returns them bundled together with the main output PCollection.
Additional outputs serve as a mechanism to implement pipeline branching. You can use them when there is a need to split the output of a single transform into several PCollection objects or produce outputs in different formats. Additional outputs become particularly beneficial when a transform’s computation per element is time-consuming because they enable transforms to process each element in the input PCollection just once.
Producing additional outputs requires tagging each output PCollection with a unique identifier, which is then used to emit elements to the corresponding output.
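The idea of routing by tag in a single pass can be sketched in plain Python, without Beam. This is only a toy model of the mechanism under stated assumptions: the Tagged class, the run_once helper, the MAIN name, and the tag strings are all hypothetical and are not part of the Beam API. The call counter shows that the per-element work runs once even though three outputs are produced.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Any

MAIN = "main"  # hypothetical name for the untagged (main) output


@dataclass(frozen=True)
class Tagged:
    """Toy stand-in for a tagged value. Not a Beam class."""
    tag: str
    value: Any


def run_once(process, elements):
    """Call `process` once per element and route every emitted value by its tag."""
    outputs = defaultdict(list)
    for element in elements:
        for item in process(element):
            if isinstance(item, Tagged):
                outputs[item.tag].append(item.value)
            else:
                # Untagged values go to the main output.
                outputs[MAIN].append(item)
    return dict(outputs)


calls = {"n": 0}  # counts how often the per-element work runs


def split_line(line):
    calls["n"] += 1
    yield Tagged("character_count", len(line))
    for word in line.split():
        if len(word) <= 3:
            yield Tagged("short_words", word)
        else:
            yield word


outputs = run_once(split_line, ["the quick brown fox", "a lazy dog"])
print(outputs)
print("process calls:", calls["n"])
```

Producing the same three collections with three separate transforms would invoke the per-element work three times per input line, which is the cost that additional outputs avoid.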
In the Apache Beam Java SDK, you can implement additional outputs by creating a TupleTag object to identify each collection produced by the ParDo transform. After specifying the TupleTag objects for each of the outputs, the tags are passed to the ParDo using the .withOutputTags method.
The following Java code implements two additional output PCollection objects for string and integer values in addition to the main output PCollection of strings:

    // Input PCollection that contains strings.
    PCollection<String> input = ...;

    // Output tag for the main output PCollection of strings.
    final TupleTag<String> mainOutputTag = new TupleTag<String>() {};

    // Output tag for the additional output PCollection of strings.
    final TupleTag<String> additionalOutputTagString = new TupleTag<String>() {};

    // Output tag for the additional output PCollection of integers.
    final TupleTag<Integer> additionalOutputTagIntegers = new TupleTag<Integer>() {};

    PCollectionTuple results =
        input.apply(
            ParDo.of(
                    new DoFn<String, String>() {
                      // DoFn continues here.
                      ...
                    })
                // Specify the tag for the main output.
                .withOutputTags(
                    mainOutputTag,
                    // Specify the tags for the two additional outputs as a TupleTagList.
                    TupleTagList.of(additionalOutputTagString).and(additionalOutputTagIntegers)));

The processElement method can emit elements to the main output or to any additional output through its MultiOutputReceiver parameter. Call the get method with the tag of the target output to obtain the receiver for that output, then call the output method on that receiver with the element to emit.

    @ProcessElement
    public void processElement(@Element String word, MultiOutputReceiver out) {
      if (condition for main output) {
        // Emit element to main output
        out.get(mainOutputTag).output(word);
      } else {
        // Emit element to additional string output
        out.get(additionalOutputTagString).output(word);
      }
      if (condition for additional integer output) {
        // Emit element to additional integer output
        out.get(additionalOutputTagIntegers).output(word.length());
      }
    }

In the Apache Beam Python SDK, you can implement additional outputs by invoking the with_outputs() method on the ParDo transform and specifying the expected tags for the multiple outputs.
The following Python code demonstrates how to implement additional outputs for a ParDo transform that outputs two PCollection objects of strings and integers in addition to the main output PCollection of strings:

    import re

    import apache_beam as beam
    from apache_beam import pvalue


    class SplitLinesToWordsFn(beam.DoFn):
        # These tags will be used to tag the outputs of this DoFn.
        OUTPUT_TAG_SHORT_WORDS = "tag_short_words"
        OUTPUT_TAG_CHARACTER_COUNT = "tag_character_count"

        def process(self, element):
            # Yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged collection.
            yield pvalue.TaggedOutput(self.OUTPUT_TAG_CHARACTER_COUNT, len(element))

            words = re.findall(r"[A-Za-z\']+", element)
            for word in words:
                if len(word) <= 3:
                    # Yield word as an output to the OUTPUT_TAG_SHORT_WORDS tagged collection.
                    yield pvalue.TaggedOutput(self.OUTPUT_TAG_SHORT_WORDS, word)
                else:
                    # Yield word to add it to the main collection.
                    yield word

The with_outputs() method returns a DoOutputsTuple object. The specified tags serve as its attributes and keys, giving the pipeline access to the corresponding output PCollection objects.
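
    from apache_beam.io import ReadFromText

    # known_args and pipeline_options are assumed to be defined earlier,
    # for example by command-line parsing in the complete example.
    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | ReadFromText(known_args.input)

        # with_outputs allows accessing the explicitly tagged outputs of a DoFn.
        split_lines_result = lines | beam.ParDo(SplitLinesToWordsFn()).with_outputs(
            SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
            SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
            main="words",
        )

        # split_lines_result is an object of type DoOutputsTuple. Its outputs can be
        # read by unpacking (main output first, then the tags in the order given
        # to with_outputs), by indexing with a tag, or by attribute access.
        words, _, _ = split_lines_result
        short_words = split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
        character_count = split_lines_result.tag_character_count
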
Refer to the multiple output ParDo example in the Apache Beam GitHub repository for the complete example code.