{% include button-pydoc.md path="apache_beam.transforms.core" class="ParDo" %}
A transform for generic parallel processing. A `ParDo` transform considers each element in the input `PCollection`, performs some processing function (your user code) on that element, and emits zero or more elements to an output `PCollection`.

See more information in the [Beam Programming Guide]({{ site.baseurl }}/documentation/programming-guide/#pardo).
In the following examples, we explore how to create custom `DoFn`s and access the timestamp and windowing information.

The following example defines a simple `DoFn` class called `SplitWords`, which stores the `delimiter` as an object field. The `process` method is called once per element, and it can yield zero or more output elements.

```py
{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py tag:pardo_dofn %}```

{:.notebook-skip}
Output `PCollection` after `ParDo`:

{:.notebook-skip}
```
{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_test.py tag:plants %}```

{% include buttons-code-snippet.md py="sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py" notebook="examples/notebooks/documentation/transforms/python/elementwise/pardo" %}

In this example, we add new parameters to the `process` method to bind parameter values at runtime.

- `beam.DoFn.TimestampParam` binds the timestamp information as an `apache_beam.utils.timestamp.Timestamp` object.
- `beam.DoFn.WindowParam` binds the window information as the appropriate `apache_beam.transforms.window.*Window` object.

```py
{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py tag:pardo_dofn_params %}```

{:.notebook-skip}
`stdout` output:

{:.notebook-skip}
```
{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_test.py tag:dofn_params %}```

{% include buttons-code-snippet.md py="sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py" notebook="examples/notebooks/documentation/transforms/python/elementwise/pardo" %}

A `DoFn` can be customized with a number of methods that can help create more complex behaviors. You can customize what happens when a `DoFn` instance starts up on a worker and when it shuts down with `setup` and `teardown`. You can also customize what to do when a bundle of elements starts and finishes with `start_bundle` and `finish_bundle`.

- `DoFn.setup()`: Called once per `DoFn` instance when the `DoFn` instance is initialized. `DoFn` instances are not guaranteed to be cached, so `setup` could be called more than once per worker. This is a good place to connect to databases, open network connections, or acquire other resources.
- `DoFn.start_bundle()`: Called once per bundle of elements before calling `process` on the first element of the bundle. This is a good place to start keeping track of the bundle elements.
- `DoFn.process(element, *args, **kwargs)`: Called once per element; it can yield zero or more elements. Additional `*args` or `**kwargs` can be passed through `beam.ParDo()`. **[required]**
- `DoFn.finish_bundle()`: Called once per bundle of elements after calling `process` on the last element of the bundle. It can yield zero or more elements. This is a good place to do batch calls on a bundle of elements, such as running a database query.

  For example, you can initialize a batch in `start_bundle`, add elements to the batch in `process` instead of yielding them, then run a batch query on those elements in `finish_bundle` and yield all the results.

  Note that elements yielded from `finish_bundle` must be of the type `apache_beam.utils.windowed_value.WindowedValue`. You need to provide a timestamp as a Unix timestamp, which you can get from the last processed element. You also need to provide a window, which you can get from the last processed element, as in the example below.
- `DoFn.teardown()`: Called once (as a best effort) per `DoFn` instance when the `DoFn` instance is shutting down. This is a good place to close database connections, network connections, or other resources.

  Note that `teardown` is called as a best effort and is not guaranteed. For example, if the worker crashes, `teardown` might not be called.

```py
{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py tag:pardo_dofn_methods %}```

{:.notebook-skip}
`stdout` output:

{:.notebook-skip}
```
{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_test.py tag:results %}```

{% include buttons-code-snippet.md py="sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py" notebook="examples/notebooks/documentation/transforms/python/elementwise/pardo" %}
Known issues:

- [BEAM-7885] `DoFn.setup()` doesn't run for streaming jobs running in the `DirectRunner`.
- [BEAM-7340] `DoFn.teardown()` metrics are lost.

`Map`, but for each input it may produce zero or more outputs.