tree: e25c1fdd67117a7f057941d7ac4064de54245209 [path history] [tgz]
  1. display/
  2. examples/
  3. __init__.py
  4. cache_manager.py
  5. cache_manager_test.py
  6. interactive_runner.py
  7. interactive_runner_test.py
  8. pipeline_analyzer.py
  9. pipeline_analyzer_test.py
  10. README.md
sdks/python/apache_beam/runners/interactive/README.md

Interactive Beam

Overview

Interactive Beam is aimed at integrating Apache Beam with Jupyter notebook to make pipeline prototyping and data exploration much faster and easier. It provides nice features including

  1. Graphical representation

    When a pipeline is executed on a Jupyter notebook, it instantly displays the pipeline as a directed acyclic graph. Sampled PCollection results will be added to the graph as the pipeline execution proceeds.

  2. Fetching PCollections as list

    PCollections can be fetched as a list from the pipeline result. This unique feature of InteractiveRunner makes it much easier to integrate Beam pipeline into data analysis.

    p = beam.Pipeline(interactive_runner.InteractiveRunner())
    pcoll = p | SomePTransform | AnotherPTransform
    result = p.run().wait_until_finish()
    pcoll_list = result.get(pcoll)  # This returns a list!
    
  3. Faster re-execution

    InteractiveRunner caches PCollection results of pipeline executed previously and re-uses it when the same pipeline is submitted again.

    p = beam.Pipeline(interactive_runner.InteractiveRunner())
    pcoll = p | SomePTransform | AnotherPTransform
    result = p.run().wait_until_finish()
    
    pcoll2 = pcoll | YetAnotherPTransform
    result = p.run().wait_until_finish()  # <- only executes YetAnotherPTransform
    

Status

  • Supported languages: Python

  • Supported platforms and caching location

    Caching locallyCaching on GCS
    Running on local machinesupportedsupported
    Running on Flink/supported

Getting Started

Note: This guide assumes that you are somewhat familiar with key concepts in Beam programming model including Pipeline, PCollection, PTransform and PipelineRunner (see The Beam Model for a quick reference). For a more general and complete getting started guide, see Apache Beam Python SDK Quickstart.

Pre-requisites

  • Install GraphViz with your favorite system package manager.
  • Install Jupyter. You can either use the one that's included in Anaconda or

    $ pip2 install --upgrade jupyter
    

    Make sure you have Python2 Jupyter since Apache Beam only supports Python 2 at the time being.

  • Install, create and activate your virtualenv. (optional but recommended)

    $ pip2 install --upgrade virtualenv
    $ virtualenv -p python2 beam_venv_dir
    $ source beam_venv_dir/bin/activate
    

    If you are using shells other than bash (e.g. fish, csh), check beam_venv_dir/bin for other scripts that activates the virtual environment.

    CHECK that the virtual environment is activated by running

    $ echo $VIRTUAL_ENV  # This should point to beam_venv_dir
    # or
    $ which python  # This sould point to beam_venv_dir/bin/python
    
  • Set up Apache Beam Python. Make sure the virtual environment is activated when you run setup.py

  • $ git clone git@github.com:apache/beam.git
    $ cd beam/sdks/python
    $ python setup.py install
    
  • Install a IPython kernel for the virtual environment you've just created. Make sure the virtual environment is activate when you do this. You can skip this step if not using virtualenv.

    $ python -m pip install ipykernel
    $ python -m ipykernel install --user --name beam_venv_kernel --display-name "Python (beam_venv)"
    

    CHECK that IPython kernel beam_venv_kernel is available for Jupyter to use.

    $ jupyter kernelspec list
    

Start the notebook

To start the notebook, simply run

$ jupyter notebook

This automatically opens your default web browser pointing to http://localhost:8888.

You can create a new notebook file by clicking New > Notebook: Python (beam_venv).

Or after you've already opend a notebook, change the kernel by clicking Kernel > Change Kernel > Python (beam_venv).

Voila! You can now run Beam pipelines interactively in your Jupyter notebook!

See [Interactive Beam Example.ipynb](examples/Interactive Beam Example.ipynb) for more examples.

Portability

Portability across Storage Systems

By default, the caches are kept on the local file system of the machine in /tmp directory.

You can specify the caching directory as follows

cache_dir = 'some/path/to/dir'
runner = interactive_runner.InteractiveRunner(cache_dir=cache_dir)
p = beam.Pipeline(runner=runner)

Caching PCollection on Google Cloud Storage

You can choose to cache PCollections on Google Cloud Storage with a few credential settings.

  • Install Google Cloud SDK, and set your project, account and other configurations with the following command.

    $ gcloud init
    $ gcloud auth login
    
  • Install the following google cloud modules. Make sure the virtual environment is activated when you do this.

  • $ python -m pip install --upgrade apache-beam[gcp]
    $ python -m pip install --upgrade google-cloud-storage
    $ python -m pip install --upgrade google-cloud-dataflow
    
  • Make sure you have read and write access to that bucket when you specify to use that directory as caching directory.

  • cache_dir = 'gs://bucket-name/dir'
    runner = interactive_runner.InteractiveRunner(cache_dir=cache_dir)
    p = beam.Pipeline(runner=runner)
    

Portability across Execution Platforms

The platform where the pipeline is executed is decided by the underlying runner of InteractiveRunner. The default configuration runs on local machine with direct_runner.DirectRunner() as the underlying runner.

Running Interactive Beam on Flink

You can choose to run Interactive Beam on Flink with the following settings.

  • Install docker.

  • Build the SDK container and start the local FlinkService.

    $ ./gradlew -p sdks/python/container docker
    $ ./gradlew beam-runners-flink_2.11-job-server:runShadow  # Blocking
    
  • Run $ jupyter notebook in another terminal.

  • Use portable_runner.PortableRunner() as the underlying runner, while providing a pipeline_options.PortableOptions() to the pipeline as follows.

    options = pipeline_options.PipelineOptions()
    options.view_as(pipeline_options.PortableOptions).job_endpoint = 'localhost:8099'
    options.view_as(pipeline_options.SetupOptions).sdk_location = 'container'
    options.view_as(pipeline_options.DebugOptions).experiments = 'beam_fn_api'
    
    cache_dir = 'gs://bucket-name/dir'
    underlying_runner = portable_runner.PortableRunner()
    runner = interactive_runner.InteractiveRunner(underlying_runner=underlying_runner, cache_dir=cache_dir)
    p = beam.Pipeline(runner=runner, options=options)
    

Note: Python Flink Runner (combination of PortableRunner and FlinkService) is being actively developed now, so these setups and commands are subject to changes. This guide and [Interactive Beam Running on Flink.ipynb](exampls/Interactive Beam Running on Flink.ipynb) capture the status of the world when it's last updated.

TL;DR;

You can now interactively run Beam Python pipeline! Check out the Youtube demo

IMAGE ALT TEXT HERE

More Information