---
layout: section
title: "Direct Runner"
permalink: /documentation/runners/direct/
section_menu: section-menu/runners.html
redirect_from: /learn/runners/direct/
---

# Using the Direct Runner

The Direct Runner executes pipelines on your machine and is designed to validate that pipelines adhere to the Apache Beam model as closely as possible. Instead of focusing on efficient pipeline execution, the Direct Runner performs additional checks to ensure that users do not rely on semantics that are not guaranteed by the model. Some of these checks include:

* enforcing immutability of elements
* enforcing encodability of elements
* enforcing that elements are processed in an arbitrary order at all points
* enforcing serializability of user functions (`DoFn`, `CombineFn`, etc.)

Using the Direct Runner for testing and development helps ensure that pipelines are robust across different Beam runners. In addition, debugging failed runs can be a non-trivial task when a pipeline executes on a remote cluster. Instead, it is often faster and simpler to perform local unit testing on your pipeline code. Unit testing your pipeline locally also allows you to use your preferred local debugging tools.

Here are some resources with information about how to test your pipelines.

## Direct Runner prerequisites and setup

### Specify your dependency

When using Java, you must specify your dependency on the Direct Runner in your `pom.xml`.

```xml
<dependency>
   <groupId>org.apache.beam</groupId>
   <artifactId>beam-runners-direct-java</artifactId>
   <version>{{ site.release_latest }}</version>
   <scope>runtime</scope>
</dependency>
```

This section is not applicable to the Beam SDK for Python.

### Pipeline options for the Direct Runner

When executing your pipeline from the command line, set `runner` to `direct` or `DirectRunner`. The default values for the other pipeline options are generally sufficient.

See the reference documentation for the [DirectOptions (Java)](https://beam.apache.org/releases/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/runners/direct/DirectOptions.html) and [DirectOptions (Python)](https://beam.apache.org/releases/pydoc/{{ site.release_latest }}/apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.DirectOptions) interfaces for defaults and additional pipeline configuration options.

## Additional information and caveats

### Memory considerations

Local execution is limited by the memory available in your local environment. It is highly recommended that you run your pipeline with data sets small enough to fit in local memory. You can create a small in-memory data set using a [Create](https://beam.apache.org/releases/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/transforms/Create.html) transform, or you can use a [Read](https://beam.apache.org/releases/javadoc/{{ site.release_latest }}/index.html?org/apache/beam/sdk/io/Read.html) transform to work with small local or remote files.

### Streaming execution

If your pipeline uses an unbounded data source or sink, you must set the `streaming` option to `true`.
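In the Python SDK this corresponds to the `streaming` flag on `StandardOptions` (a sketch, assuming the standard pipeline options):

```python
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

# Passing --streaming on the command line...
options = PipelineOptions(['--streaming'])

# ...is equivalent to setting the attribute directly:
# options.view_as(StandardOptions).streaming = True
is_streaming = options.view_as(StandardOptions).streaming
print(is_streaming)  # True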

### Execution Mode

The Python `FnApiRunner` supports multi-threading and multi-processing modes.

#### Setting parallelism

The number of threads or subprocesses is defined by setting the `direct_num_workers` option. There are several ways to set this option.

* Passing it through the CLI when executing a pipeline:

  ```
  python wordcount.py --input xx --output xx --direct_num_workers 2
  ```

* Setting it with `PipelineOptions`:

  ```py
  from apache_beam.options.pipeline_options import PipelineOptions

  pipeline_options = PipelineOptions(['--direct_num_workers', '2'])
  ```

* Adding it to an existing `PipelineOptions` object:

  ```py
  from apache_beam.options.pipeline_options import DirectOptions, PipelineOptions

  pipeline_options = PipelineOptions(xxx)
  pipeline_options.view_as(DirectOptions).direct_num_workers = 2
  ```

#### Running with multi-threading mode

```py
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.portability import fn_api_runner
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.portability import python_urns

parser = argparse.ArgumentParser()
parser.add_argument(...)
# parse_known_args() reads sys.argv; unknown flags become pipeline options.
known_args, pipeline_args = parser.parse_known_args()
pipeline_options = PipelineOptions(pipeline_args)

p = beam.Pipeline(
    options=pipeline_options,
    runner=fn_api_runner.FnApiRunner(
        default_environment=beam_runner_api_pb2.Environment(
            urn=python_urns.EMBEDDED_PYTHON_GRPC)))
```

#### Running with multi-processing mode

```py
import argparse
import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.portability import fn_api_runner
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.portability import python_urns

parser = argparse.ArgumentParser()
parser.add_argument(...)
# parse_known_args() reads sys.argv; unknown flags become pipeline options.
known_args, pipeline_args = parser.parse_known_args()
pipeline_options = PipelineOptions(pipeline_args)

p = beam.Pipeline(
    options=pipeline_options,
    runner=fn_api_runner.FnApiRunner(
        default_environment=beam_runner_api_pb2.Environment(
            urn=python_urns.SUBPROCESS_SDK,
            payload=b'%s -m apache_beam.runners.worker.sdk_worker_main'
                    % sys.executable.encode('ascii'))))
```