blob: cff83a4d3ac221355669f17abb5c15d48e057a9b [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import collections
import time
from enum import Enum
from io import BytesIO
from pyflink.common import Row
from pyflink.datastream import TimerService
from pyflink.fn_execution.datastream.timerservice import InternalTimer, K, N, InternalTimerService
class TimerOperandType(Enum):
REGISTER_EVENT_TIMER = 0
REGISTER_PROC_TIMER = 1
DELETE_EVENT_TIMER = 2
DELETE_PROC_TIMER = 3
class LegacyInternalTimerServiceImpl(InternalTimerService[N]):
"""
Internal implementation of InternalTimerService.
TODO: Use InternalTimerServiceImpl instead.
"""
def __init__(self, keyed_state_backend):
self._keyed_state_backend = keyed_state_backend
self._current_watermark = None
self.timers = collections.OrderedDict()
def current_processing_time(self):
return int(time.time() * 1000)
def current_watermark(self):
return self._current_watermark
def advance_watermark(self, watermark: int):
self._current_watermark = watermark
def register_processing_time_timer(self, namespace: N, t: int):
current_key = self._keyed_state_backend.get_current_key()
timer = (TimerOperandType.REGISTER_PROC_TIMER, InternalTimerImpl(t, current_key, namespace))
self.timers[timer] = None
def register_event_time_timer(self, namespace: N, t: int):
current_key = self._keyed_state_backend.get_current_key()
timer = (TimerOperandType.REGISTER_EVENT_TIMER,
InternalTimerImpl(t, current_key, namespace))
self.timers[timer] = None
def delete_processing_time_timer(self, namespace: N, t: int):
current_key = self._keyed_state_backend.get_current_key()
timer = (TimerOperandType.DELETE_PROC_TIMER, InternalTimerImpl(t, current_key, namespace))
self.timers[timer] = None
def delete_event_time_timer(self, namespace: N, t: int):
current_key = self._keyed_state_backend.get_current_key()
timer = (TimerOperandType.DELETE_EVENT_TIMER, InternalTimerImpl(t, current_key, namespace))
self.timers[timer] = None
class InternalTimerServiceImpl(InternalTimerService[N]):
"""
Internal implementation of InternalTimerService.
"""
def __init__(self, keyed_state_backend):
self._keyed_state_backend = keyed_state_backend
self._current_watermark = None
self._timer_coder_impl = None
self._output_stream = None
from apache_beam.transforms.window import GlobalWindow
self._global_window = GlobalWindow()
def add_timer_info(self, timer_info):
self._timer_coder_impl = timer_info.timer_coder_impl
self._output_stream = timer_info.output_stream
def set_namespace_serializer(self, namespace_serializer):
self._namespace_serializer = namespace_serializer
def current_processing_time(self):
return int(time.time() * 1000)
def current_watermark(self):
return self._current_watermark
def advance_watermark(self, watermark: int):
self._current_watermark = watermark
def register_processing_time_timer(self, namespace: N, ts: int):
current_key = self._keyed_state_backend.get_current_key()
self._set_timer(TimerOperandType.REGISTER_PROC_TIMER, ts, current_key, namespace)
def register_event_time_timer(self, namespace: N, ts: int):
current_key = self._keyed_state_backend.get_current_key()
self._set_timer(TimerOperandType.REGISTER_EVENT_TIMER, ts, current_key, namespace)
def delete_processing_time_timer(self, namespace: N, ts: int):
current_key = self._keyed_state_backend.get_current_key()
self._set_timer(TimerOperandType.DELETE_PROC_TIMER, ts, current_key, namespace)
def delete_event_time_timer(self, namespace: N, ts: int):
current_key = self._keyed_state_backend.get_current_key()
self._set_timer(TimerOperandType.DELETE_EVENT_TIMER, ts, current_key, namespace)
def _set_timer(self, timer_operation_type, ts, key, namespace):
from apache_beam.transforms import userstate
bytes_io = BytesIO()
self._namespace_serializer.serialize(namespace, bytes_io)
encoded_namespace = bytes_io.getvalue()
timer_operand_type_value = timer_operation_type.value
timer_data = Row(timer_operand_type_value, -1, ts, key, encoded_namespace)
timer = userstate.Timer(
user_key=timer_data,
dynamic_timer_tag='',
windows=(self._global_window, ),
clear_bit=True,
fire_timestamp=None,
hold_timestamp=None,
paneinfo=None)
self._timer_coder_impl.encode_to_stream(timer, self._output_stream, True)
self._timer_coder_impl._key_coder_impl._value_coder._output_stream.maybe_flush()
class TimerServiceImpl(TimerService):
"""
Internal implementation of TimerService.
"""
def __init__(self, internal_timer_service: InternalTimerServiceImpl):
self._internal = internal_timer_service
def current_processing_time(self) -> int:
return self._internal.current_processing_time()
def current_watermark(self) -> int:
return self._internal.current_watermark()
def advance_watermark(self, wm: int):
self._internal.advance_watermark(wm)
def register_processing_time_timer(self, t: int):
self._internal.register_processing_time_timer(None, t)
def register_event_time_timer(self, t: int):
self._internal.register_event_time_timer(None, t)
def delete_processing_time_timer(self, t: int):
self._internal.delete_processing_time_timer(None, t)
def delete_event_time_timer(self, t: int):
self._internal.delete_event_time_timer(None, t)
class NonKeyedTimerServiceImpl(TimerService):
"""
Internal implementation of TimerService for ProcessFunction and CoProcessFunction.
"""
def __init__(self):
self._current_watermark = None
def current_processing_time(self) -> int:
return int(time.time() * 1000)
def current_watermark(self):
return self._current_watermark
def advance_watermark(self, wm):
self._current_watermark = wm
def register_processing_time_timer(self, t: int):
raise Exception("Register timers is only supported on a keyed stream.")
def register_event_time_timer(self, t: int):
raise Exception("Register timers is only supported on a keyed stream.")
def delete_processing_time_timer(self, t: int):
raise Exception("Deleting timers is only supported on a keyed streams.")
def delete_event_time_timer(self, t: int):
raise Exception("Deleting timers is only supported on a keyed streams.")
class InternalTimerImpl(InternalTimer[K, N]):
def __init__(self, timestamp: int, key: K, namespace: N):
self._timestamp = timestamp
self._key = key
self._namespace = namespace
def get_timestamp(self) -> int:
return self._timestamp
def get_key(self) -> K:
return self._key
def get_namespace(self) -> N:
return self._namespace
def __hash__(self):
result = int(self._timestamp ^ (self._timestamp >> 32))
result = 31 * result + hash(tuple(self._key))
result = 31 * result + hash(self._namespace)
return result
def __eq__(self, other):
return self.__class__ == other.__class__ and self._timestamp == other._timestamp \
and self._key == other._key and self._namespace == other._namespace