blob: 13e56f22edb526bdf6c69333fe93bbaa7336f21f [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
pipelines:
# Assign timestamp to beam row element
- pipeline:
type: chain
transforms:
- type: Create
name: CreateVisits
config:
elements:
- user: alice
new_time_stamp: 1
- user: alice
new_time_stamp: 3
- user: bob
new_time_stamp: 7
- type: AssignTimestamps
config:
timestamp: new_time_stamp
- type: ExtractWindowingInfo
config:
fields: [timestamp]
- type: MapToFields
config:
language: python
fields:
user: user
timestamp: timestamp
- type: AssertEqual
config:
elements:
- {user: "alice", timestamp: 1}
- {user: "alice", timestamp: 3}
- {user: "bob", timestamp: 7}
# Assign timestamp to beam row element with error_handling
- pipeline:
type: composite
transforms:
- type: Create
name: CreateVisits
config:
elements:
- {user: alice, timestamp: "not-valid"}
- {user: bob, timestamp: 3}
- type: AssignTimestamps
input: CreateVisits
config:
timestamp: timestamp
error_handling:
output: invalid_rows
- type: MapToFields
input: AssignTimestamps.invalid_rows
config:
language: python
fields:
user: "element.user"
timestamp: "element.timestamp"
- type: AssertEqual
input: MapToFields
config:
elements:
- {user: "alice", timestamp: "not-valid"}
- type: AssertEqual
input: AssignTimestamps
config:
elements:
- {user: bob, timestamp: 3}
# Assign timestamp to beam row element with error handling and output schema
# check.
- pipeline:
type: composite
transforms:
- type: Create
name: CreateVisits
config:
elements:
- {user: alice, timestamp: "not-valid"}
- {user: bob, timestamp: 3}
- type: AssignTimestamps
input: CreateVisits
config:
timestamp: timestamp
error_handling:
output: invalid_rows
output_schema:
type: object
properties:
user:
type: string
timestamp:
type: integer
- type: MapToFields
name: ExtractInvalidTimestamp
input: AssignTimestamps.invalid_rows
config:
language: python
fields:
user: "element.user"
timestamp: "element.timestamp"
- type: AssertEqual
input: ExtractInvalidTimestamp
config:
elements:
- {user: "alice", timestamp: "not-valid"}
- type: AssertEqual
input: AssignTimestamps
config:
elements:
- {user: bob, timestamp: 3}
# Assign timestamp to beam row element with error handling and output schema
# check with more error handling.
- pipeline:
type: composite
transforms:
- type: Create
name: CreateVisits
config:
elements:
- {user: alice, timestamp: "not-valid"}
- {user: bob, timestamp: 3}
- type: AssignTimestamps
input: CreateVisits
config:
timestamp: timestamp
error_handling:
output: invalid_rows
output_schema:
type: object
properties:
user:
type: string
timestamp:
type: boolean
- type: MapToFields
name: ExtractInvalidTimestamp
input: AssignTimestamps.invalid_rows
config:
language: python
fields:
user: "element.user"
timestamp: "element.timestamp"
- type: AssertEqual
input: ExtractInvalidTimestamp
config:
elements:
- {user: "alice", timestamp: "not-valid"}
- {user: bob, timestamp: 3}
- type: AssertEqual
input: AssignTimestamps
config:
elements: []