| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################ |
| import sys |
| from abc import ABC, abstractmethod |
| from typing import Generic, TypeVar, List, Iterable |
| |
| from apache_beam.coders import Coder, PickleCoder |
| |
| from pyflink.datastream.state import StateDescriptor, State, ValueStateDescriptor, \ |
| ListStateDescriptor, MapStateDescriptor |
| from pyflink.datastream.timerservice import InternalTimerService |
| from pyflink.datastream.window import TimeWindow, CountWindow |
| from pyflink.fn_execution.timerservice_impl import InternalTimerServiceImpl |
| from pyflink.fn_execution.internal_state import InternalMergingState |
| from pyflink.fn_execution.state_impl import RemoteKeyedStateBackend |
| |
| MAX_LONG_VALUE = sys.maxsize |
| |
| K = TypeVar('K') |
| W = TypeVar('W', TimeWindow, CountWindow) |
| |
| |
| class Context(Generic[K, W], ABC): |
| """ |
| Information available in an invocation of methods of InternalWindowProcessFunction. |
| """ |
| |
| @abstractmethod |
| def get_partitioned_state(self, state_descriptor: StateDescriptor) -> State: |
| """ |
| Creates a partitioned state handle, using the state backend configured for this task. |
| """ |
| pass |
| |
| @abstractmethod |
| def current_key(self) -> K: |
| """ |
| Returns current key of current processed element. |
| """ |
| pass |
| |
| @abstractmethod |
| def current_processing_time(self) -> int: |
| """ |
| Returns the current processing time. |
| """ |
| pass |
| |
| @abstractmethod |
| def current_watermark(self) -> int: |
| """ |
| Returns the current event-time watermark. |
| """ |
| pass |
| |
| @abstractmethod |
| def get_window_accumulators(self, window: W) -> List: |
| """ |
| Gets the accumulators of the given window. |
| """ |
| pass |
| |
| @abstractmethod |
| def set_window_accumulators(self, window: W, acc: List): |
| """ |
| Sets the accumulators of the given window. |
| """ |
| pass |
| |
| @abstractmethod |
| def clear_window_state(self, window: W): |
| """ |
| Clear window state of the given window. |
| """ |
| pass |
| |
| @abstractmethod |
| def clear_trigger(self, window: W): |
| """ |
| Call Trigger#clear(Window) on trigger. |
| """ |
| pass |
| |
| @abstractmethod |
| def on_merge(self, new_window: W, merged_windows: Iterable[W]): |
| """ |
| Call Trigger.on_merge() on trigger. |
| """ |
| pass |
| |
| @abstractmethod |
| def delete_cleanup_timer(self, window: W): |
| """ |
| Deletes the cleanup timer set for the contents of the provided window. |
| """ |
| pass |
| |
| |
| class WindowContext(Context[K, W]): |
| """ |
| Context of window. |
| """ |
| |
| def __init__(self, |
| window_operator, |
| trigger_context: 'TriggerContext', |
| state_backend: RemoteKeyedStateBackend, |
| state_value_coder: Coder, |
| timer_service: InternalTimerServiceImpl, |
| is_event_time: bool): |
| self._window_operator = window_operator |
| self._trigger_context = trigger_context |
| self._state_backend = state_backend |
| self.timer_service = timer_service |
| self.is_event_time = is_event_time |
| self.window_state = self._state_backend.get_value_state("window_state", state_value_coder) |
| |
| def get_partitioned_state(self, state_descriptor: StateDescriptor) -> State: |
| return self._trigger_context.get_partitioned_state(state_descriptor) |
| |
| def current_key(self) -> K: |
| return self._state_backend.get_current_key() |
| |
| def current_processing_time(self) -> int: |
| return self.timer_service.current_processing_time() |
| |
| def current_watermark(self) -> int: |
| return self.timer_service.current_watermark() |
| |
| def get_window_accumulators(self, window: W) -> List: |
| self.window_state.set_current_namespace(window) |
| return self.window_state.value() |
| |
| def set_window_accumulators(self, window: W, acc: List): |
| self.window_state.set_current_namespace(window) |
| self.window_state.update(acc) |
| |
| def clear_window_state(self, window: W): |
| self.window_state.set_current_namespace(window) |
| self.window_state.clear() |
| |
| def clear_trigger(self, window: W): |
| self._trigger_context.window = window |
| self._trigger_context.clear() |
| |
| def on_merge(self, new_window: W, merged_windows: Iterable[W]): |
| self._trigger_context.window = new_window |
| self._trigger_context.merged_windows = merged_windows |
| self._trigger_context.on_merge() |
| |
| def delete_cleanup_timer(self, window: W): |
| cleanup_time = self._window_operator.cleanup_time(window) |
| if cleanup_time == MAX_LONG_VALUE: |
| # no need to clean up because we didn't set one |
| return |
| if self.is_event_time: |
| self._trigger_context.delete_event_time_timer(cleanup_time) |
| else: |
| self._trigger_context.delete_processing_time_timer(cleanup_time) |
| |
| |
| class TriggerContext(object): |
| """ |
| TriggerContext is a utility for handling Trigger invocations. It can be reused by setting the |
| key and window fields. No internal state must be kept in the TriggerContext. |
| """ |
| |
| def __init__(self, |
| trigger, |
| timer_service: InternalTimerService[W], |
| state_backend: RemoteKeyedStateBackend): |
| self._trigger = trigger |
| self._timer_service = timer_service |
| self._state_backend = state_backend |
| self.window = None # type: W |
| self.merged_windows = None # type: Iterable[W] |
| |
| def open(self): |
| self._trigger.open(self) |
| |
| def on_element(self, row, timestamp: int) -> bool: |
| return self._trigger.on_element(row, timestamp, self.window) |
| |
| def on_processing_time(self, timestamp: int) -> bool: |
| return self._trigger.on_processing_time(timestamp, self.window) |
| |
| def on_event_time(self, timestamp: int) -> bool: |
| return self._trigger.on_event_time(timestamp, self.window) |
| |
| def on_merge(self): |
| self._trigger.on_merge(self.window, self) |
| |
| def get_current_processing_time(self) -> int: |
| return self._timer_service.current_processing_time() |
| |
| def get_current_watermark(self) -> int: |
| return self._timer_service.current_watermark() |
| |
| def register_processing_time_timer(self, time: int): |
| self._timer_service.register_processing_time_timer(self.window, time) |
| |
| def register_event_time_timer(self, time: int): |
| self._timer_service.register_event_time_timer(self.window, time) |
| |
| def delete_processing_time_timer(self, time: int): |
| self._timer_service.delete_processing_time_timer(self.window, time) |
| |
| def delete_event_time_timer(self, time: int): |
| self._timer_service.delete_event_time_timer(self.window, time) |
| |
| def clear(self): |
| self._trigger.clear(self.window) |
| |
| def get_partitioned_state(self, state_descriptor: StateDescriptor) -> State: |
| if isinstance(state_descriptor, ValueStateDescriptor): |
| state = self._state_backend.get_value_state(state_descriptor.name, PickleCoder()) |
| elif isinstance(state_descriptor, ListStateDescriptor): |
| state = self._state_backend.get_list_state(state_descriptor.name, PickleCoder()) |
| elif isinstance(state_descriptor, MapStateDescriptor): |
| state = self._state_backend.get_map_state( |
| state_descriptor.name, PickleCoder(), PickleCoder()) |
| else: |
| raise Exception("Unknown supported StateDescriptor %s" % state_descriptor) |
| state.set_current_namespace(self.window) |
| return state |
| |
| def merge_partitioned_state(self, state_descriptor: StateDescriptor): |
| if not self.merged_windows: |
| state = self.get_partitioned_state(state_descriptor) |
| if isinstance(state, InternalMergingState): |
| state.merge_namespaces(self.window, self.merged_windows) |
| else: |
| raise Exception("The given state descriptor does not refer to a mergeable state" |
| " (MergingState)") |