blob: 45233e2039e595f82d39ce8bcc54461e2a2f0c6b [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
pipelines:
# Simply extract windowing info with no fields resulting in default fields
- pipeline:
type: chain
transforms:
- type: Create
config:
elements:
- {label: "11a", rank: 0}
- {label: "37a", rank: 1}
- {label: "389a", rank: 2}
- type: ExtractWindowingInfo
- type: MapToFields
config:
language: python
fields:
label:
callable: "lambda x: x.label"
rank:
callable: "lambda x: x.rank"
timestamp:
callable: "lambda x: str(x.timestamp)"
window_start:
callable: "lambda x: str(x.window_start)"
window_end:
callable: "lambda x: str(x.window_end)"
- type: AssertEqual
config:
elements:
- {label: "11a", rank: 0, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000)}
- {label: "37a", rank: 1, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000)}
- {label: "389a", rank: 2, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000)}
# Simply extract windowing info with all available fields
- pipeline:
type: chain
transforms:
- type: Create
config:
elements:
- {label: "11a", rank: 0}
- {label: "37a", rank: 1}
- {label: "389a", rank: 2}
- type: ExtractWindowingInfo
config:
fields: [timestamp, window_start, window_end, window_string, window_type, window_object, pane_info]
- type: MapToFields
config:
language: python
fields:
label:
callable: "lambda x: x.label"
rank:
callable: "lambda x: x.rank"
timestamp:
callable: "lambda x: str(x.timestamp)"
window_start:
callable: "lambda x: str(x.window_start)"
window_end:
callable: "lambda x: str(x.window_end)"
window_string:
callable: "lambda x: str(x.window_string)"
window_type:
callable: "lambda x: str(x.window_type)"
window_object:
callable: "lambda x: str(x.window_object)"
pane_info:
callable: "lambda x: str(x.pane_info)"
- type: AssertEqual
config:
elements:
- {label: "11a", rank: 0, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000), window_string: 'GlobalWindow', window_type: 'GlobalWindow', window_object: 'GlobalWindow', pane_info: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
- {label: "37a", rank: 1, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000), window_string: 'GlobalWindow', window_type: 'GlobalWindow', window_object: 'GlobalWindow', pane_info: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
- {label: "389a", rank: 2, timestamp: Timestamp(-9223372036854.775000), window_start: Timestamp(-9223372036854.775000), window_end: Timestamp(9223371950454.775000), window_string: 'GlobalWindow', window_type: 'GlobalWindow', window_object: 'GlobalWindow', pane_info: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
# Simply extract windowing info with a few fields renamed
- pipeline:
type: chain
transforms:
- type: Create
config:
elements:
- {label: "11a", rank: 0}
- {label: "37a", rank: 1}
- {label: "389a", rank: 2}
- type: ExtractWindowingInfo
config:
fields:
ts: timestamp
ws: window_start
pane: pane_info
- type: MapToFields
config:
language: python
fields:
label:
callable: "lambda x: x.label"
rank:
callable: "lambda x: x.rank"
ts:
callable: "lambda x: str(x.ts)"
ws:
callable: "lambda x: str(x.ws)"
pane:
callable: "lambda x: str(x.pane)"
- type: AssertEqual
config:
elements:
- {label: "11a", rank: 0, ts: Timestamp(-9223372036854.775000), ws: Timestamp(-9223372036854.775000), pane: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
- {label: "37a", rank: 1, ts: Timestamp(-9223372036854.775000), ws: Timestamp(-9223372036854.775000), pane: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}
- {label: "389a", rank: 2, ts: Timestamp(-9223372036854.775000), ws: Timestamp(-9223372036854.775000), pane: "PaneInfoTuple(is_first=True, is_last=True, timing='UNKNOWN', index=0, nonspeculative_index=0)"}