################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from typing import Iterable, Tuple, Dict
from pyflink.common import Configuration
from pyflink.common.time import Time
from pyflink.common.typeinfo import Types
from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner
from pyflink.datastream.data_stream import DataStream
from pyflink.datastream.functions import (ProcessWindowFunction, WindowFunction, AggregateFunction,
ProcessAllWindowFunction)
from pyflink.datastream.output_tag import OutputTag
from pyflink.datastream.window import (TumblingEventTimeWindows,
SlidingEventTimeWindows, EventTimeSessionWindows,
CountSlidingWindowAssigner, SessionWindowTimeGapExtractor,
CountWindow, PurgingTrigger, EventTimeTrigger, TimeWindow,
GlobalWindows, CountTrigger)
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
from pyflink.java_gateway import get_gateway
from pyflink.testing.test_case_utils import PyFlinkStreamingTestCase
from pyflink.util.java_utils import get_j_env_configuration


class WindowTests(object):
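    """Window operation tests shared by the process-mode and thread-mode test cases below."""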
def setUp(self) -> None:
super(WindowTests, self).setUp()
self.test_sink = DataStreamTestSinkFunction()
def tearDown(self) -> None:
self.test_sink.clear()
def assert_equals_sorted(self, expected, actual):
expected.sort()
actual.sort()
self.assertEqual(expected, actual)
def test_event_time_tumbling_window(self):
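        # Tumbling 5 ms event-time windows: timestamps 1-4 land in [0, 5), 5/8/9 in [5, 10), 15 in [15, 20).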
data_stream: DataStream = self.env.from_collection([
('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 5), ('hi', 8), ('hi', 9),
('hi', 15)],
type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(SecondColumnTimestampAssigner())
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: x[0], key_type=Types.STRING()) \
.window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
.process(CountWindowProcessFunction(),
Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])) \
.add_sink(self.test_sink)
self.env.execute('test_event_time_tumbling_window')
results = self.test_sink.get_results()
expected = ['(hi,0,5,4)', '(hi,5,10,3)', '(hi,15,20,1)']
self.assert_equals_sorted(expected, results)
def test_count_tumbling_window(self):
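        # count_window(3) fires per key once three elements have arrived; the fourth 'hello' never completes a window.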
data_stream: DataStream = self.env.from_collection([
(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'),
(6, 'hello')],
type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
.count_window(3) \
.apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
.add_sink(self.test_sink)
self.env.execute('test_count_tumbling_window')
results = self.test_sink.get_results()
expected = ['(hi,9)', '(hello,12)']
self.assert_equals_sorted(expected, results)
def test_event_time_sliding_window(self):
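        # Sliding event-time windows of size 5 ms with a 2 ms slide, so elements are counted in every overlapping window.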
data_stream: DataStream = self.env.from_collection([
('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 5), ('hi', 8), ('hi', 9),
('hi', 15)],
type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(SecondColumnTimestampAssigner())
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: x[0], key_type=Types.STRING()) \
.window(SlidingEventTimeWindows.of(Time.milliseconds(5), Time.milliseconds(2))) \
.process(CountWindowProcessFunction(),
Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])) \
.add_sink(self.test_sink)
self.env.execute('test_event_time_sliding_window')
results = self.test_sink.get_results()
expected = ['(hi,-2,3,2)', '(hi,0,5,4)', '(hi,2,7,4)', '(hi,4,9,3)', '(hi,6,11,2)',
'(hi,8,13,2)', '(hi,12,17,1)', '(hi,14,19,1)']
self.assert_equals_sorted(expected, results)
def test_count_sliding_window(self):
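        # Count-based sliding windows holding two elements per key and sliding by one element.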
data_stream: DataStream = self.env.from_collection([
(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello')],
type_info=Types.TUPLE([Types.INT(), Types.STRING()]))
data_stream.key_by(lambda x: x[1], key_type=Types.STRING()) \
.window(CountSlidingWindowAssigner(2, 1)) \
.apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
.add_sink(self.test_sink)
self.env.execute('test_count_sliding_window')
results = self.test_sink.get_results()
expected = ['(hello,6)', '(hi,8)', '(hi,4)', '(hello,10)']
self.assert_equals_sorted(expected, results)
def test_event_time_session_window(self):
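        # Session windows with a 5 ms gap: timestamps 1-9 merge into one session [1, 14), 15 starts a new one.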
data_stream: DataStream = self.env.from_collection([
('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 8), ('hi', 9), ('hi', 15)],
type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(SecondColumnTimestampAssigner())
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: x[0], key_type=Types.STRING()) \
.window(EventTimeSessionWindows.with_gap(Time.milliseconds(5))) \
.process(CountWindowProcessFunction(),
Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])) \
.add_sink(self.test_sink)
self.env.execute('test_event_time_session_window')
results = self.test_sink.get_results()
expected = ['(hi,1,14,6)', '(hi,15,20,1)']
self.assert_equals_sorted(expected, results)
def test_event_time_dynamic_gap_session_window(self):
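        # The session gap is taken per element from its second field via MySessionWindowTimeGapExtractor.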
self.env.set_parallelism(1)
data_stream: DataStream = self.env.from_collection([
('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 9), ('hi', 9), ('hi', 15)],
type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(SecondColumnTimestampAssigner())
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: x[0], key_type=Types.STRING()) \
.window(EventTimeSessionWindows.with_dynamic_gap(MySessionWindowTimeGapExtractor())) \
.process(CountWindowProcessFunction(),
Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])) \
.add_sink(self.test_sink)
self.env.execute('test_event_time_dynamic_gap_session_window')
results = self.test_sink.get_results()
expected = ['(hi,1,8,4)', '(hi,9,30,3)']
self.assert_equals_sorted(expected, results)
def test_window_reduce_passthrough(self):
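        # Without a window function, reduce() emits only the reduced value of each session window.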
data_stream: DataStream = self.env.from_collection([
('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(SecondColumnTimestampAssigner())
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: x[0], key_type=Types.STRING()) \
.window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
.reduce(lambda a, b: (b[0], a[1] + b[1]),
output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
.add_sink(self.test_sink)
self.env.execute('test_window_reduce_passthrough')
results = self.test_sink.get_results()
expected = ['(a,3)', '(a,6)', '(a,15)', '(b,3)', '(b,17)']
self.assert_equals_sorted(expected, results)
def test_window_reduce_process(self):
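        # The reduced value of each session is forwarded to a ProcessWindowFunction with access to window metadata.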
data_stream: DataStream = self.env.from_collection([
('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(SecondColumnTimestampAssigner())
class MyProcessFunction(ProcessWindowFunction):
def process(self, key, context: ProcessWindowFunction.Context,
elements: Iterable[Tuple[str, int]]) -> Iterable[str]:
yield "current window start at {}, reduce result {}".format(
context.window().start,
next(iter(elements)),
)
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: x[0], key_type=Types.STRING()) \
.window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
.reduce(lambda a, b: (b[0], a[1] + b[1]),
window_function=MyProcessFunction(),
output_type=Types.STRING()) \
.add_sink(self.test_sink)
self.env.execute('test_window_reduce_process')
results = self.test_sink.get_results()
expected = ["current window start at 1, reduce result ('a', 3)",
"current window start at 15, reduce result ('a', 15)",
"current window start at 3, reduce result ('b', 3)",
"current window start at 6, reduce result ('a', 6)",
"current window start at 8, reduce result ('b', 17)"]
self.assert_equals_sorted(expected, results)
def test_window_aggregate_passthrough(self):
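        # The aggregate counts even and odd timestamps per session and emits their difference.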
data_stream: DataStream = self.env.from_collection([
('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(SecondColumnTimestampAssigner())
class MyAggregateFunction(AggregateFunction):
def create_accumulator(self) -> Tuple[str, Dict[int, int]]:
return '', {0: 0, 1: 0}
def add(self, value: Tuple[str, int], accumulator: Tuple[str, Dict[int, int]]
) -> Tuple[str, Dict[int, int]]:
number_map = accumulator[1]
number_map[value[1] % 2] += 1
return value[0], number_map
def get_result(self, accumulator: Tuple[str, Dict[int, int]]) -> Tuple[str, int]:
number_map = accumulator[1]
return accumulator[0], number_map[0] - number_map[1]
def merge(self, acc_a: Tuple[str, Dict[int, int]], acc_b: Tuple[str, Dict[int, int]]
) -> Tuple[str, Dict[int, int]]:
number_map_a = acc_a[1]
number_map_b = acc_b[1]
new_number_map = {
0: number_map_a[0] + number_map_b[0],
1: number_map_a[1] + number_map_b[1]
}
return acc_a[0], new_number_map
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: x[0], key_type=Types.STRING()) \
.window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
.aggregate(MyAggregateFunction(),
output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
.add_sink(self.test_sink)
self.env.execute('test_window_aggregate_passthrough')
results = self.test_sink.get_results()
expected = ['(a,-1)', '(a,0)', '(a,1)', '(b,-1)', '(b,0)']
self.assert_equals_sorted(expected, results)
def test_window_aggregate_accumulator_type(self):
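        # Sums the timestamps per session, declaring the accumulator type explicitly.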
data_stream: DataStream = self.env.from_collection([
('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(SecondColumnTimestampAssigner())
class MyAggregateFunction(AggregateFunction):
def create_accumulator(self) -> Tuple[int, str]:
return 0, ''
def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) -> Tuple[int, str]:
return value[1] + accumulator[0], value[0]
            def get_result(self, accumulator: Tuple[int, str]) -> Tuple[str, int]:
return accumulator[1], accumulator[0]
def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]):
return acc_a[0] + acc_b[0], acc_a[1]
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: x[0], key_type=Types.STRING()) \
.window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
.aggregate(MyAggregateFunction(),
accumulator_type=Types.TUPLE([Types.INT(), Types.STRING()]),
output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
.add_sink(self.test_sink)
self.env.execute('test_window_aggregate_accumulator_type')
results = self.test_sink.get_results()
expected = ['(a,15)', '(a,3)', '(a,6)', '(b,17)', '(b,3)']
self.assert_equals_sorted(expected, results)
def test_window_aggregate_process(self):
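        # The aggregated sum is formatted by a ProcessWindowFunction before being emitted.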
data_stream: DataStream = self.env.from_collection([
('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(SecondColumnTimestampAssigner())
class MyAggregateFunction(AggregateFunction):
def create_accumulator(self) -> Tuple[int, str]:
return 0, ''
def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) -> Tuple[int, str]:
return value[1] + accumulator[0], value[0]
            def get_result(self, accumulator: Tuple[int, str]) -> Tuple[str, int]:
return accumulator[1], accumulator[0]
def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]):
return acc_a[0] + acc_b[0], acc_a[1]
class MyProcessWindowFunction(ProcessWindowFunction):
def process(self, key: str, context: ProcessWindowFunction.Context,
elements: Iterable[Tuple[str, int]]) -> Iterable[str]:
agg_result = next(iter(elements))
yield "key {} timestamp sum {}".format(agg_result[0], agg_result[1])
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: x[0], key_type=Types.STRING()) \
.window(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
.aggregate(MyAggregateFunction(),
window_function=MyProcessWindowFunction(),
accumulator_type=Types.TUPLE([Types.INT(), Types.STRING()]),
output_type=Types.STRING()) \
.add_sink(self.test_sink)
self.env.execute('test_window_aggregate_process')
results = self.test_sink.get_results()
expected = ['key a timestamp sum 15',
'key a timestamp sum 3',
'key a timestamp sum 6',
'key b timestamp sum 17',
'key b timestamp sum 3']
self.assert_equals_sorted(expected, results)
def test_session_window_late_merge(self):
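        # The element with timestamp 4 arrives last and bridges the sessions around 0 and 8, merging them into [0, 13).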
data_stream: DataStream = self.env.from_collection([
('hi', 0), ('hi', 8), ('hi', 4)],
type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(SecondColumnTimestampAssigner())
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: x[0], key_type=Types.STRING()) \
.window(EventTimeSessionWindows.with_gap(Time.milliseconds(5))) \
.process(CountWindowProcessFunction(),
Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])) \
.add_sink(self.test_sink)
self.env.execute('test_session_window_late_merge')
results = self.test_sink.get_results()
expected = ['(hi,0,13,3)']
self.assert_equals_sorted(expected, results)
def test_event_time_session_window_with_purging_trigger(self):
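        # The event-time trigger is wrapped in a PurgingTrigger so window contents are purged on firing.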
data_stream: DataStream = self.env.from_collection([
('hi', 1), ('hi', 2), ('hi', 3), ('hi', 4), ('hi', 8), ('hi', 9), ('hi', 15)],
type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(SecondColumnTimestampAssigner())
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: x[0], key_type=Types.STRING()) \
.window(EventTimeSessionWindows.with_gap(Time.milliseconds(3))) \
.trigger(PurgingTrigger.of(EventTimeTrigger.create())) \
.process(CountWindowProcessFunction(),
Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()])) \
.add_sink(self.test_sink)
self.env.execute('test_event_time_session_window_with_purging_trigger')
results = self.test_sink.get_results()
expected = ['(hi,1,7,4)', '(hi,8,12,2)', '(hi,15,18,1)']
self.assert_equals_sorted(expected, results)
def test_global_window_with_purging_trigger(self):
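        # Global windows never fire on their own; the purging count trigger fires and clears
        # the window after every two elements, so the seventh element is never emitted.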
self.env.set_parallelism(1)
data_stream: DataStream = self.env.from_collection([
('hi', 1), ('hi', 1), ('hi', 1), ('hi', 1), ('hi', 1), ('hi', 1), ('hi', 1)],
type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(SecondColumnTimestampAssigner())
class MyProcessFunction(ProcessWindowFunction):
def process(self, key, context: ProcessWindowFunction.Context,
elements: Iterable[Tuple[str, int]]) -> Iterable[tuple]:
return [(key, len([e for e in elements]))]
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda x: x[0], key_type=Types.STRING()) \
.window(GlobalWindows.create()) \
.trigger(PurgingTrigger.of(CountTrigger.of(2))) \
.process(MyProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) \
.add_sink(self.test_sink)
self.env.execute('test_global_window_with_purging_trigger')
results = self.test_sink.get_results()
expected = ['(hi,2)', '(hi,2)', '(hi,2)']
self.assert_equals_sorted(expected, results)
def test_event_time_tumbling_window_all(self):
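        # Non-keyed tumbling windows: elements of all keys are counted together per 5 ms window.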
data_stream: DataStream = self.env.from_collection([
('hi', 1), ('hello', 2), ('hi', 3), ('hello', 4), ('hello', 5), ('hi', 8), ('hi', 9),
('hi', 15)],
type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(SecondColumnTimestampAssigner())
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.window_all(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
.process(CountAllWindowProcessFunction(),
Types.TUPLE([Types.LONG(), Types.LONG(), Types.INT()])) \
.add_sink(self.test_sink)
self.env.execute('test_event_time_tumbling_window_all')
results = self.test_sink.get_results()
expected = ['(0,5,4)', '(15,20,1)', '(5,10,3)']
self.assert_equals_sorted(expected, results)
def test_window_all_reduce(self):
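        # Non-keyed session windows; the reduce keeps the first element's key and sums the values.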
self.env.set_parallelism(1)
data_stream: DataStream = self.env.from_collection([
('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(SecondColumnTimestampAssigner())
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.window_all(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
.reduce(lambda a, b: (a[0], a[1] + b[1]),
output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
.add_sink(self.test_sink)
self.env.execute('test_window_all_reduce')
results = self.test_sink.get_results()
expected = ['(a,15)', '(a,6)', '(a,23)']
self.assert_equals_sorted(expected, results)
def test_window_all_reduce_process(self):
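        # The non-keyed reduce result flows through a ProcessAllWindowFunction.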
self.env.set_parallelism(1)
data_stream: DataStream = self.env.from_collection([
('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(SecondColumnTimestampAssigner())
class MyProcessFunction(ProcessAllWindowFunction):
def process(self, context: 'ProcessAllWindowFunction.Context',
elements: Iterable[Tuple[str, int]]) -> Iterable[str]:
yield "current window start at {}, reduce result {}".format(
context.window().start,
next(iter(elements)),
)
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.window_all(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
.reduce(lambda a, b: (a[0], a[1] + b[1]),
window_function=MyProcessFunction(),
output_type=Types.STRING()) \
.add_sink(self.test_sink)
self.env.execute('test_window_all_reduce_process')
results = self.test_sink.get_results()
expected = ["current window start at 1, reduce result ('a', 6)",
"current window start at 6, reduce result ('a', 23)",
"current window start at 15, reduce result ('a', 15)"]
self.assert_equals_sorted(expected, results)
def test_window_all_aggregate(self):
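        # Non-keyed sessions aggregated with the even/odd counting aggregate.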
self.env.set_parallelism(1)
data_stream: DataStream = self.env.from_collection([
('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(SecondColumnTimestampAssigner())
class MyAggregateFunction(AggregateFunction):
def create_accumulator(self) -> Tuple[str, Dict[int, int]]:
return '', {0: 0, 1: 0}
def add(self, value: Tuple[str, int], accumulator: Tuple[str, Dict[int, int]]
) -> Tuple[str, Dict[int, int]]:
number_map = accumulator[1]
number_map[value[1] % 2] += 1
return value[0], number_map
def get_result(self, accumulator: Tuple[str, Dict[int, int]]) -> Tuple[str, int]:
number_map = accumulator[1]
return accumulator[0], number_map[0] - number_map[1]
def merge(self, acc_a: Tuple[str, Dict[int, int]], acc_b: Tuple[str, Dict[int, int]]
) -> Tuple[str, Dict[int, int]]:
number_map_a = acc_a[1]
number_map_b = acc_b[1]
new_number_map = {
0: number_map_a[0] + number_map_b[0],
1: number_map_a[1] + number_map_b[1]
}
return acc_a[0], new_number_map
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.window_all(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
.aggregate(MyAggregateFunction(),
output_type=Types.TUPLE([Types.STRING(), Types.INT()])) \
.add_sink(self.test_sink)
self.env.execute('test_window_all_aggregate')
results = self.test_sink.get_results()
expected = ['(a,-1)', '(b,-1)', '(b,1)']
self.assert_equals_sorted(expected, results)
def test_window_all_aggregate_process(self):
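        # The non-keyed aggregate result is formatted by a ProcessAllWindowFunction.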
self.env.set_parallelism(1)
data_stream: DataStream = self.env.from_collection([
('a', 1), ('a', 2), ('b', 3), ('a', 6), ('b', 8), ('b', 9), ('a', 15)],
type_info=Types.TUPLE([Types.STRING(), Types.INT()]))
watermark_strategy = WatermarkStrategy.for_monotonous_timestamps() \
.with_timestamp_assigner(SecondColumnTimestampAssigner())
class MyAggregateFunction(AggregateFunction):
def create_accumulator(self) -> Tuple[int, str]:
return 0, ''
def add(self, value: Tuple[str, int], accumulator: Tuple[int, str]) -> Tuple[int, str]:
return value[1] + accumulator[0], value[0]
            def get_result(self, accumulator: Tuple[int, str]) -> Tuple[str, int]:
return accumulator[1], accumulator[0]
def merge(self, acc_a: Tuple[int, str], acc_b: Tuple[int, str]):
return acc_a[0] + acc_b[0], acc_a[1]
class MyProcessWindowFunction(ProcessAllWindowFunction):
def process(self, context: ProcessAllWindowFunction.Context,
elements: Iterable[Tuple[str, int]]) -> Iterable[str]:
agg_result = next(iter(elements))
yield "key {} timestamp sum {}".format(agg_result[0], agg_result[1])
data_stream.assign_timestamps_and_watermarks(watermark_strategy) \
.window_all(EventTimeSessionWindows.with_gap(Time.milliseconds(2))) \
.aggregate(MyAggregateFunction(),
window_function=MyProcessWindowFunction(),
accumulator_type=Types.TUPLE([Types.INT(), Types.STRING()]),
output_type=Types.STRING()) \
.add_sink(self.test_sink)
self.env.execute('test_window_all_aggregate_process')
results = self.test_sink.get_results()
expected = ['key b timestamp sum 6',
'key b timestamp sum 23',
'key a timestamp sum 15']
self.assert_equals_sorted(expected, results)
def test_side_output_late_data(self):
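        # With per-element watermarks, timestamp 4 is late for the already fired [0, 5) window;
        # with zero allowed lateness it is routed to the side output instead of being dropped.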
self.env.set_parallelism(1)
config = Configuration(
j_configuration=get_j_env_configuration(self.env._j_stream_execution_environment)
)
config.set_integer('python.fn-execution.bundle.size', 1)
jvm = get_gateway().jvm
watermark_strategy = WatermarkStrategy(
jvm.org.apache.flink.api.common.eventtime.WatermarkStrategy.forGenerator(
jvm.org.apache.flink.streaming.api.functions.python.eventtime.
PerElementWatermarkGenerator.getSupplier()
)
).with_timestamp_assigner(SecondColumnTimestampAssigner())
tag = OutputTag('late-data', type_info=Types.ROW([Types.STRING(), Types.INT()]))
ds1 = self.env.from_collection([('a', 0), ('a', 8), ('a', 4), ('a', 6)],
type_info=Types.ROW([Types.STRING(), Types.INT()]))
ds2 = ds1.assign_timestamps_and_watermarks(watermark_strategy) \
.key_by(lambda e: e[0]) \
.window(TumblingEventTimeWindows.of(Time.milliseconds(5))) \
.allowed_lateness(0) \
.side_output_late_data(tag) \
.process(CountWindowProcessFunction(),
Types.TUPLE([Types.STRING(), Types.LONG(), Types.LONG(), Types.INT()]))
main_sink = DataStreamTestSinkFunction()
ds2.add_sink(main_sink)
side_sink = DataStreamTestSinkFunction()
ds2.get_side_output(tag).add_sink(side_sink)
self.env.execute('test_side_output_late_data')
main_expected = ['(a,0,5,1)', '(a,5,10,2)']
self.assert_equals_sorted(main_expected, main_sink.get_results())
side_expected = ['+I[a, 4]']
self.assert_equals_sorted(side_expected, side_sink.get_results())


class ProcessWindowTests(WindowTests, PyFlinkStreamingTestCase):
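    """Runs the window tests with the 'process' Python execution mode."""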
def setUp(self) -> None:
super(ProcessWindowTests, self).setUp()
config = get_j_env_configuration(self.env._j_stream_execution_environment)
config.setString("python.execution-mode", "process")


class EmbeddedWindowTests(WindowTests, PyFlinkStreamingTestCase):
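    """Runs the window tests with the 'thread' (embedded) Python execution mode."""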
def setUp(self) -> None:
super(EmbeddedWindowTests, self).setUp()
config = get_j_env_configuration(self.env._j_stream_execution_environment)
config.setString("python.execution-mode", "thread")
def test_chained_window(self):
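        # Defined only for thread mode: a keyed window reduce followed by a chained map.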
class MyTimestampAssigner(TimestampAssigner):
def extract_timestamp(self, value: tuple, record_timestamp: int) -> int:
return value[0]
ds = self.env.from_collection(
[(1676461680000, "a1", "b1", 1), (1676461680000, "a1", "b1", 1),
(1676461680000, "a2", "b2", 1), (1676461680000, "a1", "b2", 1),
(1676461740000, "a1", "b1", 1), (1676461740000, "a2", "b2", 1)]
).assign_timestamps_and_watermarks(
WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(
MyTimestampAssigner())
)
ds.key_by(
lambda x: (x[0], x[1], x[2])
).window(
TumblingEventTimeWindows.of(Time.minutes(1))
).reduce(
lambda x, y: (x[0], x[1], x[2], x[3] + y[3]),
output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.STRING(), Types.INT()])
).map(
lambda x: (x[0], x[1], x[3]),
output_type=Types.TUPLE([Types.LONG(), Types.STRING(), Types.INT()])
).add_sink(self.test_sink)
self.env.execute('test_chained_window')
results = self.test_sink.get_results()
expected = ['(1676461680000,a1,1)',
'(1676461680000,a1,2)',
'(1676461680000,a2,1)',
'(1676461740000,a1,1)',
'(1676461740000,a2,1)']
self.assert_equals_sorted(expected, results)


class SecondColumnTimestampAssigner(TimestampAssigner):
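    """Uses the second field of each element as its event timestamp."""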
def extract_timestamp(self, value, record_timestamp) -> int:
return int(value[1])


class MySessionWindowTimeGapExtractor(SessionWindowTimeGapExtractor):
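    """Derives the dynamic session gap from the second field of each element."""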
def extract(self, element: tuple) -> int:
return element[1]


class SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):
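    """Sums the first field of all elements in a count window."""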
def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):
result = 0
for i in inputs:
result += i[0]
return [(key, result)]


class CountWindowProcessFunction(ProcessWindowFunction[tuple, tuple, str, TimeWindow]):
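    """Emits (key, window start, window end, element count) for each window."""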
def process(self,
key: str,
context: ProcessWindowFunction.Context[TimeWindow],
elements: Iterable[tuple]) -> Iterable[tuple]:
return [(key, context.window().start, context.window().end, len([e for e in elements]))]


class CountAllWindowProcessFunction(ProcessAllWindowFunction[tuple, tuple, TimeWindow]):
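    """Emits (window start, window end, element count) for each non-keyed window."""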
def process(self,
context: 'ProcessAllWindowFunction.Context',
elements: Iterable[tuple]) -> Iterable[tuple]:
return [(context.window().start, context.window().end, len([e for e in elements]))]