Prompt: What is a batched DoFn in Apache Beam?

Response: In Apache Beam, a batched DoFn is a specialized type of DoFn that operates on batches of logical elements within a PCollection. Unlike regular DoFn functions, which process each element individually, batched DoFn functions enable the efficient processing of multiple elements together as batches.

Batched DoFn functions are currently available only in the Apache Beam Python SDK. They let you use Python vectorized libraries, such as NumPy, SciPy, and pandas, which operate on whole arrays or DataFrames at once instead of one value at a time. This makes batched DoFn functions particularly useful for large volumes of data, where calling Python code once per element can become costly. You can also chain multiple batched DoFn functions together so that batches are handed from one step to the next.

To implement a batched DoFn, you define a process_batch method instead of the process method used in a regular DoFn. The process_batch method takes a batch of elements as input and produces a batch of elements as output. Every logical element within a batch must share a single set of timing properties (event time, window, and so on), so a batch cannot span multiple timestamps. For some DoFn functions, you can provide both a batched and an element-wise implementation of the same logic by defining both the process and process_batch methods.

Here is a simple example of a flexible DoFn with both batched and element-wise implementations:

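from typing import Iterator

import apache_beam as beam
import numpy as np

class MultiplyByTen(beam.DoFn):
  def process(self, element: np.int64) -> Iterator[np.int64]:
    yield element * 10

  def process_batch(self, batch: np.ndarray) -> Iterator[np.ndarray]:
    # Vectorized: multiplies every element of the batch in one NumPy operation.
    yield batch * 10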

In this example, the type hints tell Apache Beam how the two methods relate. The np.int64 type represents an individual element, and the process method multiplies one element by ten, yielding a single element. The np.ndarray type represents a batch, and the process_batch method multiplies every element in the batch by ten, yielding a single batch. When MultiplyByTen is applied to a PCollection, Apache Beam recognizes that np.ndarray is an acceptable batch type to use with np.int64 elements, and it selects which of the two implementations to use at execution time.

By default, Apache Beam implicitly buffers elements and creates batches on the input side, then explodes batches back into individual elements on the output side. However, if batched DoFn functions with equivalent types are chained together, this batch creation and explosion process is skipped, and the batches are passed through for more efficient processing.
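The difference between the two cases can be pictured with a small sketch in plain NumPy. This is only a conceptual model of "create a batch, process it, explode it", not Apache Beam's actual runner logic, and the helper names run_unchained and run_chained are hypothetical.

```python
import numpy as np


def multiply_by_ten_batch(batch: np.ndarray) -> np.ndarray:
  """Stand-in for a process_batch body: one vectorized operation per batch."""
  return batch * 10


def run_unchained(elements, fn):
  """Build a batch from elements, apply fn, then explode back to elements."""
  batch = np.array(elements, dtype=np.int64)  # batch creation
  return fn(batch).tolist()                   # batch explosion


def run_chained(elements, fns):
  """Build the batch once, pass it through every fn, explode once at the end."""
  batch = np.array(elements, dtype=np.int64)
  for fn in fns:
    batch = fn(batch)                         # batch handed over unchanged
  return batch.tolist()


# Unchained: a batch is created and exploded around each of the three steps.
step_by_step = [1, 2, 3, 4]
for _ in range(3):
  step_by_step = run_unchained(step_by_step, multiply_by_ten_batch)

# Chained: one batch creation and one explosion for all three steps.
chained = run_chained([1, 2, 3, 4], [multiply_by_ten_batch] * 3)
```

Both paths compute the same values; the chained path simply avoids converting between elements and batches at every step, which is the saving described above.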

Here is an example with chained DoFn functions of equivalent types:

with beam.Pipeline() as p:
  _ = (p | beam.Create([1, 2, 3, 4]).with_output_types(np.int64)
         | beam.ParDo(MultiplyByTen()) # Implicit buffering and batch creation
         | beam.ParDo(MultiplyByTen()) # Batches passed through
         | beam.ParDo(MultiplyByTen()))

In this example, the PTransform.with_output_types method sets the element-wise typehint for the output of beam.Create. Because the elements are declared as np.int64, Apache Beam can pair them with the np.ndarray batch type that MultiplyByTen declares on its process_batch method.

By default, Apache Beam expects the process_batch method to yield batches and the process method to yield individual elements. You can override these defaults with decorators: apply @beam.DoFn.yields_elements to a process_batch method that yields individual elements, and apply @beam.DoFn.yields_batches to a process method that yields batches.

For more details, including advanced usage and code samples, you can refer to the Apache Beam documentation on batched DoFn types.