<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Beam YAML mappings
Beam YAML can perform simple transformations that get data into the correct
shape. The simplest of these is `MapToFields`, which creates records with new
fields defined in terms of the input fields.
## Field renames
To rename fields, one can write
```
- type: MapToFields
  config:
    fields:
      new_col1: col1
      new_col2: col2
```
which will result in an output where each record has two fields,
`new_col1` and `new_col2`, whose values are those of `col1` and `col2`
respectively.
One can specify the `append` parameter, which indicates that the original
fields should be retained, similar to the use of `*` in an SQL select
statement. For example
```
- type: MapToFields
  config:
    append: true
    fields:
      new_col1: col1
      new_col2: col2
```
will output records that have `new_col1` and `new_col2` as *additional*
fields. When the `append` parameter is specified, one can drop fields as well, e.g.
```
- type: MapToFields
  config:
    append: true
    drop:
      - col3
    fields:
      new_col1: col1
      new_col2: col2
```
which includes all original fields *except* `col3` in addition to outputting
the two new ones.
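The effect of `fields`, `append`, and `drop` on a single record can be sketched in plain Python. This is only an illustration of the semantics described above, using dictionaries as stand-ins for records; the helper `map_to_fields` is hypothetical and is not part of Beam.

```python
def map_to_fields(record, fields, append=False, drop=()):
    """Plain-Python illustration of rename/append/drop; not Beam's implementation."""
    # With append, start from the original fields minus any dropped ones;
    # without it, start from an empty record.
    out = {k: v for k, v in record.items() if k not in drop} if append else {}
    # Each new field takes the value of the input field it names.
    for new_name, old_name in fields.items():
        out[new_name] = record[old_name]
    return out


record = {"col1": "a", "col2": 2, "col3": 3.0}
fields = {"new_col1": "col1", "new_col2": "col2"}

renamed = map_to_fields(record, fields)
appended = map_to_fields(record, fields, append=True)
dropped = map_to_fields(record, fields, append=True, drop=["col3"])
```

Here `renamed` holds only the two new fields, `appended` holds the three original fields plus the two new ones, and `dropped` is the same as `appended` without `col3`.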
## Mapping functions
Of course one may want to do transformations beyond just dropping and renaming
fields. Beam YAML has the ability to inline simple UDFs.
This requires a language specification. For example
```
- type: MapToFields
  config:
    language: python
    fields:
      new_col: "col1.upper()"
      another_col: "col2 + col3"
```
In addition, one can provide a full Python callable that takes the row as an
argument to do more complex mappings
(see [PythonCallableWithSource](https://beam.apache.org/releases/pydoc/current/apache_beam.utils.python_callable.html#apache_beam.utils.python_callable.PythonCallableWithSource)
for acceptable formats). Thus one can write
```
- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        callable: |
          import re
          def my_mapping(row):
            if re.match("[0-9]+", row.col1) and row.col2 > 0:
              return "good"
            else:
              return "bad"
```
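Because the callable is ordinary Python, one can exercise it locally before embedding it in a pipeline. The sketch below assumes a `namedtuple` is an adequate stand-in for the row, since the function only uses attribute access; the `Row` type here is hypothetical and is not Beam's row class.

```python
import re
from collections import namedtuple

# Hypothetical stand-in for a Beam row; only attribute access is needed.
Row = namedtuple("Row", ["col1", "col2"])


def my_mapping(row):
    # Same logic as the inline callable: col1 must start with digits
    # and col2 must be positive.
    if re.match("[0-9]+", row.col1) and row.col2 > 0:
        return "good"
    else:
        return "bad"


results = [
    my_mapping(Row("123", 5)),   # digits and positive
    my_mapping(Row("abc", 5)),   # no leading digits
    my_mapping(Row("123", -1)),  # digits but not positive
]
```

Note that `re.match` only anchors at the start of the string, so a value such as `"12ab"` also counts as a match; `re.fullmatch` would be the choice if the whole value must consist of digits.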
Once one reaches a certain level of complexity, it may be preferable to package
this up as a dependency and simply refer to it by fully qualified name, e.g.
```
- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        callable: pkg.module.fn
```
In addition to Python, SQL expressions are currently supported as well:
```
- type: MapToFields
  config:
    language: sql
    fields:
      new_col: "UPPER(col1)"
      another_col: "col2 + col3"
```
## FlatMap
Sometimes it may be desirable to emit more (or fewer) than one record for each
input record. This can be accomplished by mapping to an iterable type and
following the mapping with an `Explode` operation, e.g.
```
- type: MapToFields
  config:
    language: python
    fields:
      new_col: "[col1.upper(), col1.lower(), col1.title()]"
      another_col: "col2 + col3"
- type: Explode
  config:
    fields: new_col
```
which will result in three output records for every input record.
If more than one field is to be exploded, one must specify whether the cross
product over all fields should be taken. For example
```
- type: MapToFields
  config:
    language: python
    fields:
      new_col: "[col1.upper(), col1.lower(), col1.title()]"
      another_col: "[col2 - 1, col2, col2 + 1]"
- type: Explode
  config:
    fields: [new_col, another_col]
    cross_product: true
```
will emit nine records whereas
```
- type: MapToFields
  config:
    language: python
    fields:
      new_col: "[col1.upper(), col1.lower(), col1.title()]"
      another_col: "[col2 - 1, col2, col2 + 1]"
- type: Explode
  config:
    fields: [new_col, another_col]
    cross_product: false
```
will only emit three.
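The difference between the two modes can be sketched in plain Python: with the cross product every value of one field is paired with every value of the other, and without it the values are paired by position. The helper `explode` below is hypothetical and only illustrates the record counts described above. It assumes the exploded lists have equal length and does not model what Beam does when they differ.

```python
from itertools import product


def explode(record, fields, cross_product):
    """Plain-Python illustration of the two modes; not Beam's implementation."""
    columns = [record[f] for f in fields]
    # True: every combination of values. False: values paired by position.
    combos = product(*columns) if cross_product else zip(*columns)
    # Each combination replaces the list-valued fields with single values.
    return [{**record, **dict(zip(fields, combo))} for combo in combos]


record = {
    "new_col": ["ABC", "abc", "Abc"],
    "another_col": [0, 1, 2],
}

crossed = explode(record, ["new_col", "another_col"], cross_product=True)
zipped = explode(record, ["new_col", "another_col"], cross_product=False)
```

Here `crossed` contains nine records and `zipped` contains three, the first of which pairs `"ABC"` with `0`.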
The `Explode` operation can be used on its own if the field in question is
already an iterable type.
```
- type: Explode
  config:
    fields: [col1]
```
## Filtering
Sometimes it can be desirable to keep only those records that satisfy certain
criteria. This can be accomplished with a `Filter` transform, e.g.
```
- type: Filter
  config:
    language: sql
    keep: "col2 > 0"
```
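As a rough plain-Python analogy (dictionaries standing in for records, not Beam code), the `keep` predicate above behaves like the condition of a list comprehension: records for which it is false are discarded, and the remaining records pass through unchanged.

```python
records = [
    {"col1": "a", "col2": 3},
    {"col1": "b", "col2": 0},
    {"col1": "c", "col2": -1},
]

# Keep a record only when the predicate holds; kept records are not modified.
kept = [r for r in records if r["col2"] > 0]
```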
## Types
Beam will try to infer the types involved in the mappings, but sometimes this
is not possible. In these cases one can explicitly denote the expected output
type, e.g.
```
- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        expression: "col1.upper()"
        output_type: string
```
The expected type is given in JSON schema notation, with the addition that
a top-level basic type may be given as a literal string rather than requiring
a `{type: 'basic_type_name'}` nesting.
```
- type: MapToFields
  config:
    language: python
    fields:
      new_col:
        expression: "col1.upper()"
        output_type: string
      another_col:
        expression: "beam.Row(a=col1, b=[col2])"
        output_type:
          type: 'object'
          properties:
            a:
              type: 'string'
            b:
              type: 'array'
              items:
                type: 'number'
```
This can be especially useful to resolve errors involving the inability to
handle the `beam:logical:pythonsdk_any:v1` type.
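To make the notation concrete, the sketch below reads such a type declaration in plain Python and checks whether a value has the declared shape. It only illustrates how the notation is read, including the top-level string shorthand; the helpers `expand_shorthand` and `matches` are hypothetical, cover only the four types used above, and do not reflect how Beam processes `output_type`.

```python
def expand_shorthand(output_type):
    # Top-level shorthand: a bare string such as "string" stands for
    # {"type": "string"}.
    return {"type": output_type} if isinstance(output_type, str) else output_type


def matches(value, schema):
    """Check a value against the small subset of JSON schema used above."""
    kind = schema["type"]
    if kind == "string":
        return isinstance(value, str)
    if kind == "number":
        # bool is a subclass of int in Python, so exclude it explicitly.
        return isinstance(value, (int, float)) and not isinstance(value, bool)
    if kind == "array":
        return isinstance(value, list) and all(
            matches(item, schema["items"]) for item in value)
    if kind == "object":
        return isinstance(value, dict) and all(
            name in value and matches(value[name], sub)
            for name, sub in schema["properties"].items())
    raise ValueError(f"type not covered by this sketch: {kind}")


# The nested declaration for another_col, written as a Python dict.
another_col_type = {
    "type": "object",
    "properties": {
        "a": {"type": "string"},
        "b": {"type": "array", "items": {"type": "number"}},
    },
}

ok = matches({"a": "x", "b": [1, 2.5]}, another_col_type)
bad = matches({"a": "x", "b": ["not a number"]}, another_col_type)
shorthand_ok = matches("ABC", expand_shorthand("string"))
```

In this sketch `ok` and `shorthand_ok` are true, while `bad` is false because the array holds a string where a number is declared.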