#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Baseline transcript: five elements, one watermark advance, one expected
# pane per fixed window. The comments below also describe the file format.
name: fixed_default
# Python names/syntax, unless otherwise noted.
window_fn: FixedWindows(10)
# Same. Empty () may be omitted.
trigger_fn: Default
# Ordered list of events.
transcript:
  # The elements are the timestamps.
  - input: [1, 2, 3, 10, 11]
  - watermark: 25
  # Every expected output from the last action.
  - expect:
      - window: [0, 9]
        values: [1, 2, 3]
        index: 0
      # Partial match on attributes OK.
      - window: [10, 19]
        values: [10, 11]
---
# After the watermark reaches 100, one more element (7) arrives for window
# [0, 9]; the follow-up pane repeats the earlier values and is flagged late.
name: fixed_default_late_data
window_fn: FixedWindows(10)
trigger_fn: Default
timestamp_combiner: OUTPUT_AT_EOW
transcript:
  - input: [1, 2, 3, 10, 11, 25]
  - watermark: 100
  - expect:
      - window: [0, 9]
        values: [1, 2, 3]
        timestamp: 9
        final: false
      - window: [10, 19]
        values: [10, 11]
        timestamp: 19
      - window: [20, 29]
        values: [25]
        timestamp: 29
        late: false
  - input: [7]
  - expect:
      - window: [0, 9]
        values: [1, 2, 3, 7]
        timestamp: 9
        late: true
---
# OUTPUT_AT_EARLIEST: every expected pane timestamp equals the smallest
# element timestamp in its window (1, 10, 25).
name: timestamp_combiner_earliest
window_fn: FixedWindows(10)
trigger_fn: Default
timestamp_combiner: OUTPUT_AT_EARLIEST
transcript:
  - input: [1, 2, 3, 10, 11, 25]
  - watermark: 100
  - expect:
      - window: [0, 9]
        values: [1, 2, 3]
        timestamp: 1
        final: false
      - window: [10, 19]
        values: [10, 11]
        timestamp: 10
      - window: [20, 29]
        values: [25]
        timestamp: 25
        late: false
---
# OUTPUT_AT_LATEST: every expected pane timestamp equals the largest
# element timestamp in its window (3, 11, 25).
name: timestamp_combiner_latest
window_fn: FixedWindows(10)
trigger_fn: Default
timestamp_combiner: OUTPUT_AT_LATEST
transcript:
  - input: [1, 2, 3, 10, 11, 25]
  - watermark: 100
  - expect:
      - window: [0, 9]
        values: [1, 2, 3]
        timestamp: 3
        final: false
      - window: [10, 19]
        values: [10, 11]
        timestamp: 11
      - window: [20, 29]
        values: [25]
        timestamp: 25
        late: false
---
# Test that custom timestamping is not invoked: with OUTPUT_AT_EOW the
# expected timestamps are the plain window ends (9, 19, 29).
name: timestamp_combiner_custom_timestamping_eow
window_fn: CustomTimestampingFixedWindowsWindowFn(10)
trigger_fn: Default
timestamp_combiner: OUTPUT_AT_EOW
transcript:
  - input: [1, 2, 3, 10, 11, 25]
  - watermark: 100
  - expect:
      - window: [0, 9]
        values: [1, 2, 3]
        timestamp: 9
        final: false
      - window: [10, 19]
        values: [10, 11]
        timestamp: 19
      - window: [20, 29]
        values: [25]
        timestamp: 29
        late: false
---
# Test that custom timestamping is not invoked: with OUTPUT_AT_EARLIEST the
# expected timestamps are the unmodified earliest element times (1, 10, 25).
name: timestamp_combiner_custom_timestamping_earliest
window_fn: CustomTimestampingFixedWindowsWindowFn(10)
trigger_fn: Default
timestamp_combiner: OUTPUT_AT_EARLIEST
transcript:
  - input: [1, 2, 3, 10, 11, 25]
  - watermark: 100
  - expect:
      - window: [0, 9]
        values: [1, 2, 3]
        timestamp: 1
        final: false
      - window: [10, 19]
        values: [10, 11]
        timestamp: 10
      - window: [20, 29]
        values: [25]
        timestamp: 25
        late: false
---
# Test that custom timestamping is in fact invoked: with
# OUTPUT_AT_EARLIEST_TRANSFORMED each expected timestamp is the earliest
# element time plus 100 (101, 110, 125).
name: timestamp_combiner_custom_timestamping_earliest_transformed
broken_on:
  - SwitchingDirectRunner  # unsupported OUTPUT_AT_EARLIEST_TRANSFORMED
window_fn: CustomTimestampingFixedWindowsWindowFn(10)
trigger_fn: Default
timestamp_combiner: OUTPUT_AT_EARLIEST_TRANSFORMED
transcript:
  - input: [1, 2, 3, 10, 11, 25]
  - watermark: 100
  - expect:
      - window: [0, 9]
        values: [1, 2, 3]
        timestamp: 101
        final: false
      - window: [10, 19]
        values: [10, 11]
        timestamp: 110
      - window: [20, 29]
        values: [25]
        timestamp: 125
        late: false
---
# Session windows with AfterWatermark(early=AfterCount(2),
# late=AfterCount(3)): two early panes, one pane once the watermark
# advances, then one further pane after three more elements arrive.
name: early_late_sessions
broken_on:
  # Watermark regresses, causing what should be late data to not be late.
  - SwitchingDirectRunner
window_fn: Sessions(10)
trigger_fn: AfterWatermark(early=AfterCount(2), late=AfterCount(3))
timestamp_combiner: OUTPUT_AT_EOW
transcript:
  - input: [1, 2, 3]
  - expect:
      - window: [1, 12]
        values: [1, 2, 3]
        timestamp: 12
        early: true
        index: 0
  - input: [4]  # no output
  - input: [5]
  - expect:
      - window: [1, 14]
        values: [1, 2, 3, 4, 5]
        timestamp: 14
        early: true
        index: 1
  - input: [6]
  - watermark: 100
  - expect:
      - window: [1, 15]
        values: [1, 2, 3, 4, 5, 6]
        timestamp: 15
        index: 2
        nonspeculative_index: 0
  - input: [1]
  - input: [3, 4]
  - expect:
      - window: [1, 15]
        values: [1, 1, 2, 3, 3, 4, 4, 5, 6]
        timestamp: 15
        final: false
        index: 3
        nonspeculative_index: 1
---
# Discarding accumulation with early firings: each expected pane lists only
# the elements that arrived since the previous pane for that window, and the
# final pane for [10, 19] is empty.
name: discarding_early_fixed
window_fn: FixedWindows(10)
trigger_fn: AfterWatermark(early=AfterCount(2))
timestamp_combiner: OUTPUT_AT_EOW
accumulation_mode: discarding
transcript:
  - input: [1, 2, 3]
  - expect:
      - window: [0, 9]
        values: [1, 2, 3]
        timestamp: 9
        early: true
        index: 0
  - input: [4]  # no output
  - input: [14]  # no output
  - input: [5]
  - expect:
      - window: [0, 9]
        values: [4, 5]
        timestamp: 9
        early: true
        index: 1
  - input: [18]
  - expect:
      - window: [10, 19]
        values: [14, 18]
        timestamp: 19
        early: true
        index: 0
  - input: [6]
  - watermark: 100
  - expect:
      - window: [0, 9]
        values: [6]
        timestamp: 9
        early: false
        late: false
        final: true
        index: 2
        nonspeculative_index: 0
      - window: [10, 19]
        values: []
        timestamp: 19
        early: false
        late: false
        final: true
        index: 1
        nonspeculative_index: 0
---
# After the AfterCount(2) panes for [0, 9] and [10, 19], checks which
# windows are listed as present, absent, or tombstone in the state.
name: garbage_collection
broken_on:
  - SwitchingDirectRunner  # claims pipeline stall
window_fn: FixedWindows(10)
trigger_fn: AfterCount(2)
timestamp_combiner: OUTPUT_AT_EOW
allowed_lateness: 10
accumulation_mode: discarding
transcript:
  - input: [1, 2, 3, 10, 11, 25]
  - expect:
      - window: [0, 9]
        timestamp: 9
      - window: [10, 19]
        timestamp: 19
  - state:
      present: [[20, 29]]
      absent: [[0, 9]]
      tombstone: [[10, 19]]
---
# The watermark is already at 5 when elements 2, 3, 7, 8 arrive; the
# expected pane timestamp is 7, the earliest element not behind it.
name: known_late_data_watermark
broken_on:
  - SwitchingDirectRunner  # bad timestamp
window_fn: FixedWindows(10)
trigger_fn: Default
timestamp_combiner: OUTPUT_AT_EARLIEST
transcript:
  - watermark: 5
  - input: [2, 3, 7, 8]
  - watermark: 11
  - expect:
      - window: [0, 9]
        values: [2, 3, 7, 8]
        timestamp: 7
---
# The watermark is already at 8 when elements 2, 3, 7 arrive (all behind
# it); the expected pane timestamp is 9, the end of window [0, 9].
name: known_late_data_no_watermark_hold_possible
broken_on:
  - SwitchingDirectRunner  # bad timestamp
window_fn: FixedWindows(10)
trigger_fn: Default
timestamp_combiner: OUTPUT_AT_EARLIEST
transcript:
  - watermark: 8
  - input: [2, 3, 7]
  - watermark: 11
  - expect:
      - window: [0, 9]
        values: [2, 3, 7]
        timestamp: 9
# These next examples test that bad/incomplete transcripts are rejected.
---
# Deliberately invalid transcript: the expect step is placed before the
# watermark advance, and the declared error is "Unmatched output".
name: bad_output
error: Unmatched output
window_fn: FixedWindows(10)
transcript:
  - input: [1, 2, 3]
  - expect:
      # bad
      - window: [0, 9]
        values: [1, 2, 3]
  - watermark: 100
---
# Deliberately invalid transcript: the expected values omit input element 3,
# and the declared error is "Unmatched output".
name: bad_expected_values
error: Unmatched output
window_fn: FixedWindows(10)
transcript:
  - input: [1, 2, 3]
  - watermark: 100
  - expect:
      # bad values
      - window: [0, 9]
        values: [1, 2]
---
# Deliberately invalid transcript: the expected window [0, 19] is wider than
# a FixedWindows(10) window, and the declared error is "Unmatched output".
name: bad_expected_window
error: Unmatched output
window_fn: FixedWindows(10)
transcript:
  - input: [1, 2, 3]
  - watermark: 100
  - expect:
      # bad window
      - window: [0, 19]
        values: [1, 2, 3]
---
# Deliberately incomplete transcript: no expect step sits between the two
# watermark advances, and the declared error is "Unexpected output".
name: missing_output
error: Unexpected output
window_fn: FixedWindows(10)
transcript:
  - input: [1, 2, 3]
  - watermark: 100
  # missing output
  - watermark: 200
---
# Deliberately incomplete transcript: it ends right after the watermark
# advance with no expect step, and the declared error is "Unexpected output".
name: missing_output_at_end
error: Unexpected output
window_fn: FixedWindows(10)
transcript:
  - input: [1, 2, 3]
  - watermark: 100
  # missing output