
Apache Beam - Python SDK

Apache Beam is a unified model for defining both batch and streaming data-parallel processing pipelines. Beam provides a set of language-specific SDKs for constructing pipelines. These pipelines can be executed on distributed processing backends like Apache Spark, Apache Flink, and Google Cloud Dataflow.

Apache Beam for Python provides access to Beam capabilities from the Python programming language.


Overview of the Programming Model

The key concepts of the programming model are:

  • PCollection - represents a collection of data, which could be bounded or unbounded in size.
  • PTransform - represents a computation that transforms input PCollections into output PCollections.
  • Pipeline - manages a directed acyclic graph of PTransforms and PCollections that is ready for execution.
  • Runner - specifies where and how the Pipeline should execute.
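
The following minimal sketch (an illustrative addition, using the same DirectRunner and in-memory data as the examples later in this document) shows where each of these concepts appears in code:

import apache_beam as beam

# Runner: 'DirectRunner' executes the pipeline locally.
p = beam.Pipeline('DirectRunner')

# PTransforms: Create and Map, chained with | and labeled with >>.
# PCollection: the result of each application, held here in 'greetings'.
greetings = (p
             | 'make names' >> beam.Create(['Ann', 'Joe'])
             | 'greet' >> beam.Map(lambda name: 'Hello, %s!' % name))

# Pipeline: p manages the graph of PTransforms and PCollections; run() executes it.
p.run()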

For a more detailed introduction, please read the Beam Programming Model documentation.

Getting Started

See Apache Beam Python SDK Quickstart.

A Quick Tour of the Source Code

With your virtual environment active, you can follow along with this tour by running a pydoc server on a local port of your choosing (this example uses port 8888):

pydoc -p 8888

Open your browser and go to http://localhost:8888/apache_beam.html

From there you can navigate to the classes behind the concepts introduced above, such as PCollection, PTransform, and Pipeline.

Make sure the package is installed first. If it is not, run python setup.py install, then start the pydoc server again with pydoc -p 8888.
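
For example, a typical sequence (a sketch assuming the virtual environment is active and you are in the SDK source directory) looks like this:

python setup.py install
pydoc -p 8888
# Then browse to http://localhost:8888/apache_beam.html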

Simple Examples

The following examples demonstrate some basic, fundamental concepts for using Apache Beam's Python SDK. For more detailed examples, Beam provides a directory of examples for Python.

Basic pipeline

A basic pipeline will take as input an iterable, apply the beam.Create PTransform, and produce a PCollection that can be written to a file or modified by further PTransforms. The >> operator is used to label PTransforms and the | operator is used to chain them.

# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectRunner')
# Create a PCollection with names and write it to a file.
(p
 | 'add names' >> beam.Create(['Ann', 'Joe'])
 | 'save' >> beam.io.WriteToText('./names'))
# Execute the pipeline.
p.run()

Basic pipeline (with Map)

The Map PTransform produces exactly one output element per input element. It takes a callable that is applied to each element of the input PCollection; the value the callable returns becomes an element of the output PCollection.

import apache_beam as beam
p = beam.Pipeline('DirectRunner')
# Read a file containing names, add a greeting to each name, and write to a file.
(p
 | 'load names' >> beam.io.ReadFromText('./names')
 | 'add greeting' >> beam.Map(lambda name, msg: '%s, %s!' % (msg, name), 'Hello')
 | 'save' >> beam.io.WriteToText('./greetings'))
p.run()

Basic pipeline (with FlatMap)

A FlatMap is like a Map, except that its callable returns an iterable of zero or more elements (possibly empty), and each element of that iterable becomes an element of the output PCollection. In other words, FlatMap produces zero or more outputs per input, whereas Map produces exactly one.

import apache_beam as beam
p = beam.Pipeline('DirectRunner')
# Read a file containing names, add two greetings to each name, and write to a file.
(p
 | 'load names' >> beam.io.ReadFromText('./names')
 | 'add greetings' >> beam.FlatMap(
    lambda name, messages: ['%s %s!' % (msg, name) for msg in messages],
    ['Hello', 'Hola'])
 | 'save' >> beam.io.WriteToText('./greetings'))
p.run()

Basic pipeline (with FlatMap and yield)

The callable passed to FlatMap can be a generator function, that is, a function that uses yield.

import apache_beam as beam
p = beam.Pipeline('DirectRunner')
# Read a file containing names, add two greetings to each name
# (with FlatMap using a yield generator), and write to a file.
def add_greetings(name, messages):
  for msg in messages:
    yield '%s %s!' % (msg, name)

(p
 | 'load names' >> beam.io.ReadFromText('./names')
 | 'add greetings' >> beam.FlatMap(add_greetings, ['Hello', 'Hola'])
 | 'save' >> beam.io.WriteToText('./greetings'))
p.run()

Counting words

This example shows how to read a text file from Google Cloud Storage and count its words.

import re
import apache_beam as beam
p = beam.Pipeline('DirectRunner')
(p
 | 'read' >> beam.io.ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
 | 'split' >> beam.FlatMap(lambda x: re.findall(r'\w+', x))
 | 'count words' >> beam.combiners.Count.PerElement()
 | 'save' >> beam.io.WriteToText('./word_count'))
p.run()

Counting words with GroupByKey

This is a somewhat contrived example that uses GroupByKey to count words, as the previous example did, but without beam.combiners.Count.PerElement. It also shows that you can use a wildcard to specify the text file source.

import re
import apache_beam as beam
p = beam.Pipeline('DirectRunner')
class MyCountTransform(beam.PTransform):
  def expand(self, pcoll):
    return (pcoll
            | 'one word' >> beam.Map(lambda word: (word, 1))
            # GroupByKey accepts a PCollection of (word, 1) elements and
            # outputs a PCollection of (word, [1, 1, ...])
            | 'group words' >> beam.GroupByKey()
            | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts))))

(p
 | 'read' >> beam.io.ReadFromText('./names*')
 | 'split' >> beam.FlatMap(lambda x: re.findall(r'\w+', x))
 | MyCountTransform()
 | 'write' >> beam.io.WriteToText('./word_count'))
p.run()

Type hints

In some cases, providing type hints can improve the efficiency of the data encoding.

import apache_beam as beam
from apache_beam.typehints import typehints
p = beam.Pipeline('DirectRunner')
(p
 | 'read' >> beam.io.ReadFromText('./names')
 | 'add types' >> beam.Map(lambda x: (x, 1)).with_output_types(typehints.KV[str, int])
 | 'group words' >> beam.GroupByKey()
 | 'save' >> beam.io.WriteToText('./typed_names'))
p.run()

BigQuery

This example reads weather data from a BigQuery table, calculates the number of tornadoes per month, and writes the results to a table you specify.

import apache_beam as beam
project = 'DESTINATION-PROJECT-ID'
input_table = 'clouddataflow-readonly:samples.weather_stations'
output_table = 'DESTINATION-DATASET.DESTINATION-TABLE'

p = beam.Pipeline(argv=['--project', project])
(p
 | 'read' >> beam.Read(beam.io.BigQuerySource(input_table))
 | 'months with tornadoes' >> beam.FlatMap(
    lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
 | 'monthly count' >> beam.CombinePerKey(sum)
 | 'format' >> beam.Map(lambda (k, v): {'month': k, 'tornado_count': v})
 | 'save' >> beam.Write(
    beam.io.BigQuerySink(
        output_table,
        schema='month:INTEGER, tornado_count:INTEGER',
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()

This pipeline, like the previous one, calculates the number of tornadoes per month, but it uses a query to filter the input instead of reading the whole table.

import apache_beam as beam
project = 'DESTINATION-PROJECT-ID'
output_table = 'DESTINATION-DATASET.DESTINATION-TABLE'
input_query = 'SELECT month, COUNT(month) AS tornado_count ' \
        'FROM [clouddataflow-readonly:samples.weather_stations] ' \
        'WHERE tornado=true GROUP BY month'
p = beam.Pipeline(argv=['--project', project])
(p
 | 'read' >> beam.Read(beam.io.BigQuerySource(query=input_query))
 | 'save' >> beam.Write(beam.io.BigQuerySink(
    output_table,
    schema='month:INTEGER, tornado_count:INTEGER',
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()

Combiner Examples

Combiner transforms use “reducing” functions, such as sum, min, or max, to combine multiple values of a PCollection into a single value.

import apache_beam as beam
p = beam.Pipeline('DirectRunner')

SAMPLE_DATA = [('a', 1), ('b', 10), ('a', 2), ('a', 3), ('b', 20)]

(p
 | beam.Create(SAMPLE_DATA)
 | beam.CombinePerKey(sum)
 | beam.io.WriteToText('./sums'))
p.run()

The combiners_test.py file contains more combiner examples.
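
As one further illustration (a sketch in the same style, not taken from combiners_test.py), built-in combiners such as beam.combiners.Mean.PerKey can be applied in the same way as CombinePerKey:

import apache_beam as beam
p = beam.Pipeline('DirectRunner')

SAMPLE_DATA = [('a', 1), ('b', 10), ('a', 2), ('a', 3), ('b', 20)]

# Average the values for each key instead of summing them.
(p
 | beam.Create(SAMPLE_DATA)
 | beam.combiners.Mean.PerKey()
 | beam.io.WriteToText('./means'))
p.run()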

Organizing Your Code

Many projects will grow to multiple source code files. It is recommended that you organize your project so that all code involved in running your pipeline can be built as a Python package. This way, the package can easily be installed in the VM workers executing the job.

Follow the Juliaset example. If the code is organized in this fashion, you can use the --setup_file command line option to create a source distribution out of the project files, stage the resulting tarball, and later install it in the workers executing the job.
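
As a rough sketch of such a layout (the package and file names here are placeholders, not part of the Juliaset example), the project might look like this and be launched with --setup_file so the resulting source distribution is staged and installed on the workers:

my_project/
  setup.py              # standard setuptools script describing the package
  my_package/
    __init__.py
    my_pipeline.py      # constructs and runs the Pipeline

# Launch the pipeline; other runner-specific options are omitted here.
python -m my_package.my_pipeline --setup_file ./setup.py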

More Information

Please report any issues on JIRA.

If you’re interested in contributing to the Beam SDK, start by reading the Contribute guide.