blob: 2f6218131f06e6db0c80e48c579049fc80f17100 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.fn_execution.embedded.operations import (OneInputFunctionOperation,
TwoInputFunctionOperation)
from pyflink.fn_execution.embedded.converters import from_type_info_proto, from_schema_proto
def pare_user_defined_data_stream_function_proto(proto):
from pyflink.fn_execution import flink_fn_execution_pb2
serialized_fn = flink_fn_execution_pb2.UserDefinedDataStreamFunction()
serialized_fn.ParseFromString(proto)
return serialized_fn
def parse_coder_proto(proto):
from pyflink.fn_execution import flink_fn_execution_pb2
coder = flink_fn_execution_pb2.CoderInfoDescriptor()
coder.ParseFromString(proto)
return coder
def parse_function_proto(proto):
from pyflink.fn_execution import flink_fn_execution_pb2
serialized_fn = flink_fn_execution_pb2.UserDefinedFunctions()
serialized_fn.ParseFromString(proto)
return serialized_fn
def create_scalar_operation_from_proto(proto,
input_coder_info,
output_coder_into,
one_arg_optimization=False,
one_result_optimization=False):
from pyflink.fn_execution.table.operations import ScalarFunctionOperation
serialized_fn = parse_function_proto(proto)
input_data_converter = (
from_schema_proto(
parse_coder_proto(input_coder_info).flatten_row_type.schema,
one_arg_optimization))
output_data_converter = (
from_schema_proto(
parse_coder_proto(output_coder_into).flatten_row_type.schema,
one_result_optimization))
scalar_operation = ScalarFunctionOperation(
serialized_fn, one_arg_optimization, one_result_optimization)
process_element_func = scalar_operation.process_element
def process_element(value):
actual_value = input_data_converter.to_internal(value)
result = process_element_func(actual_value)
return output_data_converter.to_external(result)
scalar_operation.process_element = process_element
return scalar_operation
def create_table_operation_from_proto(proto, input_coder_info, output_coder_into):
from pyflink.fn_execution.table.operations import TableFunctionOperation
serialized_fn = parse_function_proto(proto)
input_data_converter = (
from_schema_proto(parse_coder_proto(input_coder_info).flatten_row_type.schema))
output_data_converter = (
from_schema_proto(parse_coder_proto(output_coder_into).flatten_row_type.schema))
table_operation = TableFunctionOperation(serialized_fn)
process_element_func = table_operation.process_element
def process_element(value):
actual_value = input_data_converter.to_internal(value)
results = process_element_func(actual_value)
for result in results:
yield output_data_converter.to_external(result)
table_operation.process_element = process_element
return table_operation
def create_one_input_user_defined_data_stream_function_from_protos(
function_infos, input_coder_info, output_coder_info, runtime_context,
function_context, timer_context, side_output_context, job_parameters, keyed_state_backend,
operator_state_backend):
serialized_fns = [pare_user_defined_data_stream_function_proto(proto)
for proto in function_infos]
input_data_converter = (
from_type_info_proto(parse_coder_proto(input_coder_info).raw_type.type_info))
output_data_converter = (
from_type_info_proto(parse_coder_proto(output_coder_info).raw_type.type_info))
function_operation = OneInputFunctionOperation(
serialized_fns,
input_data_converter,
output_data_converter,
runtime_context,
function_context,
timer_context,
side_output_context,
job_parameters,
keyed_state_backend,
operator_state_backend)
return function_operation
def create_two_input_user_defined_data_stream_function_from_protos(
function_infos, input_coder_info1, input_coder_info2, output_coder_info, runtime_context,
function_context, timer_context, side_output_context, job_parameters, keyed_state_backend,
operator_state_backend):
serialized_fns = [pare_user_defined_data_stream_function_proto(proto)
for proto in function_infos]
input_data_converter1 = (
from_type_info_proto(parse_coder_proto(input_coder_info1).raw_type.type_info))
input_data_converter2 = (
from_type_info_proto(parse_coder_proto(input_coder_info2).raw_type.type_info))
output_data_converter = (
from_type_info_proto(parse_coder_proto(output_coder_info).raw_type.type_info))
function_operation = TwoInputFunctionOperation(
serialized_fns,
input_data_converter1,
input_data_converter2,
output_data_converter,
runtime_context,
function_context,
timer_context,
side_output_context,
job_parameters,
keyed_state_backend,
operator_state_backend)
return function_operation