Apache Beam is a unified model for defining both batch and streaming data-parallel processing pipelines. Beam provides a set of language-specific SDKs for constructing pipelines. These pipelines can be executed on distributed processing backends like Apache Spark, Apache Flink, and Google Cloud Dataflow.
Apache Beam for Python provides access to Beam capabilities from the Python programming language.
The key concepts of the programming model are:

- PCollection: represents a collection of data, which could be bounded or unbounded in size.
- PTransform: represents a computation that transforms input PCollections into output PCollections.
- Pipeline: manages a directed acyclic graph of PTransforms and PCollections that is ready for execution.
- PipelineRunner: specifies where and how the pipeline should execute.

For a detailed introduction, please read the Beam Programming Model.
See Apache Beam Python SDK Quickstart.
With your virtual environment active, you can follow along with this tour by running a pydoc server on a local port of your choosing (this example uses port 8888):
pydoc -p 8888
Open your browser and go to http://localhost:8888/apache_beam.html
Some interesting classes to navigate to:
- PCollection, in file apache_beam/pvalue.py
- PTransform, in file apache_beam/transforms/ptransform.py
- FlatMap, GroupByKey, and Map, in file apache_beam/transforms/core.py
- Ready-made combiners such as Count and Mean, in file apache_beam/transforms/combiners.py
Make sure you have installed the package first. If not, run python setup.py install, then run pydoc with pydoc -p 8888.
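A quick way to confirm the installation worked is to import the package and print its version; a minimal sketch, to be run inside your virtual environment:

import apache_beam as beam

# If the import succeeds, the SDK is installed; __version__ shows which release.
print(beam.__version__)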
The following examples demonstrate some fundamental concepts of Apache Beam's Python SDK. For more detailed examples, Beam provides a directory of examples for Python.
A basic pipeline takes an iterable as input, applies the beam.Create PTransform, and produces a PCollection that can be written to a file or modified by further PTransforms. The >> operator is used to label PTransforms, and the | operator is used to chain them.
# Standard imports
import apache_beam as beam

# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectRunner')

# Create a PCollection with names and write it to a file.
(p
 | 'add names' >> beam.Create(['Ann', 'Joe'])
 | 'save' >> beam.io.WriteToText('./names'))

# Execute the pipeline.
p.run()
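In newer SDK versions, a Pipeline can also be used as a context manager, which runs the pipeline automatically when the with block exits. A minimal sketch of the same pipeline in that style:

import apache_beam as beam

# The pipeline is run (and waited on) when the with block exits.
with beam.Pipeline('DirectRunner') as p:
  (p
   | 'add names' >> beam.Create(['Ann', 'Joe'])
   | 'save' >> beam.io.WriteToText('./names'))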
The Map PTransform returns one output per input. It takes a callable that is applied to each element of the input PCollection and returns an element to the output PCollection.
import apache_beam as beam

p = beam.Pipeline('DirectRunner')

# Read a file containing names, add a greeting to each name, and write to a file.
(p
 | 'load names' >> beam.io.ReadFromText('./names')
 | 'add greeting' >> beam.Map(lambda name, msg: '%s, %s!' % (msg, name), 'Hello')
 | 'save' >> beam.io.WriteToText('./greetings'))

p.run()
A FlatMap is like a Map, except its callable returns zero or more outputs per input: the callable is applied to each element of the input PCollection and returns an iterable (possibly empty) of elements for the output PCollection.
import apache_beam as beam

p = beam.Pipeline('DirectRunner')

# Read a file containing names, add two greetings to each name, and write to a file.
(p
 | 'load names' >> beam.io.ReadFromText('./names')
 | 'add greetings' >> beam.FlatMap(
     lambda name, messages: ['%s %s!' % (msg, name) for msg in messages],
     ['Hello', 'Hola'])
 | 'save' >> beam.io.WriteToText('./greetings'))

p.run()
The callable of a FlatMap can be a generator, that is, a function using yield.
import apache_beam as beam

p = beam.Pipeline('DirectRunner')

# Read a file containing names, add two greetings to each name
# (with FlatMap using a yield generator), and write to a file.
def add_greetings(name, messages):
  for msg in messages:
    yield '%s %s!' % (msg, name)

(p
 | 'load names' >> beam.io.ReadFromText('./names')
 | 'add greetings' >> beam.FlatMap(add_greetings, ['Hello', 'Hola'])
 | 'save' >> beam.io.WriteToText('./greetings'))

p.run()
This example shows how to read a text file from Google Cloud Storage and count its words.
import re

import apache_beam as beam

p = beam.Pipeline('DirectRunner')

(p
 | 'read' >> beam.io.ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
 | 'split' >> beam.FlatMap(lambda x: re.findall(r'\w+', x))
 | 'count words' >> beam.combiners.Count.PerElement()
 | 'save' >> beam.io.WriteToText('./word_count'))

p.run()
This is a somewhat forced example of GroupByKey to count words as the previous example did, but without using beam.combiners.Count.PerElement. As shown in the example, you can use a wildcard to specify the text file source.
import re

import apache_beam as beam

p = beam.Pipeline('DirectRunner')

class MyCountTransform(beam.PTransform):
  def expand(self, pcoll):
    return (pcoll
            | 'one word' >> beam.Map(lambda word: (word, 1))
            # GroupByKey accepts a PCollection of (word, 1) elements and
            # outputs a PCollection of (word, [1, 1, ...])
            | 'group words' >> beam.GroupByKey()
            | 'count words' >> beam.Map(lambda kv: (kv[0], len(kv[1]))))

(p
 | 'read' >> beam.io.ReadFromText('./names*')
 | 'split' >> beam.FlatMap(lambda x: re.findall(r'\w+', x))
 | MyCountTransform()
 | 'write' >> beam.io.WriteToText('./word_count'))

p.run()
In some cases, providing type hints can improve the efficiency of the data encoding.
import apache_beam as beam
from apache_beam.typehints import typehints

p = beam.Pipeline('DirectRunner')

(p
 | 'read' >> beam.io.ReadFromText('./names')
 | 'add types' >> beam.Map(lambda x: (x, 1)).with_output_types(typehints.KV[str, int])
 | 'group words' >> beam.GroupByKey()
 | 'save' >> beam.io.WriteToText('./typed_names'))

p.run()
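Type hints can also be declared with the decorators exported by apache_beam.typehints rather than chained onto the transform. A minimal sketch, where the to_kv function name is just for illustration:

import apache_beam as beam
from apache_beam import typehints

# Declare the callable's input and output types with decorators instead of
# calling with_output_types on the transform.
@typehints.with_input_types(str)
@typehints.with_output_types(typehints.KV[str, int])
def to_kv(name):
  return (name, 1)

# The decorated function can then be used as: 'add types' >> beam.Map(to_kv)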
This example reads weather data from a BigQuery table, calculates the number of tornadoes per month, and writes the results to a table you specify.
import apache_beam as beam

project = 'DESTINATION-PROJECT-ID'
input_table = 'clouddataflow-readonly:samples.weather_stations'
output_table = 'DESTINATION-DATASET.DESTINATION-TABLE'

p = beam.Pipeline(argv=['--project', project])

(p
 | 'read' >> beam.Read(beam.io.BigQuerySource(input_table))
 | 'months with tornadoes' >> beam.FlatMap(
     lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
 | 'monthly count' >> beam.CombinePerKey(sum)
 | 'format' >> beam.Map(lambda kv: {'month': kv[0], 'tornado_count': kv[1]})
 | 'save' >> beam.Write(
     beam.io.BigQuerySink(
         output_table,
         schema='month:INTEGER, tornado_count:INTEGER',
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))

p.run()
This pipeline, like the one above, calculates the number of tornadoes per month, but it uses a query to filter the input instead of reading the whole table.
import apache_beam as beam

project = 'DESTINATION-PROJECT-ID'
output_table = 'DESTINATION-DATASET.DESTINATION-TABLE'
input_query = 'SELECT month, COUNT(month) AS tornado_count ' \
              'FROM [clouddataflow-readonly:samples.weather_stations] ' \
              'WHERE tornado=true GROUP BY month'

p = beam.Pipeline(argv=['--project', project])

(p
 | 'read' >> beam.Read(beam.io.BigQuerySource(query=input_query))
 | 'save' >> beam.Write(beam.io.BigQuerySink(
     output_table,
     schema='month:INTEGER, tornado_count:INTEGER',
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
     write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))

p.run()
Combiner transforms use “reducing” functions, such as sum, min, or max, to combine multiple values of a PCollection into a single value.
import apache_beam as beam

p = beam.Pipeline('DirectRunner')

SAMPLE_DATA = [('a', 1), ('b', 10), ('a', 2), ('a', 3), ('b', 20)]

(p
 | beam.Create(SAMPLE_DATA)
 | beam.CombinePerKey(sum)
 | beam.io.WriteToText('./sums'))

p.run()
The combiners_test.py file contains more combiner examples.
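As an illustration, here is a minimal sketch using two of the ready-made combiners from apache_beam.transforms.combiners; the output paths are placeholders:

import apache_beam as beam

p = beam.Pipeline('DirectRunner')

SAMPLE_DATA = [('a', 1), ('b', 10), ('a', 2), ('a', 3), ('b', 20)]

data = p | beam.Create(SAMPLE_DATA)

# Mean.PerKey averages the values for each key.
(data
 | 'mean per key' >> beam.combiners.Mean.PerKey()
 | 'save means' >> beam.io.WriteToText('./means'))

# Count.Globally counts all elements in the PCollection.
(data
 | 'count elements' >> beam.combiners.Count.Globally()
 | 'save count' >> beam.io.WriteToText('./count'))

p.run()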
Many projects will grow to multiple source code files. It is recommended that you organize your project so that all code involved in running your pipeline can be built as a Python package. This way, the package can easily be installed in the VM workers executing the job.
Follow the Juliaset example. If the code is organized in this fashion, you can use the --setup_file command line option to create a source distribution out of the project files, stage the resulting tarball, and later install it in the workers executing the job.
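For example, a minimal sketch of passing --setup_file through the pipeline's argv; the runner, project, bucket, and paths below are placeholders:

import apache_beam as beam

# --setup_file points at the project's setup.py; the SDK builds a source
# distribution from it and installs the resulting tarball on the workers.
p = beam.Pipeline(argv=[
    '--runner', 'DataflowRunner',
    '--project', 'my-gcp-project',
    '--temp_location', 'gs://my-bucket/temp',
    '--setup_file', './setup.py',
])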
Please report any issues on JIRA.
If you’re interested in contributing to the Beam SDK, start by reading the Contribute guide.