---
layout: section
title: "ParDo"
permalink: /documentation/transforms/python/elementwise/pardo/
section_menu: section-menu/documentation.html
---

ParDo

{% include button-pydoc.md path="apache_beam.transforms.core" class="ParDo" %}

A transform for generic parallel processing. A `ParDo` transform considers each element in the input `PCollection`, applies a processing function (your user code) to that element, and emits zero or more elements to an output `PCollection`.

See more information in the [Beam Programming Guide]({{ site.baseurl }}/documentation/programming-guide/#pardo).

Examples

In the following examples, we explore how to create custom DoFns and access the timestamp and windowing information.

Example 1: ParDo with a simple DoFn

The following example defines a simple `DoFn` class called `SplitWords`, which stores the delimiter as an object field. The `process` method is called once per element, and it can yield zero or more output elements.

```py
{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py tag:pardo_dofn %}
```

{:.notebook-skip}
Output `PCollection` after `ParDo`:

{:.notebook-skip}
```
{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_test.py tag:plants %}
```

{% include buttons-code-snippet.md py="sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py" notebook="examples/notebooks/documentation/transforms/python/elementwise/pardo" %}

Example 2: ParDo with timestamp and window information

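In this example, we add new parameters to the `process` method whose default values, such as `beam.DoFn.TimestampParam` and `beam.DoFn.WindowParam`, tell Beam to bind the element's timestamp and window at runtime.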

```py
{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py tag:pardo_dofn_params %}
```

{:.notebook-skip}
`stdout` output:

{:.notebook-skip}
```
{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_test.py tag:dofn_params %}
```

{% include buttons-code-snippet.md py="sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py" notebook="examples/notebooks/documentation/transforms/python/elementwise/pardo" %}

Example 3: ParDo with DoFn methods

A `DoFn` can be customized with a number of methods that help create more complex behaviors. With `setup` and `teardown`, you can customize what happens when a `DoFn` instance starts up and shuts down. With `start_bundle` and `finish_bundle`, you can customize what happens when a bundle of elements starts and finishes.

  • `DoFn.setup()`: Called once per `DoFn` instance when the `DoFn` instance is initialized. Because `DoFn` instances are not necessarily cached and reused, `setup` may be called more than once per worker. This is a good place to connect to databases, open network connections, or acquire other resources.

  • `DoFn.start_bundle()`: Called once per bundle of elements, before `process` is called on the first element of the bundle. This is a good place to start keeping track of the bundle's elements.

  • `DoFn.process(element, *args, **kwargs)`: Called once per element; can yield zero or more elements. Additional `*args` and `**kwargs` passed to `beam.ParDo()` are forwarded to `process`. [required]

  • `DoFn.finish_bundle()`: Called once per bundle of elements, after `process` has been called on the last element of the bundle; can yield zero or more elements. This is a good place to make batch calls on a bundle of elements, such as running a database query.

    For example, you can initialize a batch in `start_bundle`, add elements to the batch in `process` instead of yielding them, then run a batch query on those elements in `finish_bundle` and yield all the results.

    Note that elements yielded from `finish_bundle` must be of type `apache_beam.utils.windowed_value.WindowedValue`. You need to provide a timestamp as a Unix timestamp, which you can get from the last processed element. You also need to provide a window, which you can also get from the last processed element, as in the example below.

  • `DoFn.teardown()`: Called once (as a best effort) per `DoFn` instance when the `DoFn` instance is shutting down. This is a good place to close database connections, network connections, or other resources.

    Note that `teardown` is called on a best-effort basis and is not guaranteed to run. For example, if the worker crashes, `teardown` might not be called.

```py
{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py tag:pardo_dofn_methods %}
```

{:.notebook-skip}
`stdout` output:

{:.notebook-skip}
```
{% github_sample /apache/beam/blob/master/sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo_test.py tag:results %}
```

{% include buttons-code-snippet.md py="sdks/python/apache_beam/examples/snippets/transforms/elementwise/pardo.py" notebook="examples/notebooks/documentation/transforms/python/elementwise/pardo" %}

Known issues:

  • [BEAM-7885] DoFn.setup() doesn't run for streaming jobs running in the DirectRunner.
  • [BEAM-7340] DoFn.teardown() metrics are lost.

Related transforms

  • [Map]({{ site.baseurl }}/documentation/transforms/python/elementwise/map) behaves the same, but produces exactly one output for each input.
  • [FlatMap]({{ site.baseurl }}/documentation/transforms/python/elementwise/flatmap) behaves the same as Map, but for each input it may produce zero or more outputs.
  • [Filter]({{ site.baseurl }}/documentation/transforms/python/elementwise/filter) is useful if the function is just deciding whether to output an element or not.

{% include button-pydoc.md path="apache_beam.transforms.core" class="ParDo" %}