sdks/python/apache_beam/runners/interactive/README.md

Interactive Beam

Overview

Interactive Beam integrates Apache Beam with Jupyter notebooks to make pipeline prototyping and data exploration faster and easier. Its features include:

  1. Graphical representation

    Visualize the Pipeline DAG:

    import apache_beam as beam
    import apache_beam.runners.interactive.interactive_beam as ib
    from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
    
    p = beam.Pipeline(InteractiveRunner())
    # ... add transforms
    ib.show_graph(p)
    

    Visualize elements in a PCollection:

    pcoll = p | beam.Create([1, 2, 3])
    # include_window_info displays windowing information
    # visualize_data visualizes data with https://pair-code.github.io/facets/
    ib.show(pcoll, include_window_info=True, visualize_data=True)
    

    For more details, see the docstrings of the interactive_beam module.

  2. Support of streaming record/replay and dynamic visualization

    For streaming pipelines, Interactive Beam records a subset of unbounded sources in the pipeline automatically so that they can be replayed for pipeline changes during prototyping.

    There are a few knobs to tune the source recording:

    # Set the amount of time recording data from unbounded sources.
    ib.options.recording_duration = '10m'
    
    # Set the recording size limit to 1 GB.
    ib.options.recording_size_limit = 1e9
    
    # Visualization is dynamic as data is streamed in real time.
    # n=100 displays at most 100 elements.
    # duration=60 displays at most 60 seconds' worth of data generated by
    # unbounded sources.
    ib.show(pcoll, include_window_info=True, n=100, duration=60)
    
    # duration can also be strings.
    ib.show(pcoll, include_window_info=True, duration='1m')
    
    # If neither n nor duration is provided, the display continues indefinitely
    # until the current machine's recording usage hits the threshold set by
    # ib.options.
    ib.show(pcoll, include_window_info=True)
    

    For more details, see the docstrings of the interactive_beam module.

  3. Fetching PCollections as pandas.DataFrame

    PCollections can be collected as a pandas.DataFrame:

    pcoll_df = ib.collect(pcoll)  # This returns a pandas.DataFrame!
    
  4. Faster execution and re-execution

    Interactive Beam analyzes the pipeline graph depending on what PCollection you want to inspect and builds a pipeline fragment to only compute necessary data.

    pcoll = p | PTransformA | PTransformB
    pcoll2 = p | PTransformC | PTransformD
    
    ib.collect(pcoll)  # <- only executes PTransformA and PTransformB
    ib.collect(pcoll2)  # <- only executes PTransformC and PTransformD
    

    Interactive Beam caches previously inspected PCollections and reuses them while the data is still in scope.

    pcoll = p | PTransformA
    # pcoll2 depends on pcoll
    pcoll2 = pcoll | PTransformB
    ib.collect(pcoll2)  # <- caches data for both pcoll and pcoll2
    
    pcoll3 = pcoll2 | PTransformC
    ib.collect(pcoll3)  # <- reuses data of pcoll2 and only executes PTransformC
    
    pcoll4 = pcoll | PTransformD
    ib.collect(pcoll4)  # <- reuses data of pcoll and only executes PTransformD
    
  5. Supports global and local scopes

    Interactive Beam automatically watches the __main__ scope for pipeline and PCollection definitions, so anything defined there works with the interactive features without explicit registration.

    # In a script or in a notebook
    p = beam.Pipeline(InteractiveRunner())
    pcoll = p | SomeTransform
    pcoll2 = pcoll | SomeOtherTransform
    
    # p, pcoll and pcoll2 are all known to Interactive Beam.
    ib.collect(pcoll)
    ib.collect(pcoll2)
    ib.show_graph(p)
    

    You have to explicitly watch pipelines and PCollections in your local scope. Otherwise, Interactive Beam doesn't know about them and won't handle them with interactive features.

    def a_func():
      p = beam.Pipeline(InteractiveRunner())
      pcoll = p | SomeTransform
      pcoll2 = pcoll | SomeOtherTransform
    
      # Watch everything defined locally before this line.
      ib.watch(locals())
      # Or explicitly watch them.
      ib.watch({
          'p': p,
          'pcoll': pcoll,
          'pcoll2': pcoll2})
    
      # p, pcoll and pcoll2 are all known to Interactive Beam.
      ib.collect(pcoll)
      ib.collect(pcoll2)
      ib.show_graph(p)
    
      return p, pcoll, pcoll2
    
    # Or return them to main scope
    p, pcoll, pcoll2 = a_func()
    ib.collect(pcoll)  # Also works!
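
The fragment building and cache reuse described under "Faster execution and re-execution" can be illustrated with a toy model. This is only a sketch, not Interactive Beam's implementation: the `PRODUCERS` table, `transforms_to_run` and `collect` are hypothetical names, and each PCollection is assumed to have at most one upstream PCollection. It mirrors the pcoll / pcoll2 / pcoll3 / pcoll4 example from that item.

```python
from typing import Dict, List, Optional, Set, Tuple

# Hypothetical toy graph: each PCollection name maps to the PCollection it is
# derived from (None for a root) and the transform that produces it.
PRODUCERS: Dict[str, Tuple[Optional[str], str]] = {
    'pcoll': (None, 'PTransformA'),
    'pcoll2': ('pcoll', 'PTransformB'),
    'pcoll3': ('pcoll2', 'PTransformC'),
    'pcoll4': ('pcoll', 'PTransformD'),
}


def transforms_to_run(target: str, cached: Set[str]) -> List[str]:
  """Returns the transforms needed to compute `target`, in execution order.

  Walks upstream from `target` and stops at the first cached PCollection.
  """
  needed = []
  current: Optional[str] = target
  while current is not None and current not in cached:
    parent, transform = PRODUCERS[current]
    needed.append(transform)
    current = parent
  return list(reversed(needed))


def collect(target: str, cached: Set[str]) -> List[str]:
  """Simulates a collect call: runs the fragment, then caches its results."""
  executed = transforms_to_run(target, cached)
  # Cache the target and every uncached ancestor that was just computed.
  current: Optional[str] = target
  while current is not None and current not in cached:
    cached.add(current)
    current = PRODUCERS[current][0]
  return executed


cache: Set[str] = set()
first = collect('pcoll2', cache)   # nothing is cached yet
second = collect('pcoll3', cache)  # pcoll2 is already cached
third = collect('pcoll4', cache)   # pcoll is already cached
print(first, second, third)
```

In this model the first call runs both upstream transforms, while the later calls run only the single transform that is not yet covered by the cache.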
    

Status

  • Supported languages: Python

  • Supported platforms and caching location

    |                          | Caching locally | Caching on GCS |
    | ------------------------ | --------------- | -------------- |
    | Running on local machine | supported       | supported      |
    | Running on Flink         | supported       | supported      |

Getting Started

Note: This guide assumes that you are somewhat familiar with key concepts in the Beam programming model, including Pipeline, PCollection, PTransform and PipelineRunner (see The Beam Model for a quick reference). For a more general and complete getting started guide, see Apache Beam Python SDK Quickstart.

Pre-requisites

  • Install GraphViz with your favorite system package manager.

  • Install, create and activate your venv. (optional but recommended)

    python3 -m venv /path/to/beam_venv_dir
    source /path/to/beam_venv_dir/bin/activate
    pip install --upgrade pip setuptools wheel
    

    If you are using a shell other than bash (e.g. fish, csh), check beam_venv_dir/bin for other scripts that activate the virtual environment.

    CHECK that the virtual environment is activated by running

    which python  # This should point to beam_venv_dir/bin/python
    
  • Install JupyterLab. You can use either conda or pip.

    • conda
      conda install -c conda-forge jupyterlab
      
    • pip
      pip install jupyterlab
      
  • Set up Apache Beam Python. Make sure the virtual environment is activated when you run setup.py.

    git clone https://github.com/apache/beam
    cd beam/sdks/python
    python setup.py install
    
  • Install an IPython kernel for the virtual environment you've just created. Make sure the virtual environment is activated when you do this. You can skip this step if you are not using venv.

    pip install ipykernel
    python -m ipykernel install --user --name beam_venv_kernel --display-name "Python3 (beam_venv)"
    

    CHECK that IPython kernel beam_venv_kernel is available for Jupyter to use.

    jupyter kernelspec list
    
  • Extend JupyterLab through labextension. Note: labextension is different from nbextension from pre-lab jupyter notebooks.

    All JupyterLab extensions require Node.js.

    # Homebrew users do
    brew install node
    # Or Conda users do
    conda install -c conda-forge nodejs
    

    Enable ipywidgets

    pip install ipywidgets
    jupyter labextension install @jupyter-widgets/jupyterlab-manager
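
The CHECK steps above can also be scripted. The following is a minimal sketch, assuming that GraphViz exposes a `dot` executable, Node.js a `node` executable, and JupyterLab a `jupyter` launcher on the PATH; `check_prerequisites` is a hypothetical helper, not part of Beam. The venv test only recognises environments created with `python3 -m venv` (or virtualenv), not conda environments.

```python
import shutil
import sys


def check_prerequisites() -> dict:
  """Reports whether each prerequisite appears to be satisfied."""
  return {
      # In a venv, sys.prefix differs from the base interpreter's prefix.
      'venv_active': sys.prefix != getattr(sys, 'base_prefix', sys.prefix),
      # GraphViz installs the `dot` executable.
      'graphviz': shutil.which('dot') is not None,
      # Node.js is required by JupyterLab extensions.
      'nodejs': shutil.which('node') is not None,
      # The `jupyter` launcher is installed alongside JupyterLab.
      'jupyter': shutil.which('jupyter') is not None,
  }


for name, ok in check_prerequisites().items():
  print('{:12s} {}'.format(name, 'ok' if ok else 'MISSING'))
```

Run it with the interpreter of the environment you intend to use as the notebook kernel; a MISSING entry points at the prerequisite step to revisit.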
    

Start the notebook

To start the notebook, simply run

jupyter lab

Optionally increase the iopub broadcast data rate limit of jupyterlab

jupyter lab --NotebookApp.iopub_data_rate_limit=10000000

This automatically opens your default web browser pointing to http://localhost:8888/lab.

You can create a new notebook file by clicking Python3 (beam_venv) from the launcher page of jupyterlab.

Or after you've already opened a notebook, change the kernel by clicking Kernel > Change Kernel > Python3 (beam_venv).

Voila! You can now run Beam pipelines interactively in your Jupyter notebook!

In the notebook, you can use the tab key for auto-completion. To turn on greedy auto-completion, run the following IPython magic:

%config IPCompleter.greedy=True

You can also use shift + tab keys on the keyboard for a popup of docstrings at the current cursor position.

See Interactive Beam Example.ipynb for more examples.

Portability

Portability across Storage Systems

By default, the caches are kept on the local file system of the machine, in the /tmp directory.

You can specify the caching directory as follows

ib.options.cache_root = 'some/path/to/dir'

When using an InteractiveRunner(underlying_runner=...) that runs remotely and is distributed, a distributed file system such as Cloud Storage (ib.options.cache_root = 'gs://bucket/obj') is necessary.
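
The rule above can be expressed as a small pre-flight check. This is a sketch under stated assumptions: `describe_cache_root` and `is_cache_root_suitable` are hypothetical helpers, not Interactive Beam APIs, and Cloud Storage (`gs://`) is the only distributed file system the sketch recognises.

```python
from urllib.parse import urlparse


def describe_cache_root(cache_root: str) -> dict:
  """Classifies a cache root as a Cloud Storage location or a local path."""
  parsed = urlparse(cache_root)
  if parsed.scheme == 'gs':
    return {
        'kind': 'gcs',
        'bucket': parsed.netloc,
        'prefix': parsed.path.lstrip('/'),
    }
  return {'kind': 'local', 'path': cache_root}


def is_cache_root_suitable(cache_root: str, runs_remotely: bool) -> bool:
  """A remote, distributed runner cannot share a machine-local directory."""
  kind = describe_cache_root(cache_root)['kind']
  return kind == 'gcs' or not runs_remotely


print(describe_cache_root('gs://bucket-name/obj'))
print(is_cache_root_suitable('/tmp/cache', runs_remotely=True))
```

A check like this could run before `ib.options.cache_root` is set, so that a local path is caught early when the underlying runner is remote.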

Caching PCollection on Google Cloud Storage

You can choose to cache PCollections on Google Cloud Storage with a few credential settings.

  • Install Google Cloud SDK, and set your project, account and other configurations with the following command.

    $ gcloud init
    $ gcloud auth login
    
  • Install the following Google Cloud modules. Make sure the virtual environment is activated when you do this.

    $ python -m pip install --upgrade apache-beam[gcp]
    $ python -m pip install --upgrade google-cloud-storage
    $ python -m pip install --upgrade google-cloud-dataflow
    
  • Make sure you have read and write access to the bucket that you specify as the caching directory.

  • You may configure a cache directory to be used by all pipelines created afterward with an InteractiveRunner.

    ib.options.cache_root = 'gs://bucket-name/obj'
    

Portability across Execution Platforms

The platform where the pipeline is executed is decided by the underlying runner of InteractiveRunner. The default configuration runs on the local machine with direct_runner.DirectRunner() as the underlying runner.

Running Interactive Beam on Flink

You can choose to run Interactive Beam on Flink with the following settings.

  • Use flink_runner.FlinkRunner() as the underlying runner.

    p = beam.Pipeline(interactive_runner.InteractiveRunner(underlying_runner=flink_runner.FlinkRunner()))
    
  • Alternatively, if the runtime environment is configured with a Google Cloud project, you can run Interactive Beam with Flink on Cloud Dataproc. To do so, configure the pipeline with a Google Cloud project. If you are using a dev version of Beam built from source, you must also specify an environment_config option to configure a containerized Beam SDK (you can choose a released container or build one yourself).

    ib.options.cache_root = 'gs://bucket-name/obj'
    options = PipelineOptions([
        # The project can be obtained by running the following:
        #   import google.auth
        #   project = google.auth.default()[1]
        '--project={}'.format(project),
        # The following environment_config is only needed when using a development kernel.
        # You do not have to use the 2.35.0 SDK, but the chosen release must be compatible
        # with the Flink version used by the Dataproc image that Interactive Beam uses.
        # The Flink version currently used is 1.12.5.
        '--environment_config=apache/beam_python3.7_sdk:2.35.0',
    ])
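
Because the environment_config flag is only needed for development builds, the argument list can be assembled conditionally. The helper below is a hypothetical sketch (`build_dataproc_flink_args` is not part of Beam); it uses only the two flags shown in the snippet above and leaves the choice of container image to the caller.

```python
from typing import List, Optional


def build_dataproc_flink_args(
    project: str, environment_config: Optional[str] = None) -> List[str]:
  """Builds the pipeline option strings for Flink on Cloud Dataproc."""
  args = ['--project={}'.format(project)]
  # Only development builds of Beam need an explicit SDK container.
  if environment_config:
    args.append('--environment_config={}'.format(environment_config))
  return args


# 'my-project' is a placeholder project ID.
release_args = build_dataproc_flink_args('my-project')
dev_args = build_dataproc_flink_args(
    'my-project', 'apache/beam_python3.7_sdk:2.35.0')
print(release_args)
print(dev_args)
```

The resulting list can be passed to PipelineOptions in place of the literal list shown above.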
    

Note: This guide and Interactive Beam Running on Flink.ipynb reflect the state of the world as of their last update.

More Information