blob: 47de5ed36af25a2a367748160d4ef5812fa0908d [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from abc import ABC
from pyflink.datastream import (ProcessFunction, KeyedProcessFunction, CoProcessFunction,
KeyedCoProcessFunction, TimerService, TimeDomain)
from pyflink.datastream.functions import (BaseBroadcastProcessFunction, BroadcastProcessFunction,
KeyedBroadcastProcessFunction)
from pyflink.datastream.state import MapStateDescriptor, BroadcastState, ReadOnlyBroadcastState
from pyflink.fn_execution.datastream.embedded.state_impl import (ReadOnlyBroadcastStateImpl,
BroadcastStateImpl)
from pyflink.fn_execution.datastream.embedded.timerservice_impl import TimerServiceImpl
from pyflink.fn_execution.embedded.converters import from_type_info_proto, from_type_info
from pyflink.fn_execution.embedded.java_utils import to_java_state_descriptor
class InternalProcessFunctionContext(ProcessFunction.Context, CoProcessFunction.Context,
TimerService):
def __init__(self, j_context):
self._j_context = j_context
def timer_service(self) -> TimerService:
return self
def timestamp(self) -> int:
return self._j_context.timestamp()
def current_processing_time(self):
return self._j_context.currentProcessingTime()
def current_watermark(self):
return self._j_context.currentWatermark()
def register_processing_time_timer(self, timestamp: int):
raise Exception("Register timers is only supported on a keyed stream.")
def register_event_time_timer(self, timestamp: int):
raise Exception("Register timers is only supported on a keyed stream.")
def delete_processing_time_timer(self, t: int):
raise Exception("Deleting timers is only supported on a keyed streams.")
def delete_event_time_timer(self, t: int):
raise Exception("Deleting timers is only supported on a keyed streams.")
class InternalKeyedProcessFunctionContext(KeyedProcessFunction.Context,
KeyedCoProcessFunction.Context):
def __init__(self, j_context, key_type_info):
self._j_context = j_context
self._timer_service = TimerServiceImpl(self._j_context.timerService())
self._key_converter = from_type_info_proto(key_type_info)
def get_current_key(self):
return self._key_converter.to_internal(self._j_context.getCurrentKey())
def timer_service(self) -> TimerService:
return self._timer_service
def timestamp(self) -> int:
return self._j_context.timestamp()
class InternalKeyedProcessFunctionOnTimerContext(KeyedProcessFunction.OnTimerContext,
KeyedProcessFunction.Context,
KeyedCoProcessFunction.OnTimerContext,
KeyedCoProcessFunction.Context):
def __init__(self, j_timer_context, key_type_info):
self._j_timer_context = j_timer_context
self._timer_service = TimerServiceImpl(self._j_timer_context.timerService())
self._key_converter = from_type_info_proto(key_type_info)
def timer_service(self) -> TimerService:
return self._timer_service
def timestamp(self) -> int:
return self._j_timer_context.timestamp()
def time_domain(self) -> TimeDomain:
return TimeDomain(self._j_timer_context.timeDomain())
def get_current_key(self):
return self._key_converter.to_internal(self._j_timer_context.getCurrentKey())
class InternalWindowTimerContext(object):
def __init__(self, j_timer_context, key_type_info, window_converter):
self._j_timer_context = j_timer_context
self._key_converter = from_type_info_proto(key_type_info)
self._window_converter = window_converter
def timestamp(self) -> int:
return self._j_timer_context.timestamp()
def window(self):
return self._window_converter.to_internal(self._j_timer_context.getWindow())
def get_current_key(self):
return self._key_converter.to_internal(self._j_timer_context.getCurrentKey())
class InternalBaseBroadcastProcessFunctionContext(BaseBroadcastProcessFunction.Context, ABC):
def __init__(self, j_context, j_operator_state_backend):
self._j_context = j_context
self._j_operator_state_backend = j_operator_state_backend
def timestamp(self) -> int:
return self._j_context.timestamp()
def current_processing_time(self) -> int:
return self._j_context.currentProcessingTime()
def current_watermark(self) -> int:
return self._j_context.currentWatermark()
class InternalBroadcastProcessFunctionContext(InternalBaseBroadcastProcessFunctionContext,
BroadcastProcessFunction.Context):
def __init__(self, j_context, j_operator_state_backend):
super(InternalBroadcastProcessFunctionContext, self).__init__(
j_context, j_operator_state_backend)
def get_broadcast_state(self, state_descriptor: MapStateDescriptor) -> BroadcastState:
return BroadcastStateImpl(
self._j_operator_state_backend.getBroadcastState(
to_java_state_descriptor(state_descriptor)),
from_type_info(state_descriptor.type_info))
class InternalBroadcastProcessFunctionReadOnlyContext(InternalBaseBroadcastProcessFunctionContext,
BroadcastProcessFunction.ReadOnlyContext):
def __init__(self, j_context, j_operator_state_backend):
super(InternalBroadcastProcessFunctionReadOnlyContext, self).__init__(
j_context, j_operator_state_backend)
def get_broadcast_state(self, state_descriptor: MapStateDescriptor) -> ReadOnlyBroadcastState:
return ReadOnlyBroadcastStateImpl(
self._j_operator_state_backend.getBroadcastState(
to_java_state_descriptor(state_descriptor)),
from_type_info(state_descriptor.type_info))
class InternalKeyedBroadcastProcessFunctionContext(InternalBaseBroadcastProcessFunctionContext,
KeyedBroadcastProcessFunction.Context):
def __init__(self, j_context, j_operator_state_backend):
super(InternalKeyedBroadcastProcessFunctionContext, self).__init__(
j_context, j_operator_state_backend)
def get_broadcast_state(self, state_descriptor: MapStateDescriptor) -> BroadcastState:
return BroadcastStateImpl(
self._j_operator_state_backend.getBroadcastState(
to_java_state_descriptor(state_descriptor)),
from_type_info(state_descriptor.type_info))
class InternalKeyedBroadcastProcessFunctionReadOnlyContext(
InternalBaseBroadcastProcessFunctionContext,
KeyedBroadcastProcessFunction.ReadOnlyContext
):
def __init__(self, j_context, key_type_info, j_operator_state_backend):
super(InternalKeyedBroadcastProcessFunctionReadOnlyContext, self).__init__(
j_context, j_operator_state_backend)
self._key_converter = from_type_info_proto(key_type_info)
self._timer_service = TimerServiceImpl(self._j_context.timerService())
def get_broadcast_state(self, state_descriptor: MapStateDescriptor) -> ReadOnlyBroadcastState:
return ReadOnlyBroadcastStateImpl(
self._j_operator_state_backend.getBroadcastState(
to_java_state_descriptor(state_descriptor)),
from_type_info(state_descriptor.type_info))
def timer_service(self) -> TimerService:
return self._timer_service
def get_current_key(self):
return self._key_converter.to_internal(self._j_context.getCurrentKey())
class InternalKeyedBroadcastProcessFunctionOnTimerContext(
InternalBaseBroadcastProcessFunctionContext,
KeyedBroadcastProcessFunction.OnTimerContext,
):
def __init__(self, j_timer_context, key_type_info, j_operator_state_backend):
super(InternalKeyedBroadcastProcessFunctionOnTimerContext, self).__init__(
j_timer_context, j_operator_state_backend)
self._timer_service = TimerServiceImpl(self._j_context.timerService())
self._key_converter = from_type_info_proto(key_type_info)
def get_broadcast_state(self, state_descriptor: MapStateDescriptor) -> ReadOnlyBroadcastState:
return ReadOnlyBroadcastStateImpl(
self._j_operator_state_backend.getBroadcastState(
to_java_state_descriptor(state_descriptor)),
from_type_info(state_descriptor.type_info))
def current_processing_time(self) -> int:
return self._timer_service.current_processing_time()
def current_watermark(self) -> int:
return self._timer_service.current_watermark()
def timer_service(self) -> TimerService:
return self._timer_service
def timestamp(self) -> int:
return self._j_context.timestamp()
def time_domain(self) -> TimeDomain:
return TimeDomain(self._j_context.timeDomain())
def get_current_key(self):
return self._key_converter.to_internal(self._j_context.getCurrentKey())