blob: 9c176147e585a758c8c3fa87911d5940232aedf8 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import math
from abc import ABC, abstractmethod
from typing import Generic, List, Any, Iterable, Set
from pyflink.common.typeinfo import Types
from pyflink.datastream.state import ValueStateDescriptor, ValueState
from pyflink.datastream.window import TimeWindow, CountWindow
from pyflink.fn_execution.table.window_context import Context, W
class WindowAssigner(Generic[W], ABC):
"""
A WindowAssigner assigns zero or more Windows to an element.
In a window operation, elements are grouped by their key (if available) and by the windows to
which it was assigned. The set of elements with the same key and window is called a pane. When a
Trigger decides that a certain pane should fire the window to produce output elements for
that pane.
"""
def open(self, ctx: Context[Any, W]):
"""
Initialization method for the function. It is called before the actual working methods.
"""
pass
@abstractmethod
def assign_windows(self, element: List, timestamp: int) -> Iterable[W]:
"""
Given the timestamp and element, returns the set of windows into which it should be placed.
:param element: The element to which windows should be assigned.
:param timestamp: The timestamp of the element when {@link #isEventTime()} returns true, or
the current system time when {@link #isEventTime()} returns false.
"""
pass
@abstractmethod
def is_event_time(self) -> bool:
"""
Returns True if elements are assigned to windows based on event time, False otherwise.
"""
pass
class PanedWindowAssigner(WindowAssigner[W], ABC):
"""
A WindowAssigner that window can be split into panes.
"""
@abstractmethod
def assign_pane(self, element, timestamp: int) -> W:
pass
@abstractmethod
def split_into_panes(self, window: W) -> Iterable[W]:
pass
@abstractmethod
def get_last_window(self, pane: W) -> W:
pass
class MergingWindowAssigner(WindowAssigner[W], ABC):
"""
A WindowAssigner that can merge windows.
"""
class MergeCallback:
"""
Callback to be used in merge_windows(W, List[W], MergeCallback) for
specifying which windows should be merged.
"""
@abstractmethod
def merge(self, merge_result: W, to_be_merged: Iterable[W]):
"""
Specifies that the given windows should be merged into the result window.
:param merge_result: The resulting merged window.
:param to_be_merged: The list of windows that should be merged into one window.
"""
pass
@abstractmethod
def merge_windows(self, new_window: W, sorted_windows: List[W], merge_callback: MergeCallback):
"""
Determines which windows (if any) should be merged.
:param new_window: The new window.
:param sorted_windows: The sorted window candidates.
:param merge_callback: A callback that can be invoked to signal which windows should be
merged.
"""
pass
class TumblingWindowAssigner(WindowAssigner[TimeWindow]):
"""
A WindowAssigner that windows elements into fixed-size windows based on the timestamp of
the elements. Windows cannot overlap.
"""
def __init__(self, size: int, offset: int, is_event_time: bool):
self._size = size
self._offset = offset
self._is_event_time = is_event_time
def assign_windows(self, element: List, timestamp: int) -> Iterable[TimeWindow]:
start = TimeWindow.get_window_start_with_offset(timestamp, self._offset, self._size)
return [TimeWindow(start, start + self._size)]
def is_event_time(self) -> bool:
return self._is_event_time
def __repr__(self):
return "TumblingWindow(%s)" % self._size
class CountTumblingWindowAssigner(WindowAssigner[CountWindow]):
"""
A WindowAssigner that windows elements into fixed-size windows based on the count number
of the elements. Windows cannot overlap.
"""
def __init__(self, size: int):
self._size = size
self._count = None # type: ValueState
def open(self, ctx: Context[Any, CountWindow]):
value_state_descriptor = ValueStateDescriptor('tumble-count-assigner',
Types.PICKLED_BYTE_ARRAY())
self._count = ctx.get_partitioned_state(value_state_descriptor)
def assign_windows(self, element: List, timestamp: int) -> Iterable[CountWindow]:
count_value = self._count.value()
if count_value is None:
current_count = 0
else:
current_count = count_value
id = current_count / self._size
self._count.update(current_count + 1)
return [CountWindow(id)]
def is_event_time(self) -> bool:
return False
def __repr__(self):
return "CountTumblingWindow(%s)" % self._size
class SlidingWindowAssigner(PanedWindowAssigner[TimeWindow]):
"""
A WindowAssigner that windows elements into sliding windows based on the timestamp of the
elements. Windows can possibly overlap.
"""
def __init__(self, size: int, slide: int, offset: int, is_event_time: bool):
self._size = size
self._slide = slide
self._offset = offset
self._is_event_time = is_event_time
self._pane_size = math.gcd(size, slide)
self._num_panes_per_window = size // self._pane_size
def assign_pane(self, element, timestamp: int) -> TimeWindow:
start = TimeWindow.get_window_start_with_offset(timestamp, self._offset, self._pane_size)
return TimeWindow(start, start + self._pane_size)
def split_into_panes(self, window: W) -> Iterable[TimeWindow]:
start = window.start
for i in range(self._num_panes_per_window):
yield TimeWindow(start, start + self._pane_size)
start += self._pane_size
def get_last_window(self, pane: W) -> TimeWindow:
last_start = TimeWindow.get_window_start_with_offset(pane.start, self._offset, self._slide)
return TimeWindow(last_start, last_start + self._size)
def assign_windows(self, element: List, timestamp: int) -> Iterable[TimeWindow]:
last_start = TimeWindow.get_window_start_with_offset(timestamp, self._offset, self._slide)
windows = [TimeWindow(start, start + self._size)
for start in range(last_start, timestamp - self._size, -self._slide)]
return windows
def is_event_time(self) -> bool:
return self._is_event_time
def __repr__(self):
return "SlidingWindowAssigner(%s, %s)" % (self._size, self._slide)
class CountSlidingWindowAssigner(WindowAssigner[CountWindow]):
"""
A WindowAssigner that windows elements into sliding windows based on the count number of
the elements. Windows can possibly overlap.
"""
def __init__(self, size, slide):
self._size = size
self._slide = slide
self._count = None # type: ValueState
def open(self, ctx: Context[Any, CountWindow]):
count_descriptor = ValueStateDescriptor('slide-count-assigner', Types.PICKLED_BYTE_ARRAY())
self._count = ctx.get_partitioned_state(count_descriptor)
def assign_windows(self, element: List, timestamp: int) -> Iterable[W]:
count_value = self._count.value()
if count_value is None:
current_count = 0
else:
current_count = count_value
self._count.update(current_count + 1)
last_id = current_count // self._slide
last_start = last_id * self._slide
last_end = last_start + self._size - 1
windows = []
while last_id >= 0 and last_start <= current_count <= last_end:
if last_start <= current_count <= last_end:
windows.append(CountWindow(last_id))
last_id -= 1
last_start -= self._slide
last_end -= self._slide
return windows
def is_event_time(self) -> bool:
return False
def __repr__(self):
return "CountSlidingWindowAssigner(%s, %s)" % (self._size, self._slide)
class SessionWindowAssigner(MergingWindowAssigner[TimeWindow]):
"""
WindowAssigner that windows elements into sessions based on the timestamp. Windows cannot
overlap.
"""
def __init__(self, session_gap: int, is_event_time: bool):
self._session_gap = session_gap
self._is_event_time = is_event_time
def merge_windows(self, new_window: W, sorted_windows: List[TimeWindow],
merge_callback: MergingWindowAssigner.MergeCallback):
ceiling = self._ceiling_window(new_window, sorted_windows)
floor = self._floor_window(new_window, sorted_windows)
merge_result = new_window
merged_windows = set()
if ceiling:
merge_result = self._merge_window(merge_result, ceiling, merged_windows)
if floor:
merge_result = self._merge_window(merge_result, floor, merged_windows)
if merged_windows:
merged_windows.add(new_window)
merge_callback.merge(merge_result, merged_windows)
def assign_windows(self, element: List, timestamp: int) -> Iterable[TimeWindow]:
return [TimeWindow(timestamp, timestamp + self._session_gap)]
def is_event_time(self) -> bool:
return self._is_event_time
@staticmethod
def _ceiling_window(new_window: TimeWindow, sorted_windows: List[TimeWindow]):
if not sorted_windows:
return None
window_num = len(sorted_windows)
if sorted_windows[0] >= new_window:
return None
for i in range(window_num - 1):
if sorted_windows[i] <= new_window <= sorted_windows[i + 1]:
return sorted_windows[i]
return sorted_windows[window_num - 1]
@staticmethod
def _floor_window(new_window: TimeWindow, sorted_windows: List[TimeWindow]):
if not sorted_windows:
return None
window_num = len(sorted_windows)
if sorted_windows[window_num - 1] <= new_window:
return None
for i in range(window_num - 1):
if sorted_windows[i] <= new_window <= sorted_windows[i + 1]:
return sorted_windows[i + 1]
return sorted_windows[0]
# Merge curWindow and other, return a new window which covers curWindow and other if they are
# overlapped. Otherwise, returns the curWindow itself.
@staticmethod
def _merge_window(cur_window: TimeWindow, other: TimeWindow, merged_windows: Set[TimeWindow]):
if cur_window.intersects(other):
merged_windows.add(other)
return cur_window.cover(other)
else:
return cur_window
def __repr__(self):
return "SessionWindowAssigner(%s)" % self._session_gap