blob: afaed5a732d3db741c4a194ca6604f472619fbc8 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import abc
from pyflink.common import Row
from pyflink.common.serializer import VoidNamespaceSerializer
from pyflink.datastream import TimeDomain, RuntimeContext
from pyflink.fn_execution import pickle
from pyflink.fn_execution.datastream.process_function import \
InternalKeyedProcessFunctionOnTimerContext, InternalKeyedProcessFunctionContext, \
InternalProcessFunctionContext
from pyflink.fn_execution.datastream.runtime_context import StreamingRuntimeContext
from pyflink.fn_execution.datastream.window.window_operator import WindowOperator
from pyflink.fn_execution.datastream.timerservice_impl import (
TimerServiceImpl, InternalTimerServiceImpl, NonKeyedTimerServiceImpl)
from pyflink.fn_execution.datastream.input_handler import (RunnerInputHandler, TimerHandler,
_emit_results)
from pyflink.metrics.metricbase import GenericMetricGroup
DATA_STREAM_STATELESS_FUNCTION_URN = "flink:transform:ds:stateless_function:v1"
DATA_STREAM_STATEFUL_FUNCTION_URN = "flink:transform:ds:stateful_function:v1"
class Operation(abc.ABC):
def __init__(self, serialized_fn):
if serialized_fn.metric_enabled:
self.base_metric_group = GenericMetricGroup(None, None)
else:
self.base_metric_group = None
def finish(self):
self._update_gauge(self.base_metric_group)
def _update_gauge(self, base_metric_group):
if base_metric_group is not None:
for name in base_metric_group._flink_gauge:
flink_gauge = base_metric_group._flink_gauge[name]
beam_gauge = base_metric_group._beam_gauge[name]
beam_gauge.set(flink_gauge())
for sub_group in base_metric_group._sub_groups:
self._update_gauge(sub_group)
def process_element(self, value):
raise NotImplementedError
def open(self) -> None:
pass
def close(self) -> None:
pass
class StatelessOperation(Operation):
def __init__(self, serialized_fn):
super(StatelessOperation, self).__init__(serialized_fn)
self.open_func, self.close_func, self.process_element_func = \
extract_stateless_function(
user_defined_function_proto=serialized_fn,
runtime_context=StreamingRuntimeContext.of(
serialized_fn.runtime_context,
self.base_metric_group))
def open(self):
self.open_func()
def close(self):
self.close_func()
def process_element(self, value):
return self.process_element_func(value)
class StatefulOperation(Operation):
def __init__(self, serialized_fn, keyed_state_backend):
super(StatefulOperation, self).__init__(serialized_fn)
self.keyed_state_backend = keyed_state_backend
self.open_func, self.close_func, self.process_element_func, self.process_timer_func, \
self.internal_timer_service = \
extract_stateful_function(
user_defined_function_proto=serialized_fn,
runtime_context=StreamingRuntimeContext.of(
serialized_fn.runtime_context,
self.base_metric_group,
self.keyed_state_backend),
keyed_state_backend=self.keyed_state_backend)
def finish(self):
super().finish()
self.keyed_state_backend.commit()
def open(self):
self.open_func()
def close(self):
self.close_func()
def process_element(self, value):
return self.process_element_func(value)
def process_timer(self, timer_data):
return self.process_timer_func(timer_data)
def add_timer_info(self, timer_info):
self.internal_timer_service.add_timer_info(timer_info)
def extract_stateless_function(user_defined_function_proto, runtime_context: RuntimeContext):
"""
Extracts user-defined-function from the proto representation of a
:class:`Function`.
:param user_defined_function_proto: the proto representation of the Python :class:`Function`
:param runtime_context: the streaming runtime context
"""
from pyflink.fn_execution import flink_fn_execution_pb2
func_type = user_defined_function_proto.function_type
UserDefinedDataStreamFunction = flink_fn_execution_pb2.UserDefinedDataStreamFunction
if func_type == UserDefinedDataStreamFunction.REVISE_OUTPUT:
def open_func():
pass
def close_func():
pass
def revise_output(value):
# VALUE[CURRENT_TIMESTAMP, CURRENT_WATERMARK, NORMAL_DATA]
timestamp = value[0]
element = value[2]
yield Row(timestamp, element)
process_element_func = revise_output
else:
user_defined_func = pickle.loads(user_defined_function_proto.payload)
def open_func():
if hasattr(user_defined_func, "open"):
user_defined_func.open(runtime_context)
def close_func():
if hasattr(user_defined_func, "close"):
user_defined_func.close()
if func_type == UserDefinedDataStreamFunction.PROCESS:
process_element = user_defined_func.process_element
ctx = InternalProcessFunctionContext(NonKeyedTimerServiceImpl())
def wrapped_func(value):
# VALUE[CURRENT_TIMESTAMP, CURRENT_WATERMARK, NORMAL_DATA]
timestamp = value[0]
watermark = value[1]
ctx.set_timestamp(timestamp)
ctx.timer_service().advance_watermark(watermark)
results = process_element(value[2], ctx)
yield from _emit_results(timestamp, watermark, results)
process_element_func = wrapped_func
elif func_type == UserDefinedDataStreamFunction.CO_PROCESS:
process_element1 = user_defined_func.process_element1
process_element2 = user_defined_func.process_element2
ctx = InternalProcessFunctionContext(NonKeyedTimerServiceImpl())
def wrapped_func(value):
# VALUE[CURRENT_TIMESTAMP, CURRENT_WATERMARK, [isLeft, leftInput, rightInput]]
timestamp = value[0]
watermark = value[1]
ctx.set_timestamp(timestamp)
ctx.timer_service().advance_watermark(watermark)
normal_data = value[2]
if normal_data[0]:
results = process_element1(normal_data[1], ctx)
else:
results = process_element2(normal_data[2], ctx)
yield from _emit_results(timestamp, watermark, results)
process_element_func = wrapped_func
else:
raise Exception("Unsupported function_type: " + str(func_type))
return open_func, close_func, process_element_func
def extract_stateful_function(user_defined_function_proto,
runtime_context: RuntimeContext,
keyed_state_backend):
from pyflink.fn_execution import flink_fn_execution_pb2
func_type = user_defined_function_proto.function_type
user_defined_func = pickle.loads(user_defined_function_proto.payload)
internal_timer_service = InternalTimerServiceImpl(keyed_state_backend)
def state_key_selector(normal_data):
return Row(normal_data[0])
def user_key_selector(normal_data):
return normal_data[0]
def input_selector(normal_data):
return normal_data[1]
UserDefinedDataStreamFunction = flink_fn_execution_pb2.UserDefinedDataStreamFunction
if func_type in (UserDefinedDataStreamFunction.KEYED_PROCESS,
UserDefinedDataStreamFunction.KEYED_CO_PROCESS):
timer_service = TimerServiceImpl(internal_timer_service)
ctx = InternalKeyedProcessFunctionContext(timer_service)
on_timer_ctx = InternalKeyedProcessFunctionOnTimerContext(timer_service)
process_function = user_defined_func
internal_timer_service.set_namespace_serializer(VoidNamespaceSerializer())
def open_func():
if hasattr(process_function, "open"):
process_function.open(runtime_context)
def close_func():
if hasattr(process_function, "close"):
process_function.close()
def on_event_time(timestamp: int, key, namespace):
keyed_state_backend.set_current_key(key)
return _on_timer(TimeDomain.EVENT_TIME, timestamp, key)
def on_processing_time(timestamp: int, key, namespace):
keyed_state_backend.set_current_key(key)
return _on_timer(TimeDomain.PROCESSING_TIME, timestamp, key)
def _on_timer(time_domain: TimeDomain, timestamp: int, key):
user_current_key = user_key_selector(key)
on_timer_ctx.set_timestamp(timestamp)
on_timer_ctx.set_current_key(user_current_key)
on_timer_ctx.set_time_domain(time_domain)
return process_function.on_timer(timestamp, on_timer_ctx)
if func_type == UserDefinedDataStreamFunction.KEYED_PROCESS:
def process_element(normal_data, timestamp: int):
ctx.set_timestamp(timestamp)
ctx.set_current_key(user_key_selector(normal_data))
keyed_state_backend.set_current_key(state_key_selector(normal_data))
return process_function.process_element(input_selector(normal_data), ctx)
elif func_type == UserDefinedDataStreamFunction.KEYED_CO_PROCESS:
def process_element(normal_data, timestamp: int):
is_left = normal_data[0]
if is_left:
user_input = normal_data[1]
else:
user_input = normal_data[2]
ctx.set_timestamp(timestamp)
on_timer_ctx.set_current_key(user_key_selector(user_input))
keyed_state_backend.set_current_key(state_key_selector(user_input))
if is_left:
return process_function.process_element1(input_selector(user_input), ctx)
else:
return process_function.process_element2(input_selector(user_input), ctx)
else:
raise Exception("Unsupported func_type: " + str(func_type))
elif func_type == UserDefinedDataStreamFunction.WINDOW:
window_operation_descriptor = user_defined_func
window_assigner = window_operation_descriptor.assigner
window_trigger = window_operation_descriptor.trigger
allowed_lateness = window_operation_descriptor.allowed_lateness
window_state_descriptor = window_operation_descriptor.window_state_descriptor
internal_window_function = window_operation_descriptor.internal_window_function
window_serializer = window_operation_descriptor.window_serializer
window_coder = window_serializer._get_coder()
keyed_state_backend.namespace_coder = window_coder
keyed_state_backend._namespace_coder_impl = window_coder.get_impl()
window_operator = WindowOperator(
window_assigner,
keyed_state_backend,
user_key_selector,
window_state_descriptor,
internal_window_function,
window_trigger,
allowed_lateness)
internal_timer_service.set_namespace_serializer(window_serializer)
def open_func():
window_operator.open(runtime_context, internal_timer_service)
def close_func():
window_operator.close()
def process_element(normal_data, timestamp: int):
keyed_state_backend.set_current_key(state_key_selector(normal_data))
return window_operator.process_element(input_selector(normal_data), timestamp)
def on_event_time(timestamp: int, key, namespace):
keyed_state_backend.set_current_key(key)
return window_operator.on_event_time(timestamp, key, namespace)
def on_processing_time(timestamp: int, key, namespace):
keyed_state_backend.set_current_key(key)
return window_operator.on_processing_time(timestamp, key, namespace)
else:
raise Exception("Unsupported function_type: " + str(func_type))
input_handler = RunnerInputHandler(
internal_timer_service,
process_element)
process_element_func = input_handler.process_element
timer_handler = TimerHandler(
internal_timer_service,
on_event_time,
on_processing_time,
keyed_state_backend._namespace_coder_impl)
process_timer_func = timer_handler.process_timer
return open_func, close_func, process_element_func, process_timer_func, internal_timer_service