Beam YAML has an EXPERIMENTAL ability to perform aggregations, which group and combine values across records. This is accomplished via the `Combine` transform type. Currently, `Combine` must be listed in the `yaml_experimental_features` option in order to use this transform.
For example, one can write:

```yaml
- type: Combine
  config:
    group_by: col1
    combine:
      total:
        value: col2
        fn:
          type: sum
```
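As a rough mental model of what the configuration above computes, here is a plain-Python sketch (not Beam code, and not how Beam executes the transform). It assumes each input record behaves like a dict, and that each output record carries the `group_by` field plus the named aggregate (`total`); `combine_sum` is a hypothetical helper.

```python
from collections import defaultdict


def combine_sum(rows, group_by, value, output):
    """Group dict-like rows by `group_by` and sum `value` into field `output`.

    Hypothetical model of the Combine semantics only; Beam distributes this work.
    """
    totals = defaultdict(int)
    for row in rows:
        totals[row[group_by]] += row[value]
    # One output record per distinct key: the key field plus the aggregate.
    return [{group_by: key, output: total} for key, total in totals.items()]


rows = [
    {"col1": "a", "col2": 1},
    {"col1": "b", "col2": 5},
    {"col1": "a", "col2": 2},
]
result = combine_sum(rows, group_by="col1", value="col2", output="total")
print(result)  # [{'col1': 'a', 'total': 3}, {'col1': 'b', 'total': 5}]
```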
If the function has no configuration requirements, it can be provided directly as a string:

```yaml
- type: Combine
  config:
    group_by: col1
    combine:
      total:
        value: col2
        fn: sum
```
This can be simplified further if the output field name is the same as the input field name:

```yaml
- type: Combine
  config:
    group_by: col1
    combine:
      col2: sum
```
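The shorthands above can all be read as abbreviations of the fully spelled-out form. The sketch below expands them explicitly. `normalize_combine` is a hypothetical helper written to illustrate the documented equivalences; it is not Beam's own config parser, and it assumes the parsed YAML arrives as plain Python dicts and strings.

```python
def normalize_combine(combine):
    """Expand each entry of a `combine` mapping into the full form
    {output_field: {"value": input_field, "fn": {"type": name, ...}}}.

    Hypothetical helper illustrating the shorthands; not Beam's parser.
    """
    full = {}
    for output_field, spec in combine.items():
        if isinstance(spec, str):
            # `col2: sum`: the output field name doubles as the input field name.
            spec = {"value": output_field, "fn": spec}
        fn = spec["fn"]
        if isinstance(fn, str):
            # `fn: sum`: a function without configuration given as a bare string.
            fn = {"type": fn}
        full[output_field] = {"value": spec["value"], "fn": fn}
    return full


long_form = {"col2": {"value": "col2", "fn": {"type": "sum"}}}
print(normalize_combine({"col2": "sum"}) == long_form)  # True
print(normalize_combine({"col2": {"value": "col2", "fn": "sum"}}) == long_form)  # True
```

Any extra keys inside a dict-valued `fn` (such as a `config` block) pass through unchanged, since only the string form is rewritten.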
One can aggregate over many fields at once

```yaml
- type: Combine
  config:
    group_by: col1
    combine:
      col2: sum
      col3: max
```
and/or group by more than one field

```yaml
- type: Combine
  config:
    group_by: [col1, col2]
    combine:
      col3: sum
```
or by none at all (which results in a global combine with a single output):

```yaml
- type: Combine
  config:
    group_by: []
    combine:
      col2: sum
      col3: max
```
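The three variations above (several aggregated fields, several key fields, and no key fields) follow one rule: records sharing the same tuple of `group_by` values form a group, and an empty tuple puts everything in a single group. The plain-Python model below sketches that rule. `combine_rows` and the `REDUCERS` table are hypothetical names, and only `sum` and `max` are modeled.

```python
from collections import defaultdict

# Hypothetical mapping from the function names used in `combine` to reducers.
REDUCERS = {"sum": sum, "max": max}


def combine_rows(rows, group_by, aggregations):
    """Group rows by zero or more key fields, then apply each named aggregation.

    `group_by` may be a single field name or a list of names. An empty list puts
    every row into one group, giving one global output record (this model emits
    nothing at all for empty input, which a real global combine may not).
    """
    keys = [group_by] if isinstance(group_by, str) else list(group_by)
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row[k] for k in keys)].append(row)
    out = []
    for key_values, members in groups.items():
        record = dict(zip(keys, key_values))
        for field, fn_name in aggregations.items():
            record[field] = REDUCERS[fn_name](r[field] for r in members)
        out.append(record)
    return out


rows = [
    {"col1": "a", "col2": 1, "col3": 7},
    {"col1": "b", "col2": 5, "col3": 2},
    {"col1": "a", "col2": 2, "col3": 4},
]
print(combine_rows(rows, "col1", {"col2": "sum", "col3": "max"}))
# [{'col1': 'a', 'col2': 3, 'col3': 7}, {'col1': 'b', 'col2': 5, 'col3': 2}]
print(combine_rows(rows, [], {"col2": "sum", "col3": "max"}))
# [{'col2': 8, 'col3': 7}]
```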
As with all transforms, `Combine` can take a `windowing` parameter:

```yaml
- type: Combine
  windowing:
    type: fixed
    size: 60
  config:
    group_by: col1
    combine:
      col2: sum
      col3: max
```
If no windowing specification is provided, the transform inherits the windowing parameters from upstream; e.g.

```yaml
- type: WindowInto
  windowing:
    type: fixed
    size: 60
- type: Combine
  config:
    group_by: col1
    combine:
      col2: sum
      col3: max
```
is equivalent to the previous example.
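With fixed windowing, the aggregation is computed separately for each window, so the effective grouping key becomes (window, `group_by` value). The sketch below models this in plain Python under two assumptions: `size: 60` means 60 seconds, and a hypothetical `ts` field stands in for the element timestamp, which Beam actually tracks as metadata rather than as a record field. Windows are taken to start at multiples of the size (no offset).

```python
from collections import defaultdict


def fixed_window(ts, size=60):
    """Return the [start, end) fixed window containing timestamp `ts` (seconds)."""
    start = ts - ts % size
    return (start, start + size)


def windowed_sum(rows, group_by, field, size=60):
    """Sum `field` per (window, key): each fixed window is aggregated on its own.

    `ts` is a hypothetical stand-in for the element timestamp.
    """
    totals = defaultdict(int)
    for row in rows:
        totals[(fixed_window(row["ts"], size), row[group_by])] += row[field]
    return dict(totals)


rows = [
    {"ts": 5, "col1": "a", "col2": 1},
    {"ts": 59, "col1": "a", "col2": 2},
    {"ts": 61, "col1": "a", "col2": 10},
]
result = windowed_sum(rows, "col1", "col2")
print(result)  # {((0, 60), 'a'): 3, ((60, 120), 'a'): 10}
```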
One can use aggregation functions defined in Python by setting the `language` parameter:

```yaml
- type: Combine
  config:
    language: python
    group_by: col1
    combine:
      biggest:
        value: "col2 + col2"
        fn:
          type: 'apache_beam.transforms.combiners.TopCombineFn'
          config:
            n: 10
```
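In the example above, `value` is a Python expression evaluated per record, and `TopCombineFn` with `n: 10` keeps the largest values in each group. The plain-Python sketch below approximates that result with `heapq.nlargest`; to our understanding `TopCombineFn` likewise yields a list of up to `n` elements, largest first, but this is a model of the output, not Beam's implementation. `top_n_per_key` is a hypothetical helper, and the lambda stands in for the expression `col2 + col2`.

```python
import heapq
from collections import defaultdict


def top_n_per_key(rows, group_by, expr, n=10):
    """Keep the `n` largest values of `expr(row)` for each key, largest first."""
    values = defaultdict(list)
    for row in rows:
        values[row[group_by]].append(expr(row))
    return {key: heapq.nlargest(n, vals) for key, vals in values.items()}


rows = [{"col1": "a", "col2": v} for v in range(15)] + [{"col1": "b", "col2": 3}]
# The lambda plays the role of the YAML expression "col2 + col2".
biggest = top_n_per_key(rows, "col1", lambda row: row["col2"] + row["col2"], n=10)
print(biggest["a"])  # [28, 26, 24, 22, 20, 18, 16, 14, 12, 10]
print(biggest["b"])  # [6]
```

Groups with fewer than `n` records simply return all of their values, as group `b` shows.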
By setting the language to SQL, one can provide full SQL snippets as the combine fn:

```yaml
- type: Combine
  config:
    language: sql
    group_by: col1
    combine:
      num_values: "count(*)"
      total: "sum(col2)"
```
One can, of course, also use the `Sql` transform type and provide a query directly.
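To see what the SQL-language combine above amounts to as a full query, the sketch below runs an equivalent `GROUP BY` against an in-memory SQLite table using Python's standard `sqlite3` module. This only illustrates the query semantics: Beam evaluates SQL with its own engine and dialect, not SQLite. The table name `PCOLLECTION` is an assumption based on Beam SQL's usual name for the input collection, and the `ORDER BY` is added only to make the printed result deterministic.

```python
import sqlite3

# In-memory table standing in for the input collection (illustration only).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE PCOLLECTION (col1 TEXT, col2 INTEGER)")
conn.executemany(
    "INSERT INTO PCOLLECTION VALUES (?, ?)",
    [("a", 1), ("b", 5), ("a", 2)],
)

# Same aggregation as the `language: sql` Combine example, as a single query.
query = """
    SELECT col1, COUNT(*) AS num_values, SUM(col2) AS total
    FROM PCOLLECTION
    GROUP BY col1
    ORDER BY col1
"""
result = conn.execute(query).fetchall()
print(result)  # [('a', 2, 3), ('b', 1, 5)]
conn.close()
```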