blob: 1df599e254a7fd78135f4da122f8d5fadf8eeb15 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
pipelines:
# Validate a Beam Row with a predefined schema with no error handling
- pipeline:
type: chain
transforms:
- type: Create
name: InputData
config:
elements:
- {name: "Alice", age: 30, score: 95.5}
- {name: "Bob", age: 25, score: 88.0}
- type: ValidateWithSchema
config:
schema:
type: object
properties:
name:
type: string
age:
type: integer
score:
type: number
- type: AssertEqual
config:
elements:
- {name: "Alice", age: 30, score: 95.5}
- {name: "Bob", age: 25, score: 88.0}
# Validate a Beam Row with a predefined schema with error handling
- pipeline:
type: composite
transforms:
- type: Create
config:
elements:
- {name: "Alice", age: 30, score: 95.5}
- {name: "Bob", age: 25, score: 88.0}
- {name: "Charlie", age: 27, score: "apple"}
- {name: "David", age: "twenty", score: 90.0}
- {name: 30, age: 40, score: 100.0}
- type: ValidateWithSchema
input: Create
config:
schema:
type: object
properties:
name:
type: string
age:
type: integer
score:
type: number
required: [name, age, score]
error_handling:
output: invalid_rows
- type: MapToFields
input: ValidateWithSchema.invalid_rows
config:
language: python
fields:
name: "element.name"
age: "element.age"
score: "element.score"
- type: AssertEqual
input: MapToFields
config:
elements:
- {name: "Charlie", age: 27, score: "apple"}
- {name: "David", age: "twenty", score: 90.0}
- {name: 30, age: 40, score: 100.0}
- type: AssertEqual
input: ValidateWithSchema
config:
elements:
- {name: "Alice", age: 30, score: 95.5}
- {name: "Bob", age: 25, score: 88.0}
# Validate a Beam Row with a predefined schema, nulls, and error handling
- pipeline:
type: composite
transforms:
- type: Create
config:
elements:
- {name: "Alice", age: 30, score: 95.5}
- {name: "Bob", age: 25}
- {name: "Charlie", age: 27, score: "apple"}
- type: ValidateWithSchema
input: Create
config:
schema:
type: object
properties:
name:
type: string
age:
type: integer
score:
type: number
error_handling:
output: invalid_rows
# ValidateWithSchema outputs the element, error msg, and traceback, so
# MapToFields is needed to easily assert downstream.
- type: MapToFields
input: ValidateWithSchema.invalid_rows
config:
language: python
fields:
name: "element.name"
age: "element.age"
score: "element.score"
- type: AssertEqual
input: ValidateWithSchema
config:
elements:
- {name: "Alice", age: 30, score: 95.5}
- {name: "Bob", age: 25}
- type: AssertEqual
input: MapToFields
config:
elements:
- {name: "Charlie", age: 27, score: "apple"}