While Beam provides powerful APIs for authoring sophisticated data processing pipelines, it still has too high a barrier for getting started and authoring simple pipelines. Even setting up the environment, installing the dependencies, and creating the project can be an overwhelming amount of boilerplate for some (though https://beam.apache.org/blog/beam-starter-projects/ has gone a long way in making this easier).
Here we provide a simple declarative syntax for describing pipelines that does not require coding experience or learning how to use an SDK—any text editor will do. Some installation may be required to actually execute a pipeline, but we envision various services (such as Dataflow) accepting yaml pipelines directly in the future, obviating the need for even that. We also anticipate the ability to generate code directly from these higher-level yaml descriptions, should one want to graduate to a full Beam SDK (and possibly, to the extent feasible, the other direction as well).
Though we intend this syntax to be easily authored (and read) directly by humans, it may also prove a useful intermediate representation for tools, whether as output (e.g. a pipeline authoring GUI) or as input (e.g. a lineage analysis tool), and we expect it to be more easily manipulated, and more semantically meaningful, than the Beam protos themselves (which concern themselves more with execution).
It should be noted that everything here is still under development, but any features already included are considered stable. Feedback is welcome at dev@beam.apache.org.
Here is a simple pipeline that reads some data from csv files and writes it out in json format.
```
pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: WriteToJson
      config:
        path: /path/to/output.json
      input: ReadFromCsv
```
We can also add a transformation
```
pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"
      input: ReadFromCsv
    - type: WriteToJson
      config:
        path: /path/to/output.json
      input: Filter
```
or two.
```
pipeline:
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"
      input: ReadFromCsv
    - type: Sql
      name: MySqlTransform
      config:
        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
      input: Filter
    - type: WriteToJson
      config:
        path: /path/to/output.json
      input: MySqlTransform
```
If the pipeline is linear, we can let the inputs be implicit by designating the pipeline as a `chain` type.
```
pipeline:
  type: chain
  transforms:
    - type: ReadFromCsv
      config:
        path: /path/to/input*.csv
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"
    - type: Sql
      name: MySqlTransform
      config:
        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
    - type: WriteToJson
      config:
        path: /path/to/output.json
```
As syntactic sugar, we can name the first and last transforms in our pipeline as `source` and `sink`.
```
pipeline:
  type: chain
  source:
    type: ReadFromCsv
    config:
      path: /path/to/input*.csv
  transforms:
    - type: Filter
      config:
        language: python
        keep: "col3 > 100"
    - type: Sql
      name: MySqlTransform
      config:
        query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
  sink:
    type: WriteToJson
    config:
      path: /path/to/output.json
```
Arbitrary non-linear pipelines are supported as well, though in this case inputs must be explicitly named. Here we read two sources, join them, and write two outputs.
```
pipeline:
  transforms:
    - type: ReadFromCsv
      name: ReadLeft
      config:
        path: /path/to/left*.csv
    - type: ReadFromCsv
      name: ReadRight
      config:
        path: /path/to/right*.csv
    - type: Sql
      config:
        query: select left.col1, right.col2 from left join right using (col3)
      input:
        left: ReadLeft
        right: ReadRight
    - type: WriteToJson
      name: WriteAll
      input: Sql
      config:
        path: /path/to/all.json
    - type: Filter
      name: FilterToBig
      input: Sql
      config:
        language: python
        keep: "col2 > 100"
    - type: WriteToCsv
      name: WriteBig
      input: FilterToBig
      config:
        path: /path/to/big.csv
```
One can, however, nest `chain`s within a non-linear pipeline. For example, here `ExtraProcessingForBigRows` is itself a “chain” transform that has a single input and contains its own sink.
```
pipeline:
  transforms:
    - type: ReadFromCsv
      name: ReadLeft
      config:
        path: /path/to/left*.csv
    - type: ReadFromCsv
      name: ReadRight
      config:
        path: /path/to/right*.csv
    - type: Sql
      config:
        query: select left.col1, right.col2 from left join right using (col3)
      input:
        left: ReadLeft
        right: ReadRight
    - type: WriteToJson
      name: WriteAll
      input: Sql
      config:
        path: /path/to/all.json
    - type: chain
      name: ExtraProcessingForBigRows
      input: Sql
      transforms:
        - type: Filter
          config:
            language: python
            keep: "col2 > 100"
        - type: Filter
          config:
            language: python
            keep: "len(col1) > 10"
        - type: Filter
          config:
            language: python
            keep: "col1 > 'z'"
      sink:
        type: WriteToCsv
        config:
          path: /path/to/big.csv
```
This API can be used to define both streaming and batch pipelines. In order to meaningfully aggregate elements in a streaming pipeline, some kind of windowing is typically required. Beam's windowing and triggering can be declared using the same `WindowInto` transform available in all other SDKs.
```
pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: json
        schema:
          type: object
          properties:
            col1: {type: string}
            col2: {type: integer}
            col3: {type: number}
    - type: WindowInto
      windowing:
        type: fixed
        size: 60
    - type: SomeAggregation
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: json
```
Rather than using an explicit `WindowInto` operation, one may instead tag a transform itself with a specified windowing, which will cause its inputs (and hence the transform itself) to be applied with that windowing.
```
pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
    - type: SomeAggregation
      windowing:
        type: sliding
        size: 60
        period: 10
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: json
```
Note that the `Sql` operation itself is often a form of aggregation, and applying a windowing (or consuming an already windowed input) will cause all grouping to be done per window.
```
pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
      windowing:
        type: sessions
        gap: 60
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: json
```
The specified windowing is applied to all inputs, in this case resulting in a join per window.
```
pipeline:
  transforms:
    - type: ReadFromPubSub
      name: ReadLeft
      config:
        topic: leftTopic
        format: ...
        schema: ...
    - type: ReadFromPubSub
      name: ReadRight
      config:
        topic: rightTopic
        format: ...
        schema: ...
    - type: Sql
      config:
        query: select left.col1, right.col2 from left join right using (col3)
      input:
        left: ReadLeft
        right: ReadRight
      windowing:
        type: fixed
        size: 60
```
For a transform with no inputs, the specified windowing is instead applied to its output(s). As per the Beam model, the windowing is then inherited by all consuming operations. This is especially useful for root operations like Read.
```
pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
      windowing:
        type: fixed
        size: 60
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: json
```
One can also specify windowing at the top level of a pipeline (or composite), which is shorthand for applying this same windowing to all root operations (that don't otherwise specify their own windowing), and can be an effective way to apply it everywhere.
```
pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      config:
        topic: myPubSubTopic
        format: ...
        schema: ...
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
    - type: WriteToPubSub
      config:
        topic: anotherPubSubTopic
        format: json
  windowing:
    type: fixed
    size: 60
```
Note that all these windowing specifications are compatible with the `source` and `sink` syntax as well.
```
pipeline:
  type: chain
  source:
    type: ReadFromPubSub
    config:
      topic: myPubSubTopic
      format: ...
      schema: ...
    windowing:
      type: fixed
      size: 10
  transforms:
    - type: Sql
      config:
        query: "select col1, count(*) as c from PCOLLECTION"
  sink:
    type: WriteToCsv
    config:
      path: /path/to/output.csv
    windowing:
      type: fixed
      size: 300
```
Though we aim to offer a large suite of built-in transforms, it is inevitable that people will want to be able to author their own. This is made possible through the notion of Providers which leverage expansion services and schema transforms.
For example, one could build a jar that vends a cross-language transform or schema transform and then use it in a pipeline as follows:
```
pipeline:
  type: chain
  source:
    type: ReadFromCsv
    config:
      path: /path/to/input*.csv
  transforms:
    - type: MyCustomTransform
      config:
        arg: whatever
  sink:
    type: WriteToJson
    config:
      path: /path/to/output.json

providers:
  - type: javaJar
    jar: /path/or/url/to/myExpansionService.jar
    transforms:
      MyCustomTransform: "urn:registered:in:expansion:service"
```
Arbitrary Python transforms can be provided as well, using the syntax
```
providers:
  - type: pythonPackage
    packages:
      - my_pypi_package>=version
      - /path/to/local/package.zip
    transforms:
      MyCustomTransform: "pkg.subpkg.PTransformClassOrCallable"
```
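As a rough sketch of what such a referenced Python transform might look like, the symbol could be an ordinary `beam.PTransform` subclass (or callable) whose constructor arguments correspond to the entries of the transform's `config` block. The module path, class name, and field names below are illustrative assumptions, not part of Beam itself.

```python
# my_package/transforms.py -- hypothetical module referenced by the provider above.
import apache_beam as beam


class MyCustomTransform(beam.PTransform):
    """Appends a suffix to the col1 field of every row."""

    def __init__(self, suffix):
        # Values under the transform's config block in the yaml file would be
        # passed as keyword arguments when the transform is constructed.
        super().__init__()
        self._suffix = suffix

    def expand(self, pcoll):
        # Beam yaml pipelines operate on schema'd PCollections, so this sketch
        # consumes and produces beam.Row elements with named fields.
        suffix = self._suffix
        return pcoll | beam.Map(
            lambda row: beam.Row(col1=row.col1 + suffix, col2=row.col2))
```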
The Beam yaml parser is currently included as part of the Apache Beam Python SDK. This can be installed (e.g. within a virtual environment) as
```
pip install apache_beam[yaml,gcp]
```
In addition, several of the provided transforms (such as SQL) are implemented in Java and their expansion will require a working Java interpreter. (The requisite artifacts will be automatically downloaded from the Apache Maven repositories, so no further installs will be required.) Docker is also currently required for local execution of these cross-language-requiring transforms, but not for submission to a non-local runner such as Flink or Dataflow.
Once the prerequisites are installed, you can execute a pipeline defined in a yaml file as
```
python -m apache_beam.yaml.main --pipeline_spec_file=/path/to/pipeline.yaml [other pipeline options such as the runner]
```
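For example, a submission to Dataflow might look like the following, using the standard Dataflow pipeline options (the project, region, and bucket values below are placeholders):

```
python -m apache_beam.yaml.main \
    --pipeline_spec_file=/path/to/pipeline.yaml \
    --runner=DataflowRunner \
    --project=my-gcp-project \
    --region=us-central1 \
    --temp_location=gs://my-bucket/tmp
```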