#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Shared test fixtures. TEMP_DIR is declared with the type string
# "tempfile.TemporaryDirectory"; pipelines in this file refer to it through
# the `{TEMP_DIR}` placeholder inside their file paths.
fixtures:
- name: TEMP_DIR
  type: "tempfile.TemporaryDirectory"
pipelines:
# Simple PyTransform using __constructor__.
# `source` defines increase(inc), which returns a beam.Map that adds `inc` to
# each row's price and carries `produce` through unchanged.
- pipeline:
    type: chain
    transforms:
    - type: Create
      config:
        elements:
        - {price: 1.29, produce: 'Apple'}
        - {price: 0.29, produce: 'Apricot'}
        - {price: 0.02, produce: 'Blueberry'}
        - {price: 0.19, produce: 'Date'}
    - type: PyTransform
      config:
        constructor: __constructor__
        kwargs:
          source: |
            def increase(inc):
              return beam.Map(lambda x: beam.Row(price=x.price + inc, produce=x.produce))
          # NOTE(review): presumably forwarded as the `inc` argument of
          # increase() — confirm against the PyTransform provider.
          inc: 0.30
    # Expected output: every input price raised by 0.30.
    - type: AssertEqual
      config:
        elements:
        - {price: 1.59, produce: 'Apple'}
        - {price: 0.59, produce: 'Apricot'}
        - {price: 0.32, produce: 'Blueberry'}
        - {price: 0.49, produce: 'Date'}
# Simple PyTransform using __constructor__ with a beam.PTransform subclass.
# `source` defines MyPTransform, which stores `inc` in __init__ and, in
# expand(), maps each row to a new beam.Row whose price is increased by it.
- pipeline:
    type: chain
    transforms:
    - type: Create
      config:
        elements:
        - {price: 1.29, produce: 'Apple'}
        - {price: 0.29, produce: 'Apricot'}
        - {price: 0.02, produce: 'Blueberry'}
        - {price: 0.19, produce: 'Date'}
    - type: PyTransform
      config:
        constructor: __constructor__
        kwargs:
          source: |
            class MyPTransform(beam.PTransform):
              def __init__(self, inc):
                self._inc = inc
              def expand(self, pcoll):
                return pcoll | beam.Map(lambda x: beam.Row(price=x.price + self._inc, produce=x.produce))
          # NOTE(review): presumably forwarded as the `inc` argument of
          # MyPTransform.__init__ — confirm against the PyTransform provider.
          inc: 0.30
    # Expected output: every input price raised by 0.30.
    - type: AssertEqual
      config:
        elements:
        - {price: 1.59, produce: 'Apple'}
        - {price: 0.59, produce: 'Apricot'}
        - {price: 0.32, produce: 'Blueberry'}
        - {price: 0.49, produce: 'Date'}
# Simple PyTransform using __callable__.
# Unlike the __constructor__ cases, the function in `source` takes the input
# PCollection itself (`pcoll`) plus `inc`, and applies the beam.Map directly.
- pipeline:
    type: chain
    transforms:
    - type: Create
      config:
        elements:
        - {price: 1.29, produce: 'Apple'}
        - {price: 0.29, produce: 'Apricot'}
        - {price: 0.02, produce: 'Blueberry'}
        - {price: 0.19, produce: 'Date'}
    - type: PyTransform
      config:
        constructor: __callable__
        kwargs:
          source: |
            def increase(pcoll, inc):
              return pcoll | beam.Map(lambda x: beam.Row(price=x.price + inc, produce=x.produce))
          # NOTE(review): presumably forwarded as the `inc` argument of
          # increase() — confirm against the PyTransform provider.
          inc: 0.30
    # Expected output: every input price raised by 0.30.
    - type: AssertEqual
      config:
        elements:
        - {price: 1.59, produce: 'Apple'}
        - {price: 0.59, produce: 'Apricot'}
        - {price: 0.32, produce: 'Blueberry'}
        - {price: 0.49, produce: 'Date'}
# Write the sample rows to CSV under TEMP_DIR. The ReadFromCsv pipelines later
# in this file read them back through the `{TEMP_DIR}/out.csv*` glob.
- pipeline:
    type: chain
    transforms:
    - type: Create
      config:
        elements:
        - {price: 1.29, produce: 'Apple'}
        - {price: 0.29, produce: 'Apricot'}
        - {price: 0.02, produce: 'Blueberry'}
        - {price: 0.19, produce: 'Date'}
    - type: WriteToCsv
      config:
        path: "{TEMP_DIR}/out.csv"
# Simple PyTransform using ReadFromCsv with positional args.
# The constructor is given as a fully qualified name and the file glob is
# supplied through `args`. This is a composite pipeline, so the assertion is
# wired to the read step explicitly by name.
- pipeline:
    type: composite
    transforms:
    - type: PyTransform
      name: PyTransformReadFromCsv
      # Declared with an empty input mapping.
      input: {}
      config:
        constructor: apache_beam.io.ReadFromCsv
        args: ['{TEMP_DIR}/out.csv*']
    # Expected output: the same four rows written to `{TEMP_DIR}/out.csv`.
    - type: AssertEqual
      input: PyTransformReadFromCsv
      config:
        elements:
        - {price: 1.29, produce: 'Apple'}
        - {price: 0.29, produce: 'Apricot'}
        - {price: 0.02, produce: 'Blueberry'}
        - {price: 0.19, produce: 'Date'}
# Simple PyTransform using ReadFromCsv with keyword args.
# Same read-back check as the positional-args variant, but the file glob is
# supplied as the `path` entry of `kwargs`.
- pipeline:
    type: composite
    transforms:
    - type: PyTransform
      name: PyTransformReadFromCsv
      # Declared with an empty input mapping.
      input: {}
      config:
        constructor: apache_beam.io.ReadFromCsv
        kwargs:
          path: '{TEMP_DIR}/out.csv*'
    # Expected output: the same four rows written to `{TEMP_DIR}/out.csv`.
    - type: AssertEqual
      input: PyTransformReadFromCsv
      config:
        elements:
        - {price: 1.29, produce: 'Apple'}
        - {price: 0.29, produce: 'Apricot'}
        - {price: 0.02, produce: 'Blueberry'}
        - {price: 0.19, produce: 'Date'}