#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

pipelines:
# Assign timestamp to beam row element.
#
# Chain pipeline: three rows are created with a `new_time_stamp` field,
# AssignTimestamps is configured with that field, ExtractWindowingInfo adds a
# `timestamp` field, and MapToFields keeps only `user` and `timestamp`.
# The final assertion expects each row's `timestamp` to equal the
# `new_time_stamp` value it was created with (1, 3, 7).
- pipeline:
    type: chain
    transforms:
    - type: Create
      name: CreateVisits
      config:
        elements:
        - user: alice
          new_time_stamp: 1
        - user: alice
          new_time_stamp: 3
        - user: bob
          new_time_stamp: 7
    - type: AssignTimestamps
      config:
        timestamp: new_time_stamp
    - type: ExtractWindowingInfo
      config:
        fields: [timestamp]
    # Project down to the two fields compared by the assertion below.
    - type: MapToFields
      config:
        language: python
        fields:
          user: user
          timestamp: timestamp
    - type: AssertEqual
      config:
        elements:
        - {user: "alice", timestamp: 1}
        - {user: "alice", timestamp: 3}
        - {user: "bob", timestamp: 7}

# Assign timestamp to beam row element with error_handling.
#
# Composite pipeline (inputs are wired explicitly): one row carries the string
# "not-valid" as its timestamp and one carries the integer 3. AssignTimestamps
# declares an `invalid_rows` error output. The assertions expect the
# "not-valid" row on `AssignTimestamps.invalid_rows` and the integer row on the
# main `AssignTimestamps` output.
- pipeline:
    type: composite
    transforms:
    - type: Create
      name: CreateVisits
      config:
        elements:
        - {user: alice, timestamp: "not-valid"}
        - {user: bob, timestamp: 3}
    - type: AssignTimestamps
      input: CreateVisits
      config:
        timestamp: timestamp
        error_handling:
          output: invalid_rows
    # Error records are read through `element.<field>`, i.e. the original row
    # is expected under an `element` attribute of each error record.
    - type: MapToFields
      input: AssignTimestamps.invalid_rows
      config:
        language: python
        fields:
          user: "element.user"
          timestamp: "element.timestamp"
    # Error output: only the row with the non-numeric timestamp.
    - type: AssertEqual
      input: MapToFields
      config:
        elements:
        - {user: "alice", timestamp: "not-valid"}
    # Main output: only the row with the integer timestamp.
    - type: AssertEqual
      input: AssignTimestamps
      config:
        elements:
        - {user: bob, timestamp: 3}

# Assign timestamp to beam row element with error handling and output schema
# check.
#
# Same input rows as the plain error_handling case, with an `output_schema`
# added that declares `user` as string and `timestamp` as integer. The
# assertions expect the "not-valid" row on `invalid_rows` and the row with
# timestamp 3 on the main output.
#
# NOTE(review): nesting was reconstructed from a source whose indentation was
# lost; `output_schema` is placed under `config` next to `error_handling` --
# confirm against the Beam YAML schema-validation documentation.
- pipeline:
    type: composite
    transforms:
    - type: Create
      name: CreateVisits
      config:
        elements:
        - {user: alice, timestamp: "not-valid"}
        - {user: bob, timestamp: 3}
    - type: AssignTimestamps
      input: CreateVisits
      config:
        timestamp: timestamp
        error_handling:
          output: invalid_rows
        output_schema:
          type: object
          properties:
            user:
              type: string
            timestamp:
              type: integer
    # Pull the original row fields back out of each error record.
    - type: MapToFields
      name: ExtractInvalidTimestamp
      input: AssignTimestamps.invalid_rows
      config:
        language: python
        fields:
          user: "element.user"
          timestamp: "element.timestamp"
    # Error output: only the row with the non-numeric timestamp.
    - type: AssertEqual
      input: ExtractInvalidTimestamp
      config:
        elements:
        - {user: "alice", timestamp: "not-valid"}
    # Main output: the row whose timestamp matches the declared integer type.
    - type: AssertEqual
      input: AssignTimestamps
      config:
        elements:
        - {user: bob, timestamp: 3}

# Assign timestamp to beam row element with error handling and output schema
# check with more error handling.
#
# Same input rows again, but the `output_schema` declares `timestamp` as
# boolean. The assertions expect BOTH rows on `invalid_rows` and an empty main
# output -- presumably because neither "not-valid" nor 3 is a boolean, so the
# row that would otherwise succeed is also diverted to the error output.
#
# NOTE(review): nesting was reconstructed from a source whose indentation was
# lost; `output_schema` is placed under `config` next to `error_handling` --
# confirm against the Beam YAML schema-validation documentation.
- pipeline:
    type: composite
    transforms:
    - type: Create
      name: CreateVisits
      config:
        elements:
        - {user: alice, timestamp: "not-valid"}
        - {user: bob, timestamp: 3}
    - type: AssignTimestamps
      input: CreateVisits
      config:
        timestamp: timestamp
        error_handling:
          output: invalid_rows
        output_schema:
          type: object
          properties:
            user:
              type: string
            timestamp:
              type: boolean
    # Pull the original row fields back out of each error record.
    - type: MapToFields
      name: ExtractInvalidTimestamp
      input: AssignTimestamps.invalid_rows
      config:
        language: python
        fields:
          user: "element.user"
          timestamp: "element.timestamp"
    # Error output: both input rows are expected here.
    - type: AssertEqual
      input: ExtractInvalidTimestamp
      config:
        elements:
        - {user: "alice", timestamp: "not-valid"}
        - {user: bob, timestamp: 3}
    # Main output: expected to be empty.
    - type: AssertEqual
      input: AssignTimestamps
      config:
        elements: []