Beam YAML API

While Beam provides powerful APIs for authoring sophisticated data processing pipelines, the barrier to getting started and authoring simple pipelines is often still too high. Even setting up the environment, installing the dependencies, and setting up the project can be an overwhelming amount of boilerplate for some (though https://beam.apache.org/blog/beam-starter-projects/ has gone a long way in making this easier).

Here we provide a simple declarative syntax for describing pipelines that does not require coding experience or learning how to use an SDK; any text editor will do. Some installation may be required to actually execute a pipeline, but we envision various services (such as Dataflow) accepting YAML pipelines directly, obviating the need for even that in the future. We also anticipate the ability to generate code directly from these higher-level YAML descriptions, should one want to graduate to a full Beam SDK (and, as far as possible, to go in the other direction as well).

Though we intend this syntax to be easily authored (and read) directly by humans, it may also prove a useful intermediate representation for tools, either as output (e.g. a pipeline authoring GUI) or as input (e.g. a lineage analysis tool). We expect it to be more easily manipulated and more semantically meaningful than the Beam protos themselves (which concern themselves more with execution).

Everything here is still EXPERIMENTAL and subject to change. Feedback is welcome at dev@beam.apache.org.

Example pipelines

Here is a simple pipeline that reads some data from CSV files and writes it out in JSON format.

pipeline:
  transforms:
    - type: ReadFromCsv
      path: /path/to/input*.csv
    - type: WriteToJson
      path: /path/to/output.json
      input: ReadFromCsv

We can also add a transformation

pipeline:
  transforms:
    - type: ReadFromCsv
      path: /path/to/input*.csv
    - type: PyFilter
      keep: "lambda x: x.col3 > 100"
      input: ReadFromCsv
    - type: WriteToJson
      path: /path/to/output.json
      input: PyFilter

or two.

pipeline:
  transforms:
    - type: ReadFromCsv
      path: /path/to/input*.csv
    - type: PyFilter
      keep: "lambda x: x.col3 > 100"
      input: ReadFromCsv
    - type: Sql
      name: MySqlTransform
      query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
      input: PyFilter
    - type: WriteToJson
      path: /path/to/output.json
      input: MySqlTransform
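
To make the `keep` expression used by PyFilter concrete, here is a minimal Python sketch that evaluates the same lambda string against a few stand-in rows. It assumes rows expose their fields as attributes; the `Row` namedtuple, the sample data, and the use of `eval` are illustrative assumptions, not a description of how Beam implements PyFilter.

```python
from collections import namedtuple

# Hypothetical stand-in for a row with named fields; not a Beam type.
Row = namedtuple("Row", ["col1", "col2", "col3"])

rows = [
    Row("a", 1, 50),
    Row("b", 2, 150),
    Row("a", 3, 300),
]

# The same string that appears under `keep` in the YAML above.
keep_expr = "lambda x: x.col3 > 100"

# Turn the string into a callable. This is for illustration only:
# eval should never be used on untrusted input.
keep = eval(keep_expr)

# Only rows for which the callable returns True are retained.
kept = [row for row in rows if keep(row)]
```

The string is ordinary Python, so any expression over the row's fields that returns a boolean can serve as a filter condition.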

If the pipeline is linear, we can let the inputs be implicit by designating the pipeline as a chain type.

pipeline:
  type: chain

  transforms:
    - type: ReadFromCsv
      path: /path/to/input*.csv
    - type: PyFilter
      keep: "lambda x: x.col3 > 100"
    - type: Sql
      name: MySqlTransform
      query: "select col1, count(*) as cnt from PCOLLECTION group by col1"
    - type: WriteToJson
      path: /path/to/output.json

As syntactic sugar, we can name the first and last transforms in our pipeline as source and sink.

pipeline:
  type: chain

  source:
    type: ReadFromCsv
    path: /path/to/input*.csv

  transforms:
    - type: PyFilter
      keep: "lambda x: x.col3 > 100"

    - type: Sql
      name: MySqlTransform
      query: "select col1, count(*) as cnt from PCOLLECTION group by col1"

  sink:
    type: WriteToJson
    path: /path/to/output.json
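
The source and sink sugar can be read as simply prepending and appending to the list of transforms. The following Python sketch shows that reading on plain dictionaries; `desugar_source_and_sink` is a hypothetical helper written for illustration and is not part of the Beam API.

```python
def desugar_source_and_sink(pipeline):
    """Fold optional `source` and `sink` entries into the `transforms` list
    (hypothetical helper for illustration, not a Beam API)."""
    transforms = list(pipeline.get("transforms", []))
    if "source" in pipeline:
        transforms.insert(0, pipeline["source"])  # source becomes the first step
    if "sink" in pipeline:
        transforms.append(pipeline["sink"])  # sink becomes the last step
    # Keep every other top-level key (such as `type`) unchanged.
    rest = {k: v for k, v in pipeline.items()
            if k not in ("source", "sink", "transforms")}
    return {**rest, "transforms": transforms}


sugared = {
    "type": "chain",
    "source": {"type": "ReadFromCsv", "path": "/path/to/input*.csv"},
    "transforms": [
        {"type": "PyFilter", "keep": "lambda x: x.col3 > 100"},
        {"type": "Sql", "name": "MySqlTransform",
         "query": "select col1, count(*) as cnt from PCOLLECTION group by col1"},
    ],
    "sink": {"type": "WriteToJson", "path": "/path/to/output.json"},
}

plain = desugar_source_and_sink(sugared)
```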

Arbitrary non-linear pipelines are supported as well, though in this case inputs must be explicitly named. Here we read two sources, join them, and write two outputs.

pipeline:
  - type: ReadFromCsv
    name: ReadLeft
    path: /path/to/left*.csv

  - type: ReadFromCsv
    name: ReadRight
    path: /path/to/right*.csv

  - type: Sql
    query: select left.col1, right.col2 from left join right using (col3)
    input:
      left: ReadLeft
      right: ReadRight

  - type: WriteToJson
    name: WriteAll
    input: Sql
    path: /path/to/all.json

  - type: PyFilter
    name: FilterToBig
    input: Sql
    keep: "lambda x: x.col2 > 100"

  - type: WriteToCsv
    name: WriteBig
    input: FilterToBig
    path: /path/to/big.csv
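
To see how the explicit inputs define the shape of this pipeline, the Python sketch below builds a map from each transform to the transforms that consume its output. It is an illustration on plain dictionaries only: `consumers_by_producer` is a hypothetical helper, and the labeling rule (use `name` if present, otherwise `type`) is inferred from the example rather than taken from the Beam implementation.

```python
def consumers_by_producer(transforms):
    """Map each transform label to the labels of the transforms that read
    from it (hypothetical helper for illustration, not a Beam API)."""
    labels = [t.get("name", t["type"]) for t in transforms]
    if len(set(labels)) != len(labels):
        raise ValueError("transform labels must be unique; add a name")
    graph = {label: [] for label in labels}
    for spec, label in zip(transforms, labels):
        inputs = spec.get("input", {})
        if isinstance(inputs, str):
            inputs = {"input": inputs}  # a single, untagged input
        for producer in inputs.values():
            if producer not in graph:
                raise ValueError(f"unknown input {producer!r} for {label!r}")
            graph[producer].append(label)
    return graph


pipeline = [
    {"type": "ReadFromCsv", "name": "ReadLeft", "path": "/path/to/left*.csv"},
    {"type": "ReadFromCsv", "name": "ReadRight", "path": "/path/to/right*.csv"},
    {"type": "Sql",
     "query": "select left.col1, right.col2 from left join right using (col3)",
     "input": {"left": "ReadLeft", "right": "ReadRight"}},
    {"type": "WriteToJson", "name": "WriteAll", "input": "Sql",
     "path": "/path/to/all.json"},
    {"type": "PyFilter", "name": "FilterToBig", "input": "Sql",
     "keep": "lambda x: x.col2 > 100"},
    {"type": "WriteToCsv", "name": "WriteBig", "input": "FilterToBig",
     "path": "/path/to/big.csv"},
]

graph = consumers_by_producer(pipeline)
```

In this sketch the two reads need distinct names because they share a type, while the output of Sql fans out to both WriteAll and FilterToBig.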

One can, however, nest chains within a non-linear pipeline. For example, here ExtraProcessingForBigRows is itself a “chain” transform that has a single input and contains its own sink.

pipeline:
  - type: ReadFromCsv
    name: ReadLeft
    path: /path/to/left*.csv

  - type: ReadFromCsv
    name: ReadRight
    path: /path/to/right*.csv

  - type: Sql
    query: select left.col1, right.col2 from left join right using (col3)
    input:
      left: ReadLeft
      right: ReadRight

  - type: WriteToJson
    name: WriteAll
    input: Sql
    path: /path/to/all.json

  - type: chain
    name: ExtraProcessingForBigRows
    input: Sql
    transforms:
      - type: PyFilter
        keep: "lambda x: x.col2 > 100"
      - type: PyFilter
        keep: "lambda x: len(x.col1) > 10"
      - type: PyFilter
        keep: "lambda x: x.col1 > 'z'"
    sink:
      type: WriteToCsv
      path: /path/to/big.csv

Windowing

This API can be used to define both streaming and batch pipelines. In order to meaningfully aggregate elements in a streaming pipeline, some kind of windowing is typically required. Beam's windowing and triggering can be declared using the same WindowInto transform available in all other SDKs.

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      topic: myPubSubTopic
    - type: WindowInto
      windowing:
        type: fixed
        size: 60
    - type: SomeAggregation
    - type: WriteToPubSub
      topic: anotherPubSubTopic

Rather than using an explicit WindowInto operation, one may instead tag a transform itself with a specified windowing, which causes its inputs (and hence the transform itself) to be windowed accordingly.

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      topic: myPubSubTopic
    - type: SomeAggregation
      windowing:
        type: sliding
        size: 60
        period: 10
    - type: WriteToPubSub
      topic: anotherPubSubTopic
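
The difference between fixed and sliding windowing comes down to simple arithmetic on element timestamps. The Python sketch below works through the sizes used above, assuming integer timestamps in the same unit as `size` and `period` (presumably seconds) and windows aligned to zero. The function names are hypothetical and this is not Beam's windowing code.

```python
def fixed_window(timestamp, size):
    """Return the single [start, end) fixed window containing `timestamp`."""
    start = timestamp - (timestamp % size)
    return (start, start + size)


def sliding_windows(timestamp, size, period):
    """Return every [start, end) sliding window containing `timestamp`.

    A new window starts every `period` units and lasts `size` units, so an
    element falls into roughly size / period overlapping windows.
    """
    last_start = timestamp - (timestamp % period)
    return [(start, start + size)
            for start in range(last_start, timestamp - size, -period)]


# An element observed 125 units after time zero.
fixed = fixed_window(125, size=60)
sliding = sliding_windows(125, size=60, period=10)
```

With `size: 60` and `period: 10`, each element contributes to six overlapping windows, so an aggregation emits six results that include it; with fixed windows it contributes to exactly one.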

Note that the Sql operation itself is often a form of aggregation, and applying a windowing (or consuming an already windowed input) will cause all grouping to be done per window.

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      topic: myPubSubTopic
    - type: Sql
      query: "select col1, count(*) as c from PCOLLECTION group by col1"
      windowing:
        type: sessions
        gap: 60
    - type: WriteToPubSub
      topic: anotherPubSubTopic

The specified windowing is applied to all inputs, in this case resulting in a join per window.

pipeline:
  - type: ReadFromPubSub
    name: ReadLeft
    topic: leftTopic

  - type: ReadFromPubSub
    name: ReadRight
    topic: rightTopic

  - type: Sql
    query: select left.col1, right.col2 from left join right using (col3)
    input:
      left: ReadLeft
      right: ReadRight
    windowing:
      type: fixed
      size: 60

For a transform with no inputs, the specified windowing is instead applied to its output(s). As per the Beam model, the windowing is then inherited by all consuming operations. This is especially useful for root operations like Read.

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      topic: myPubSubTopic
      windowing:
        type: fixed
        size: 60
    - type: Sql
      query: "select col1, count(*) as c from PCOLLECTION group by col1"
    - type: WriteToPubSub
      topic: anotherPubSubTopic

One can also specify windowing at the top level of a pipeline (or composite). This is shorthand for applying the same windowing to all root operations that don't otherwise specify their own windowing, and can be an effective way to apply it everywhere.

pipeline:
  type: chain
  transforms:
    - type: ReadFromPubSub
      topic: myPubSubTopic
    - type: Sql
      query: "select col1, count(*) as c from PCOLLECTION group by col1"
    - type: WriteToPubSub
      topic: anotherPubSubTopic
  windowing:
    type: fixed
    size: 60
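
The shorthand can be pictured as copying the top-level windowing onto each root operation that lacks its own. The Python sketch below illustrates this on plain dictionaries; `push_down_windowing` is a hypothetical helper, it treats the first transform of a chain (or any transform without an `input`) as a root, and it ignores the source and sink sugar for brevity. It is not the Beam implementation.

```python
import copy


def push_down_windowing(pipeline):
    """Copy a top-level `windowing` onto root transforms that lack their own
    (hypothetical helper for illustration, not a Beam API)."""
    result = copy.deepcopy(pipeline)  # leave the caller's data untouched
    windowing = result.pop("windowing", None)
    if windowing is None:
        return result
    transforms = result.get("transforms", [])
    if result.get("type") == "chain":
        roots = transforms[:1]  # only the first step of a chain has no input
    else:
        roots = [t for t in transforms if "input" not in t]
    for root in roots:
        # setdefault keeps any windowing the root already declares.
        root.setdefault("windowing", copy.deepcopy(windowing))
    return result


pipeline = {
    "type": "chain",
    "transforms": [
        {"type": "ReadFromPubSub", "topic": "myPubSubTopic"},
        {"type": "Sql",
         "query": "select col1, count(*) as c from PCOLLECTION group by col1"},
        {"type": "WriteToPubSub", "topic": "anotherPubSubTopic"},
    ],
    "windowing": {"type": "fixed", "size": 60},
}

pushed = push_down_windowing(pipeline)
```

Under these assumptions the result matches the earlier example in which the windowing is declared directly on the read.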

Note that all these windowing specifications are compatible with the source and sink syntax as well:

pipeline:
  type: chain

  source:
    type: ReadFromPubSub
    topic: myPubSubTopic
    windowing:
      type: fixed
      size: 10

  transforms:
    - type: Sql
      query: "select col1, count(*) as c from PCOLLECTION group by col1"

  sink:
    type: WriteToCsv
    path: /path/to/output.csv
    windowing:
      type: fixed
      size: 300

Providers

Though we aim to offer a large suite of built-in transforms, it is inevitable that people will want to be able to author their own. This is made possible through the notion of Providers, which leverage expansion services and schema transforms.

For example, one could build a jar that vends a cross-language transform or schema transform and then use it in a pipeline as follows:

pipeline:
  type: chain
  source:
    type: ReadFromCsv
    path: /path/to/input*.csv

  transforms:
    - type: MyCustomTransform
      args:
        arg: whatever

  sink:
    type: WriteToJson
    path: /path/to/output.json

providers:
  - type: javaJar
    jar: /path/or/url/to/myExpansionService.jar
    transforms:
      MyCustomTransform: "urn:registered:in:expansion:service"

Arbitrary Python transforms can be provided as well, using the syntax:

providers:
  - type: pythonPackage
    packages:
      - my_pypi_package>=version
      - /path/to/local/package.zip
    transforms:
      MyCustomTransform: "pkg.subpkg.PTransformClassOrCallable"

Running pipelines

The Beam YAML parser is currently included as part of the Apache Beam Python SDK. This can be installed (e.g. within a virtual environment) as

pip install apache_beam

In addition, several of the provided transforms (such as SQL) are implemented in Java, and their expansion will require a working Java interpreter. (The requisite artifacts will be automatically downloaded from the Apache Maven repositories, so no further installs will be required.) Docker is also currently required for local execution of these cross-language transforms, but not for submission to a non-local runner such as Flink or Dataflow.

Once the prerequisites are installed, you can execute a pipeline defined in a YAML file as

python -m apache_beam.yaml.main --pipeline_spec_file=/path/to/pipeline.yaml [other pipeline options such as the runner]