blob: 7836a04ead8997fe41a20dcbecfdba3d74ed9988 [file] [log] [blame] [view]
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Apache Beam - Python SDK
[Apache Beam](http://beam.apache.org) is a unified model for defining both batch and streaming data-parallel processing pipelines. Beam provides a set of language-specific SDKs for constructing pipelines. These pipelines can be executed on distributed processing backends like [Apache Spark](http://spark.apache.org/), [Apache Flink](http://flink.apache.org), and [Google Cloud Dataflow](http://cloud.google.com/dataflow).
Apache Beam for Python provides access to Beam capabilities from the Python programming language.
## Table of Contents
* [Overview of the Beam Programming Model](#overview-of-the-programming-model)
* [Getting Started](#getting-started)
* [A Quick Tour of the Source Code](#a-quick-tour-of-the-source-code)
* [Simple Examples](#simple-examples)
* [Basic pipeline](#basic-pipeline)
* [Basic pipeline (with Map)](#basic-pipeline-with-map)
* [Basic pipeline (with FlatMap)](#basic-pipeline-with-flatmap)
* [Basic pipeline (with FlatMap and yield)](#basic-pipeline-with-flatmap-and-yield)
* [Counting words](#counting-words)
* [Counting words with GroupByKey](#counting-words-with-groupbykey)
* [Type hints](#type-hints)
* [BigQuery](#bigquery)
* [Combiner examples](#combiner-examples)
* [Organizing Your Code](#organizing-your-code)
* [Contact Us](#contact-us)
## Overview of the Programming Model
The key concepts of the programming model are:
* PCollection - represents a collection of data, which could be bounded or unbounded in size.
* PTransform - represents a computation that transforms input PCollections into output
PCollections.
* Pipeline - manages a directed acyclic graph of PTransforms and PCollections that is ready
for execution.
* Runner - specifies where and how the Pipeline should execute.
For a further, detailed introduction, please read the
[Beam Programming Model](http://beam.apache.org/documentation/programming-guide).
## Getting Started
See [Apache Beam Python SDK Quickstart](https://beam.apache.org/get-started/quickstart-py/).
## A Quick Tour of the Source Code
With your virtual environment active, you can follow along this tour by running a `pydoc` server on a local port of your choosing (this example uses port 8888):
```
pydoc -p 8888
```
Open your browser and go to
http://localhost:8888/apache_beam.html
Some interesting classes to navigate to:
* `PCollection`, in file
[`apache_beam/pvalue.py`](http://localhost:8888/apache_beam.pvalue.html)
* `PTransform`, in file
[`apache_beam/transforms/ptransform.py`](http://localhost:8888/apache_beamtransforms.ptransform.html)
* `FlatMap`, `GroupByKey`, and `Map`, in file
[`apache_beam/transforms/core.py`](http://localhost:8888/apache_beam.transforms.core.html)
* combiners, in file
[`apache_beam/transforms/combiners.py`](http://localhost:8888/apache_beam.transforms.combiners.html)
Make sure you installed the package first. If not, run `python setup.py install`, then run pydoc with `pydoc -p 8888`.
## Simple Examples
The following examples demonstrate some basic, fundamental concepts for using Apache Beam's Python SDK. For more detailed examples, Beam provides a [directory of examples](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples) for Python.
### Basic pipeline
A basic pipeline will take as input an iterable, apply the
beam.Create `PTransform`, and produce a `PCollection` that can
be written to a file or modified by further `PTransform`s.
The `>>` operator is used to label `PTransform`s and
the `|` operator is used to chain them.
```python
# Standard imports
import apache_beam as beam
# Create a pipeline executing on a direct runner (local, non-cloud).
p = beam.Pipeline('DirectRunner')
# Create a PCollection with names and write it to a file.
(p
| 'add names' >> beam.Create(['Ann', 'Joe'])
| 'save' >> beam.io.WriteToText('./names'))
# Execute the pipeline.
p.run()
```
### Basic pipeline (with Map)
The `Map` `PTransform` returns one output per input. It takes a callable that is applied to each element of the input `PCollection` and returns an element to the output `PCollection`.
```python
import apache_beam as beam
p = beam.Pipeline('DirectRunner')
# Read a file containing names, add a greeting to each name, and write to a file.
(p
| 'load names' >> beam.io.ReadFromText('./names')
| 'add greeting' >> beam.Map(lambda name, msg: '%s, %s!' % (msg, name), 'Hello')
| 'save' >> beam.io.WriteToText('./greetings'))
p.run()
```
### Basic pipeline (with FlatMap)
A `FlatMap` is like a `Map` except its callable returns a (possibly
empty) iterable of elements for the output `PCollection`.
The `FlatMap` transform returns zero to many output per input. It accepts a callable that is applied to each element of the input `PCollection` and returns an iterable with zero or more elements to the output `PCollection`.
```python
import apache_beam as beam
p = beam.Pipeline('DirectRunner')
# Read a file containing names, add two greetings to each name, and write to a file.
(p
| 'load names' >> beam.io.ReadFromText('./names')
| 'add greetings' >> beam.FlatMap(
lambda name, messages: ['%s %s!' % (msg, name) for msg in messages],
['Hello', 'Hola'])
| 'save' >> beam.io.WriteToText('./greetings'))
p.run()
```
### Basic pipeline (with FlatMap and yield)
The callable of a `FlatMap` can be a generator, that is,
a function using `yield`.
```python
import apache_beam as beam
p = beam.Pipeline('DirectRunner')
# Read a file containing names, add two greetings to each name
# (with FlatMap using a yield generator), and write to a file.
def add_greetings(name, messages):
for msg in messages:
yield '%s %s!' % (msg, name)
(p
| 'load names' >> beam.io.ReadFromText('./names')
| 'add greetings' >> beam.FlatMap(add_greetings, ['Hello', 'Hola'])
| 'save' >> beam.io.WriteToText('./greetings'))
p.run()
```
### Counting words
This example shows how to read a text file from [Google Cloud Storage](https://cloud.google.com/storage/) and count its words.
```python
import re
import apache_beam as beam
p = beam.Pipeline('DirectRunner')
(p
| 'read' >> beam.io.ReadFromText('gs://dataflow-samples/shakespeare/kinglear.txt')
| 'split' >> beam.FlatMap(lambda x: re.findall(r'\w+', x))
| 'count words' >> beam.combiners.Count.PerElement()
| 'save' >> beam.io.WriteToText('./word_count'))
p.run()
```
### Counting words with GroupByKey
This is a somewhat forced example of `GroupByKey` to count words as the previous example did, but without using `beam.combiners.Count.PerElement`. As shown in the example, you can use a wildcard to specify the text file source.
```python
import re
import apache_beam as beam
p = beam.Pipeline('DirectRunner')
class MyCountTransform(beam.PTransform):
def expand(self, pcoll):
return (pcoll
| 'one word' >> beam.Map(lambda word: (word, 1))
# GroupByKey accepts a PCollection of (word, 1) elements and
# outputs a PCollection of (word, [1, 1, ...])
| 'group words' >> beam.GroupByKey()
| 'count words' >> beam.Map(lambda (word, counts): (word, len(counts))))
(p
| 'read' >> beam.io.ReadFromText('./names*')
| 'split' >> beam.FlatMap(lambda x: re.findall(r'\w+', x))
| MyCountTransform()
| 'write' >> beam.io.WriteToText('./word_count'))
p.run()
```
### Type hints
In some cases, providing type hints can improve the efficiency
of the data encoding.
```python
import apache_beam as beam
from apache_beam.typehints import typehints
p = beam.Pipeline('DirectRunner')
(p
| 'read' >> beam.io.ReadFromText('./names')
| 'add types' >> beam.Map(lambda x: (x, 1)).with_output_types(typehints.KV[str, int])
| 'group words' >> beam.GroupByKey()
| 'save' >> beam.io.WriteToText('./typed_names'))
p.run()
```
### BigQuery
This example reads weather data from a BigQuery table, calculates the number of tornadoes per month, and writes the results to a table you specify.
```python
import apache_beam as beam
project = 'DESTINATION-PROJECT-ID'
input_table = 'clouddataflow-readonly:samples.weather_stations'
output_table = 'DESTINATION-DATASET.DESTINATION-TABLE'
p = beam.Pipeline(argv=['--project', project])
(p
| 'read' >> beam.Read(beam.io.BigQuerySource(input_table))
| 'months with tornadoes' >> beam.FlatMap(
lambda row: [(int(row['month']), 1)] if row['tornado'] else [])
| 'monthly count' >> beam.CombinePerKey(sum)
| 'format' >> beam.Map(lambda (k, v): {'month': k, 'tornado_count': v})
| 'save' >> beam.Write(
beam.io.BigQuerySink(
output_table,
schema='month:INTEGER, tornado_count:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
```
This pipeline, like the one above, calculates the number of tornadoes per month, but it uses a query to filter out the input instead of using the whole table.
```python
import apache_beam as beam
project = 'DESTINATION-PROJECT-ID'
output_table = 'DESTINATION-DATASET.DESTINATION-TABLE'
input_query = 'SELECT month, COUNT(month) AS tornado_count ' \
'FROM [clouddataflow-readonly:samples.weather_stations] ' \
'WHERE tornado=true GROUP BY month'
p = beam.Pipeline(argv=['--project', project])
(p
| 'read' >> beam.Read(beam.io.BigQuerySource(query=input_query))
| 'save' >> beam.Write(beam.io.BigQuerySink(
output_table,
schema='month:INTEGER, tornado_count:INTEGER',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)))
p.run()
```
### Combiner Examples
Combiner transforms use "reducing" functions, such as sum, min, or max, to combine multiple values of a `PCollection` into a single value.
```python
import apache_beam as beam
p = beam.Pipeline('DirectRunner')
SAMPLE_DATA = [('a', 1), ('b', 10), ('a', 2), ('a', 3), ('b', 20)]
(p
| beam.Create(SAMPLE_DATA)
| beam.CombinePerKey(sum)
| beam.io.WriteToText('./sums'))
p.run()
```
The [combiners_test.py](https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/combiners_test.py) file contains more combiner examples.
## Organizing Your Code
Many projects will grow to multiple source code files. It is recommended that you organize your project so that all code involved in running your pipeline can be built as a Python package. This way, the package can easily be installed in the VM workers executing the job.
Follow the [Juliaset example](https://github.com/apache/beam/tree/master/sdks/python/apache_beam/examples/complete/juliaset). If the code is organized in this fashion, you can use the `--setup_file` command line option to create a source distribution out of the project files, stage the resulting tarball, and later install it in the workers executing the job.
## More Information
Please report any issues on [JIRA](https://issues.apache.org/jira/browse/BEAM/component/12328910).
If you’re interested in contributing to the Beam SDK, start by reading the [Contribute](http://beam.apache.org/contribute/) guide.