---
type: languages
title: "Apache Beam YAML Inline Python"
---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
# Using PyTransform from YAML
Beam YAML makes it easy to invoke Python transforms via the `PyTransform`
type: you reference a transform by its fully qualified name.
For example,
```
- type: PyTransform
  config:
    constructor: apache_beam.pkg.module.SomeTransform
    args: [1, 'foo']
    kwargs:
      baz: 3
```
will invoke the transform `apache_beam.pkg.module.SomeTransform(1, 'foo', baz=3)`.
This fully qualified name can be any PTransform class or other callable that
returns a PTransform. Note, however, that PTransforms that do not accept or
return schema'd data may not be as easy to use from YAML.
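To make the name-to-call mapping concrete, the following stdlib-only sketch resolves a fully qualified name with `importlib` and calls the result with the configured `args` and `kwargs`. The helpers `resolve` and `build_from_config` are hypothetical and only approximate what `PyTransform` does; `datetime.timedelta` stands in for a real PTransform so the example runs without Beam, and the sketch assumes the last dotted component is an attribute of an importable module.

```python
import datetime
import importlib


def resolve(qualified_name):
    """Import `package.module.Name` and return the `Name` attribute.

    Assumes the last dotted component is an attribute of an importable
    module (nested classes are not handled in this sketch).
    """
    module_name, _, attr_name = qualified_name.rpartition(".")
    return getattr(importlib.import_module(module_name), attr_name)


def build_from_config(config):
    """Hypothetical helper: call `constructor` with `args` and `kwargs`."""
    constructor = resolve(config["constructor"])
    return constructor(*config.get("args", []), **config.get("kwargs", {}))


# Stdlib stand-in for a PyTransform config:
#   constructor: datetime.timedelta, args: [1], kwargs: {hours: 2}
# which becomes datetime.timedelta(1, hours=2).
td = build_from_config(
    {"constructor": "datetime.timedelta", "args": [1], "kwargs": {"hours": 2}}
)
print(td)  # 1 day, 2:00:00
```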
Schema-ness can be restored after a transform that returns non-schema'd data
by using the `callable` option on `MapToFields`, which takes the entire element
as an input, e.g.
```
- type: PyTransform
  config:
    constructor: apache_beam.pkg.module.SomeTransform
    args: [1, 'foo']
    kwargs:
      baz: 3
- type: MapToFields
  config:
    language: python
    fields:
      col1:
        callable: 'lambda element: element.col1'
        output_type: string
      col2:
        callable: 'lambda element: element.col2'
        output_type: integer
```
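The role of each `callable` can be pictured in plain Python: it receives the whole element, and its return value becomes one field of the output row. In the sketch below, `compile_fields`, `to_row`, and the use of `eval` are illustrative assumptions rather than how Beam evaluates these expressions, a `SimpleNamespace` stands in for the output of a non-schema transform, and `output_type` is not modeled.

```python
from types import SimpleNamespace

# Hypothetical field spec mirroring the MapToFields config above: each
# field maps to the source of a callable that receives the whole element.
FIELDS = {
    "col1": "lambda element: element.col1",
    "col2": "lambda element: element.col2",
}


def compile_fields(fields):
    """Turn each callable's source string into a Python function."""
    # eval is used here only to illustrate the idea; it runs arbitrary code.
    return {name: eval(source) for name, source in fields.items()}


def to_row(element, compiled):
    """Build a dict with one entry per field from a schema-less element."""
    return {name: fn(element) for name, fn in compiled.items()}


compiled = compile_fields(FIELDS)
# An arbitrary object standing in for the output of a non-schema transform.
element = SimpleNamespace(col1="abc", col2=7, ignored=True)
print(to_row(element, compiled))  # {'col1': 'abc', 'col2': 7}
```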
This can be used to call arbitrary transforms in the Beam SDK, e.g.
```
pipeline:
  transforms:
    - type: PyTransform
      name: ReadFromTsv
      input: {}
      config:
        constructor: apache_beam.io.ReadFromCsv
        kwargs:
          path: '/path/to/*.tsv'
          sep: '\t'
          skip_blank_lines: True
          true_values: ['yes']
          false_values: ['no']
          comment: '#'
          on_bad_lines: 'skip'
          binary: False
          splittable: False
```
## Defining a transform inline using `__constructor__`
If the desired transform does not exist, one can define it inline as well.
This is done with the special `__constructor__` keyword,
similar to how cross-language transforms are done.
With the `__constructor__` keyword, one defines a Python callable that, on
invocation, *returns* the desired transform. The first argument (or `source`
keyword argument, if there are no positional arguments)
is interpreted as the Python code. For example,
```
- type: PyTransform
  config:
    constructor: __constructor__
    kwargs:
      source: |
        def create_my_transform(inc):
          return beam.Map(lambda x: beam.Row(a=x.col2 + inc))
      inc: 10
```
will apply `beam.Map(lambda x: beam.Row(a=x.col2 + 10))` to the incoming
PCollection.
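The mechanism can be pictured with a small stdlib-only sketch: execute the `source` snippet, pick out the callable it defines, and call it with the remaining keyword arguments (here `inc=10`) to obtain the transform. The helper `load_last_definition` and its rule of taking the last top-level `def` or `class` are assumptions made for illustration, and a plain function stands in for `beam.Map(...)` so the example runs without Beam.

```python
import ast


def load_last_definition(source):
    """Execute `source` and return its last top-level function or class.

    Assumption for this sketch: the callable to invoke is the final
    `def` or `class` statement in the snippet.
    """
    tree = ast.parse(source)
    names = [
        node.name
        for node in tree.body
        if isinstance(node, (ast.FunctionDef, ast.ClassDef))
    ]
    if not names:
        raise ValueError("source defines no function or class")
    namespace = {}
    exec(compile(tree, "<inline>", "exec"), namespace)
    return namespace[names[-1]]


# Stand-in for the YAML `source`: a factory that *returns* the transform.
# A plain function plays the role of beam.Map(...) so this runs without Beam.
SOURCE = """
def create_my_transform(inc):
    return lambda x: x + inc
"""

constructor = load_last_definition(SOURCE)
transform = constructor(inc=10)  # the remaining kwargs, here inc: 10
print(transform(5))  # 15
```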
As a class object can be invoked as its own constructor, this allows one to
define a `beam.PTransform` inline, e.g.
```
- type: PyTransform
  config:
    constructor: __constructor__
    kwargs:
      source: |
        class MyPTransform(beam.PTransform):
          def __init__(self, inc):
            self._inc = inc
          def expand(self, pcoll):
            return pcoll | beam.Map(lambda x: beam.Row(a=x.col2 + self._inc))
      inc: 10
```
which works exactly as one would expect.
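The reason this works is that a class is already a callable returning an instance, so it fills the same slot as a factory function. The toy below models just enough of `beam.PTransform` to show this with plain lists: applying `pcoll | transform`, which in Beam ultimately invokes the transform's `expand`. `ToyPTransform` is a hypothetical stand-in, not Beam's class.

```python
class ToyPTransform:
    """Toy stand-in for beam.PTransform: `items | t` calls `t.expand(items)`.

    Beam's real PTransform is far richer; this models only the `|` operator.
    """

    def __ror__(self, pcoll):
        return self.expand(pcoll)

    def expand(self, pcoll):
        raise NotImplementedError


class MyPTransform(ToyPTransform):
    def __init__(self, inc):
        self._inc = inc

    def expand(self, pcoll):
        return [x + self._inc for x in pcoll]


# The class itself is the constructor: calling it with the kwargs from YAML
# (inc: 10) yields a ready-to-apply transform instance.
constructor = MyPTransform
transform = constructor(inc=10)
print([1, 2, 3] | transform)  # [11, 12, 13]
```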
## Defining a transform inline using `__callable__`
The `__callable__` keyword works similarly, but instead of defining a
callable that returns an applicable `PTransform`, one simply defines the
expansion to be performed as a callable. This is analogous to Beam Python's
`ptransform.ptransform_fn` decorator.
In this case one can simply write
```
- type: PyTransform
  config:
    constructor: __callable__
    kwargs:
      source: |
        def my_ptransform(pcoll, inc):
          return pcoll | beam.Map(lambda x: beam.Row(a=x.col2 + inc))
      inc: 10
```
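The difference from `__constructor__` is who builds the transform object: you write only the expansion logic, taking the input PCollection as its first argument, and a wrapper turns it into a constructor. The sketch below is a toy model of that wrapping on plain lists; `toy_ptransform_fn` and `_FnTransform` are hypothetical names that only approximate what `beam.ptransform_fn` does.

```python
import functools


class _FnTransform:
    """Toy transform whose expansion is a user function (stand-in only)."""

    def __init__(self, fn, args, kwargs):
        self._fn, self._args, self._kwargs = fn, args, kwargs

    def __ror__(self, pcoll):
        # Applying the transform calls fn(pcoll, *args, **kwargs).
        return self._fn(pcoll, *self._args, **self._kwargs)


def toy_ptransform_fn(fn):
    """Hypothetical analogue of beam.ptransform_fn for plain lists."""

    @functools.wraps(fn)
    def constructor(*args, **kwargs):
        return _FnTransform(fn, args, kwargs)

    return constructor


# With __callable__, the YAML source is just the expansion function.
def my_ptransform(pcoll, inc):
    return [x + inc for x in pcoll]


transform = toy_ptransform_fn(my_ptransform)(inc=10)
print([1, 2, 3] | transform)  # [11, 12, 13]
```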
# External transforms
One can also invoke PTransforms defined elsewhere via a `python` provider,
for example
```
pipeline:
  transforms:
    - ...
    - type: MyTransform
      input: ...
      config:
        kwarg: whatever

providers:
  - ...
  - type: python
    config:
      packages:
        - 'some_pypi_package>=version'
    transforms:
      MyTransform: 'pkg.module.MyTransform'
```
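Conceptually, the provider's `transforms` section is a lookup table from the YAML `type` to a fully qualified Python name, which the listed `packages` make importable. The sketch below models only the lookup-and-call step, assuming each entry under the transform's `config` is passed as a keyword argument (as in `kwarg: whatever` becoming `MyTransform(kwarg='whatever')`). `PROVIDER_TRANSFORMS` and `instantiate` are hypothetical, package installation is not modeled, and `datetime.timedelta` stands in for `pkg.module.MyTransform`.

```python
import importlib

# Hypothetical provider table mirroring `transforms:` above, with a stdlib
# class standing in for 'pkg.module.MyTransform'.
PROVIDER_TRANSFORMS = {"MyTransform": "datetime.timedelta"}


def instantiate(type_name, config, transforms=PROVIDER_TRANSFORMS):
    """Look up a YAML `type` and call its constructor with `config` as kwargs.

    Assumes each config entry becomes a keyword argument.
    """
    module_name, _, attr_name = transforms[type_name].rpartition(".")
    constructor = getattr(importlib.import_module(module_name), attr_name)
    return constructor(**config)


print(instantiate("MyTransform", {"days": 2}))  # 2 days, 0:00:00
```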
These can be defined inline as well, with or without dependencies, e.g.
```
pipeline:
  transforms:
    - ...
    - type: ToCase
      input: ...
      config:
        upper: True

providers:
  - type: python
    config: {}
    transforms:
      'ToCase': |
        @beam.ptransform_fn
        def ToCase(pcoll, upper):
          if upper:
            return pcoll | beam.Map(lambda x: str(x).upper())
          else:
            return pcoll | beam.Map(lambda x: str(x).lower())
```
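Stripped of Beam, the element-wise logic of `ToCase` is easy to check on its own. The sketch below applies the same two branches to a Python list (an assumption standing in for a PCollection). It also shows that every element is converted with `str` first, so the outputs are plain strings rather than schema'd rows.

```python
def to_case(elements, upper):
    """Plain-list equivalent of the two beam.Map branches in ToCase."""
    # str() first, so non-string elements (numbers, None, ...) are accepted.
    if upper:
        return [str(x).upper() for x in elements]
    return [str(x).lower() for x in elements]


# `upper: True` under the transform's config becomes upper=True here.
print(to_case(["Beam", "yaml", 42], upper=True))  # ['BEAM', 'YAML', '42']
print(to_case(["Beam", "YAML"], upper=False))  # ['beam', 'yaml']
```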