blob: 840286553dd58bd859f676de5cb396c9e6fe3213 [file] [view]
Prompt:
What are additional outputs in Apache Beam?
Response:
In Apache Beam, additional outputs, also known as tagged or side outputs, refer to one or multiple extra [`PCollection`](https://beam.apache.org/documentation/basics/#pcollection) objects produced by a single [`ParDo`](https://beam.apache.org/documentation/programming-guide/#pardo) transform in addition to the main output `PCollection`. The `ParDo` transform has the flexibility to produce any number of additional output `PCollection` objects and return them bundled together with the main output `PCollection`.
Additional outputs serve as a mechanism to implement [pipeline branching](https://beam.apache.org/documentation/pipelines/design-your-pipeline/#branching-pcollections). You can use them when there is a need to split the output of a single transform into several `PCollection` objects or produce outputs in different formats. Additional outputs become particularly beneficial when a transform’s computation per element is time-consuming because they enable transforms to process each element in the input `PCollection` just once.
Producing additional outputs requires [tagging](https://beam.apache.org/documentation/programming-guide/#output-tags) each output `PCollection` with a unique identifier, which is then used to [emit](https://beam.apache.org/documentation/programming-guide/#multiple-outputs-dofn) elements to the corresponding output.
In the Apache Beam Java SDK, you can implement additional outputs by creating a `TupleTag` object to identify each collection produced by the `ParDo` transform. After specifying the `TupleTag` objects for each of the outputs, the tags are passed to the `ParDo` using the `.withOutputTags` method. You can find a sample Apache Beam Java pipeline that applies one transform to output two `PCollection` objects in the [Branching `PCollection`s](https://beam.apache.org/documentation/pipelines/design-your-pipeline/#a-single-transform-that-produces-multiple-outputs) section in the Apache Beam documentation.
The following Java code implements two additional output `PCollection` objects for string and integer values in addition to the main output `PCollection` of strings:
```java
// Input PCollection that contains strings.
PCollection<String> input = ...;
// Output tag for the main output PCollection of strings.
final TupleTag<String> mainOutputTag = new TupleTag<String>(){};
// Output tag for the additional output PCollection of strings.
final TupleTag<String> additionalOutputTagString = new TupleTag<Integer>(){};
// Output tag for the additional output PCollection of integers.
final TupleTag<Integer> additionalOutputTagIntegers = new TupleTag<Integer>(){};
PCollectionTuple results = input.apply(ParDo
.of(new DoFn<String, String>() {
// DoFn continues here.
...
})
// Specify the tag for the main output.
.withOutputTags(mainOutputTag,
// Specify the tags for the two additional outputs as a TupleTagList.
TupleTagList.of(additionalOutputTagString)
.and(additionalOutputTagIntegers)));
```
The `processElement` method can emit elements to the main output or any additional output by invoking the output method on the `MultiOutputReceiver` object. The output method takes the tag of the output and the element to be emitted as arguments.
```java
public void processElement(@Element String word, MultiOutputReceiver out) {
if (condition for main output) {
// Emit element to main output
out.get(mainOutputTag).output(word);
} else {
// Emit element to additional string output
out.get(additionalOutputTagString).output(word);
}
if (condition for additional integer output) {
// Emit element to additional integer output
out.get(additionalOutputTagIntegers).output(word.length());
}
}
```
In the Apache Beam Python SDK, you can implement additional outputs by invoking the `with_outputs()` method on the `ParDo` and specifying the expected tags for the multiple outputs.
The following Python code demonstrates how to implement additional outputs for a `ParDo` transform that outputs two `PCollection` objects of strings and integers in addition to the main output `PCollection` of strings:
```python
class SplitLinesToWordsFn(beam.DoFn):
# These tags will be used to tag the outputs of this DoFn.
OUTPUT_TAG_SHORT_WORDS = 'tag_short_words'
OUTPUT_TAG_CHARACTER_COUNT = 'tag_character_count'
def process(self, element):
# yield a count (integer) to the OUTPUT_TAG_CHARACTER_COUNT tagged collection.
yield pvalue.TaggedOutput(self.OUTPUT_TAG_CHARACTER_COUNT, len(element))
words = re.findall(r'[A-Za-z\']+', element)
for word in words:
if len(word) <= 3:
# yield word as an output to the OUTPUT_TAG_SHORT_WORDS tagged collection.
yield pvalue.TaggedOutput(self.OUTPUT_TAG_SHORT_WORDS, word)
else:
# yield word to add it to the main collection.
yield word
```
The method returns a `DoOutputsTuple` object, with the specified tags serving as attributes that provide `ParDo` with access to the corresponding output `PCollection` objects.
```python
with beam.Pipeline(options=pipeline_options) as p:
lines = p | ReadFromText(known_args.input)
# with_outputs allows accessing the explicitly tagged outputs of a DoFn.
split_lines_result = (
lines
| beam.ParDo(SplitLinesToWordsFn()).with_outputs(
SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS,
SplitLinesToWordsFn.OUTPUT_TAG_CHARACTER_COUNT,
main='words'))
# split_lines_result is an object of type DoOutputsTuple
words, _, _ = split_lines_result
short_words = split_lines_result[SplitLinesToWordsFn.OUTPUT_TAG_SHORT_WORDS]
character_count = split_lines_result.tag_character_count
```
Refer to the [multiple output `ParDo`](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/multiple_output_pardo.py) for the complete example code.
For more information about additional outputs and sample pipelines demonstrating their use, you can refer to the Apache Beam documentation on:
* [Tagging multiple outputs](https://beam.apache.org/documentation/programming-guide/#output-tags)
* [Emitting elements to multiple outputs](https://beam.apache.org/documentation/programming-guide/#multiple-outputs-dofn)
* [Accessing additional parameters in your `DoFn`](https://beam.apache.org/documentation/programming-guide/#other-dofn-parameters)
* [Branching `PCollection`s](https://beam.apache.org/documentation/pipelines/design-your-pipeline/#branching-pcollections)