<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Beam YAML API
While Beam provides powerful APIs for authoring sophisticated data
processing pipelines, it often still has too high a barrier for
getting started and authoring simple pipelines. Even setting up the
environment, installing the dependencies, and setting up the project
can be an overwhelming amount of boilerplate for some (though
https://beam.apache.org/blog/beam-starter-projects/ has gone a long
way in making this easier).

Here we provide a simple declarative syntax for describing pipelines
that does not require coding experience or learning how to use an
SDK&mdash;any text editor will do.
Some installation may be required to actually *execute* a pipeline, but
we envision various services (such as Dataflow) accepting yaml pipelines
directly, obviating the need for even that in the future.
We also anticipate the ability to generate code directly from these
higher-level yaml descriptions, should one want to graduate to a full
Beam SDK (and possibly the other direction as well, as far as possible).

Though we intend this syntax to be easily authored (and read) directly by
humans, it may also prove a useful intermediate representation for
tools, either as their output (e.g. a pipeline authoring GUI)
or their input (e.g. a lineage analysis tool), and we expect it to be more
easily manipulated and more semantically meaningful than the Beam protos
themselves (which concern themselves more with execution).

It should be noted that everything here is still under development, but any
features already included are considered stable. Feedback is welcome at
dev@beam.apache.org.
## Example pipelines
Here is a simple pipeline that reads some data from csv files and
writes it out in json format.
```
pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: WriteToJson
      config:
        path: /path/to/output.json
      input: ReadFromCsv
```
We can also add a transformation
```
pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"
      input: ReadFromCsv
    - type: WriteToJson
      config:
        path: /path/to/output.json
      input: Filter
```
or two.
```
pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"
      input: ReadFromCsv
    - type: Sql
      name: MySqlTransform
      config:
        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
      input: Filter
    - type: WriteToJson
      config:
        path: /path/to/output.json
      input: MySqlTransform
```
If the pipeline is linear, we can let the inputs be implicit by designating
the pipeline as a `chain` type.
```
pipeline:
  type: chain
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"
    - type: Sql
      name: MySqlTransform
      config:
        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
    - type: WriteToJson
      config:
        path: /path/to/output.json
```
As syntactic sugar, we can name the first and last transforms in our pipeline
as `source` and `sink`.
```
pipeline:
  type: chain
  source:
    type: ReadFromCsv
    config:
      path: /path/to/input*.csv
  transforms:
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"
    - type: Sql
      name: MySqlTransform
      config:
        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
  sink:
    type: WriteToJson
    config:
      path: /path/to/output.json
```
Arbitrary non-linear pipelines are supported as well, though in this case
inputs must be explicitly named.
Here we read two sources, join them, and write two outputs.
```
pipeline:
  transforms:
    - type: ReadFromCsv
      name: ReadLeft
      config:
        path: /path/to/left*.csv
    - type: ReadFromCsv
      name: ReadRight
      config:
        path: /path/to/right*.csv
    - type: Sql
      config:
        query: select left.col1, right.col2 from left join right using (col3)
      input:
        left: ReadLeft
        right: ReadRight
    - type: WriteToJson
      name: WriteAll
      input: Sql
      config:
        path: /path/to/all.json
    - type: Filter
      name: FilterToBig
      input: Sql
      config:
        language: python
        keep: "col2 > 100"
    - type: WriteToCsv
      name: WriteBig
      input: FilterToBig
      config:
        path: /path/to/big.csv
```
One can, however, nest `chains` within a non-linear pipeline.
For example, here `ExtraProcessingForBigRows` is itself a "chain" transform
that has a single input and contains its own sink.
```
pipeline:
  transforms:
    - type: ReadFromCsv
      name: ReadLeft
      config:
        path: /path/to/left*.csv
    - type: ReadFromCsv
      name: ReadRight
      config:
        path: /path/to/right*.csv
    - type: Sql
      config:
        query: select left.col1, right.col2 from left join right using (col3)
      input:
        left: ReadLeft
        right: ReadRight
    - type: WriteToJson
      name: WriteAll
      input: Sql
      config:
        path: /path/to/all.json
    - type: chain
      name: ExtraProcessingForBigRows
      input: Sql
      transforms:
        - type: Filter
          config:
            language: python
            keep: "col2 > 100"
        - type: Filter
          config:
            language: python
            keep: "len(col1) > 10"
        - type: Filter
          config:
            language: python
            keep: "col1 > 'z'"
      sink:
        type: WriteToCsv
        config:
          path: /path/to/big.csv
```
## Windowing
This API can be used to define both streaming and batch pipelines.
In order to meaningfully aggregate elements in a streaming pipeline,
some kind of windowing is typically required. Beam's
[windowing](https://beam.apache.org/documentation/programming-guide/#windowing)
and [triggering](https://beam.apache.org/documentation/programming-guide/#triggers)
can be declared using the same WindowInto transform available in all other
SDKs.
```
pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: json
        schema:
          type: object
          properties:
            col1: {type: string}
            col2: {type: integer}
            col3: {type: number}
    - type: WindowInto
      windowing:
        type: fixed
        size: 60
    - type: SomeAggregation
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: json
```
Rather than using an explicit `WindowInto` operation, one may instead tag a
transform itself with a specified windowing which will cause its inputs
(and hence the transform itself) to be applied with that windowing.
```
pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
    - type: SomeAggregation
      windowing:
        type: sliding
        size: 60
        period: 10
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: json
```
Note that the `Sql` operation itself is often a form of aggregation, and
applying a windowing (or consuming an already windowed input) will cause all
grouping to be done per window.
```
pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
      windowing:
        type: sessions
        gap: 60
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: json
```
The specified windowing is applied to all inputs, in this case resulting in
a join per window.
```
pipeline:
  transforms:
    - type: ReadFromPubSub
      name: ReadLeft
      config:
        topic: leftTopic
        format: ...
        schema: ...
    - type: ReadFromPubSub
      name: ReadRight
      config:
        topic: rightTopic
        format: ...
        schema: ...
    - type: Sql
      config:
        query: select left.col1, right.col2 from left join right using (col3)
      input:
        left: ReadLeft
        right: ReadRight
      windowing:
        type: fixed
        size: 60
```
For a transform with no inputs, the specified windowing is instead applied to
its output(s). As per the Beam model, the windowing is then inherited by all
consuming operations. This is especially useful for root operations like Read.
```
pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
      windowing:
        type: fixed
        size: 60
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: json
```
One can also specify windowing at the top level of a pipeline (or composite),
which is shorthand for applying this same windowing to all root
operations (that don't otherwise specify their own windowing)
and can be an effective way to apply it everywhere.
```
pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: json
  windowing:
    type: fixed
    size: 60
```
Note that all these windowing specifications are compatible with the `source`
and `sink` syntax as well
```
pipeline:
  type: chain
  source:
    type: ReadFromPubSub
    config:
      topic: myPubSubTopic
      format: ...
      schema: ...
    windowing:
      type: fixed
      size: 10
  transforms:
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
  sink:
    type: WriteToCsv
    config:
      path: /path/to/output.json
    windowing:
      type: fixed
      size: 300
```
## Providers
Though we aim to offer a large suite of built-in transforms, it is inevitable
that people will want to be able to author their own. This is made possible
through the notion of Providers, which leverage expansion services and
schema transforms.
For example, one could build a jar that vends a
[cross language transform](https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/)
or [schema transform](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/schemas/transforms/SchemaTransformProvider.html)
and then use it in a transform as follows
```
pipeline:
  type: chain
  source:
    type: ReadFromCsv
    config:
      path: /path/to/input*.csv
  transforms:
    - type: MyCustomTransform
      config:
        arg: whatever
  sink:
    type: WriteToJson
    config:
      path: /path/to/output.json

providers:
  - type: javaJar
    jar: /path/or/url/to/myExpansionService.jar
    transforms:
      MyCustomTransform: "urn:registered:in:expansion:service"
```
Arbitrary Python transforms can be provided as well, using the syntax
```
providers:
  - type: pythonPackage
    packages:
      - my_pypi_package>=version
      - /path/to/local/package.zip
    transforms:
      MyCustomTransform: "pkg.subpkg.PTransformClassOrCallable"
```
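A transform exposed this way is just an ordinary Python `PTransform` (or a
callable returning one) referenced by its fully qualified name. Purely as an
illustrative sketch, such a class might look like the following; the class
name, the `keep_threshold` parameter, and the assumption that the yaml
`config` entries are forwarded as constructor keyword arguments are all
made up for this example rather than prescribed.
```
# pkg/subpkg.py -- hypothetical module referenced by the provider above.
import apache_beam as beam


class MyCustomTransform(beam.PTransform):
    """Illustrative transform that keeps rows whose col2 exceeds a threshold.

    Assumes entries under `config:` in the yaml (e.g. `keep_threshold: 100`)
    are passed to the constructor as keyword arguments.
    """

    def __init__(self, keep_threshold=0):
        super().__init__()
        self._keep_threshold = keep_threshold

    def expand(self, pcoll):
        # Elements are schema'd rows, so fields can be read as attributes.
        return pcoll | beam.Filter(lambda row: row.col2 > self._keep_threshold)
```
The provider mapping would then point at `pkg.subpkg.MyCustomTransform`.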
## Running pipelines
The Beam yaml parser is currently included as part of the Apache Beam Python SDK.
This can be installed (e.g. within a virtual environment) as
```
pip install apache_beam[yaml,gcp]
```
In addition, several of the provided transforms (such as SQL) are implemented
in Java and their expansion will require a working Java interpreter. (The
requisite artifacts will be automatically downloaded from the Apache Maven
repositories, so no further installs will be required.)
Docker is also currently required for local execution of these
cross-language-requiring transforms, but not for submission to a non-local
runner such as Flink or Dataflow.
Once the prerequisites are installed, you can execute a pipeline defined
in a yaml file as
```
python -m apache_beam.yaml.main --pipeline_spec_file=/path/to/pipeline.yaml [other pipeline options such as the runner]
```
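The same entry point accepts the standard Beam pipeline options, so a pipeline
can also be submitted to a distributed runner this way. For example, a
hypothetical Dataflow invocation (the project, region, and bucket names below
are placeholders) might look like
```
python -m apache_beam.yaml.main \
    --pipeline_spec_file=/path/to/pipeline.yaml \
    --runner=DataflowRunner \
    --project=my-gcp-project \
    --region=us-central1 \
    --temp_location=gs://my-bucket/tmp
```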