{{< localstorage language language-py >}}
{{< button-pydoc path="apache_beam.transforms.core" class="ParDo" >}}
A transform for generic parallel processing. A `ParDo` transform considers each element in the input `PCollection`, performs some processing function (your user code) on that element, and emits zero or more elements to an output `PCollection`.
See more information in the Beam Programming Guide.
In the following examples, we explore how to create custom `DoFn`s and access the timestamp and windowing information.
The following example defines a simple `DoFn` class called `SplitWords` which stores the `delimiter` as an object field. The `process` method is called once per element, and it can yield zero or more output elements.
{{< highlight py >}} {{< github_sample "/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py" pardo_dofn >}} {{</ highlight >}}
{{< paragraph class="notebook-skip" >}} Output `PCollection` after `ParDo`: {{< /paragraph >}}
{{< highlight class="notebook-skip" >}} {{< github_sample "/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_test.py" plants >}} {{< /highlight >}}
{{< buttons-code-snippet py="sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py" notebook="examples/notebooks/documentation/transforms/python/elementwise/pardo" >}}
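The shape of a `SplitWords`-style class can also be sketched in plain Python, without a Beam installation. This is an illustration only, not the snippet referenced above: the class is a duck-typed stand-in rather than a real `beam.DoFn` subclass, `apply_pardo` is a hypothetical driver that mimics calling `process` once per element, the input strings are made up, and skipping empty strings is an assumption added to show an element that yields zero outputs.

```python
class SplitWords:
    """Plain-Python stand-in for a DoFn (in Beam this would subclass beam.DoFn)."""

    def __init__(self, delimiter=","):
        # The delimiter is stored as an object field.
        self.delimiter = delimiter

    def process(self, text):
        # Called once per element; yields zero or more output elements.
        for word in text.split(self.delimiter):
            if word:  # assumption: drop empty strings, so "" yields nothing
                yield word


def apply_pardo(dofn, elements):
    """Hypothetical driver: call process() per element and flatten the results."""
    outputs = []
    for element in elements:
        outputs.extend(dofn.process(element))
    return outputs


# Made-up input: the empty string produces no output at all.
words = apply_pardo(
    SplitWords(","),
    ["Strawberry,Carrot,Eggplant", "", "Tomato,Potato"],
)
print(words)
```

In an actual pipeline the class would subclass `beam.DoFn` and be applied with `beam.ParDo(SplitWords(','))`; the runner, not a helper like `apply_pardo`, decides when `process` is called.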
The next example adds new parameters to the `process` method to bind parameter values at runtime.

- `beam.DoFn.TimestampParam` binds the timestamp information as an `apache_beam.utils.timestamp.Timestamp` object.
- `beam.DoFn.WindowParam` binds the window information as the appropriate `apache_beam.transforms.window.*Window` object.

{{< highlight py >}} {{< github_sample "/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py" pardo_dofn_params >}} {{</ highlight >}}
{{< paragraph class="notebook-skip" >}} `stdout` output: {{< /paragraph >}}
{{< highlight class="notebook-skip" >}} {{< github_sample "/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_test.py" dofn_params >}} {{< /highlight >}}
{{< buttons-code-snippet py="sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py" notebook="examples/notebooks/documentation/transforms/python/elementwise/pardo" >}}
A `DoFn` can be customized with a number of methods that can help create more complex behaviors. You can customize what a worker does when it starts and shuts down with `setup` and `teardown`. You can also customize what to do when a bundle of elements starts and finishes with `start_bundle` and `finish_bundle`.

- `DoFn.setup()`: Called once per `DoFn` instance when the `DoFn` instance is initialized. `DoFn` instances are not necessarily cached, so `setup` could be called more than once per worker. This is a good place to connect to database instances, open network connections, or acquire other resources.

- `DoFn.start_bundle()`: Called once per bundle of elements before calling `process` on the first element of the bundle. This is a good place to start keeping track of the bundle elements.

- `DoFn.process(element, *args, **kwargs)`: Called once per element; it can yield zero or more elements. Additional `*args` or `**kwargs` can be passed through `beam.ParDo()`. [required]

- `DoFn.finish_bundle()`: Called once per bundle of elements after calling `process` on the last element of the bundle; it can yield zero or more elements. This is a good place to do batch calls on a bundle of elements, such as running a database query.

  For example, you can initialize a batch in `start_bundle`, add elements to the batch in `process` instead of yielding them, then run a batch query on those elements in `finish_bundle` and yield all the results.

  Note that yielded elements from `finish_bundle` must be of the type `apache_beam.utils.windowed_value.WindowedValue`. You need to provide a timestamp as a unix timestamp, which you can get from the last processed element. You also need to provide a window, which you can get from the last processed element like in the example below.

- `DoFn.teardown()`: Called once (as a best effort) per `DoFn` instance when the `DoFn` instance is shutting down. This is a good place to close database connections, close network connections, or release other resources.

  Note that `teardown` is called as a best effort and is not guaranteed. For example, if the worker crashes, `teardown` might not be called.

{{< highlight py >}} {{< github_sample "/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py" pardo_dofn_methods >}} {{</ highlight >}}
{{< paragraph class="notebook-skip" >}} `stdout` output: {{< /paragraph >}}
{{< highlight class="notebook-skip" >}} {{< github_sample "/apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_test.py" results >}} {{< /highlight >}}
{{< buttons-code-snippet py="sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py" notebook="examples/notebooks/documentation/transforms/python/elementwise/pardo" >}}
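The batching pattern described for `start_bundle`, `process`, and `finish_bundle` can be sketched in plain Python to make the call order concrete. Everything here is an assumption made for illustration: `BatchLookup` is a duck-typed stand-in rather than a real `beam.DoFn`, the dictionary stands in for a database, and `run_bundles` is a hypothetical driver. In Beam the runner decides how elements are grouped into bundles, `finish_bundle` outputs must be wrapped in `WindowedValue` (omitted here), and `teardown` is only best effort, unlike the `try`/`finally` used below.

```python
class BatchLookup:
    """Plain-Python stand-in for a DoFn that issues one batch query per bundle."""

    def setup(self):
        # Acquire long-lived resources; a dict stands in for a database.
        self.db = {"a": 1, "b": 2, "c": 3}
        self.queries = []  # records each batch that was "queried"

    def start_bundle(self):
        # Start a fresh batch for this bundle.
        self.batch = []

    def process(self, element):
        # Buffer the element instead of yielding it.
        self.batch.append(element)
        return []

    def finish_bundle(self):
        # Run a single batch lookup for the whole bundle, then yield results.
        self.queries.append(list(self.batch))
        for key in self.batch:
            yield key, self.db.get(key)

    def teardown(self):
        # Release the resource acquired in setup().
        self.db = None


def run_bundles(dofn, bundles):
    """Hypothetical driver that calls the lifecycle methods in order."""
    outputs = []
    dofn.setup()
    try:
        for bundle in bundles:
            dofn.start_bundle()
            for element in bundle:
                outputs.extend(dofn.process(element))
            outputs.extend(dofn.finish_bundle())
    finally:
        dofn.teardown()
    return outputs


dofn = BatchLookup()
results = run_bundles(dofn, [["a", "b"], ["c"]])
print(results)
print(dofn.queries)
```

Buffering in `process` trades per-element latency for fewer round trips: two bundles produce two batch lookups, regardless of how many elements each bundle holds.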
Known issues:

- [BEAM-7885] `DoFn.setup()` doesn't run for streaming jobs running in the `DirectRunner`.
- [BEAM-7340] `DoFn.teardown()` metrics are lost.

`Map`, but for each input it may produce zero or more outputs.

{{< button-pydoc path="apache_beam.transforms.core" class="ParDo" >}}