blob: d4a251db973c34523d2cfacc732612a08ddeebc9 [file] [log] [blame]
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
from pyflink.fn_execution.datastream.embedded.operations import extract_process_function
from pyflink.fn_execution.embedded.converters import DataConverter
class FunctionOperation(object):
def __init__(self,
operations,
output_data_converter: DataConverter):
self._operations = operations
self._output_data_converter = output_data_converter
self._main_operation = operations[0]
self._chained_operations = operations[1:]
def open(self):
for operation in self._operations:
operation.open()
def close(self):
for operation in self._operations:
operation.close()
def on_timer(self, timestamp):
results = self._main_operation.on_timer(timestamp)
if results:
results = self._process_elements(results)
yield from self._output_elements(results)
def _process_elements(self, elements):
def _process_elements_on_operation(op, items):
for item in items:
yield from op.process_element(item)
for operation in self._chained_operations:
elements = _process_elements_on_operation(operation, elements)
return elements
def _output_elements(self, elements):
for item in elements:
yield self._output_data_converter.to_external(item)
class OneInputFunctionOperation(FunctionOperation):
def __init__(self,
serialized_fns,
input_data_converter: DataConverter,
output_data_converter: DataConverter,
runtime_context,
function_context,
timer_context,
side_output_context,
job_parameters,
keyed_state_backend,
operator_state_backend):
operations = (
[extract_process_function(
serialized_fn,
runtime_context,
function_context,
timer_context,
side_output_context,
job_parameters,
keyed_state_backend,
operator_state_backend)
for serialized_fn in serialized_fns])
super(OneInputFunctionOperation, self).__init__(operations, output_data_converter)
self._input_data_converter = input_data_converter
def process_element(self, value):
results = self._main_operation.process_element(
self._input_data_converter.to_internal(value))
results = self._process_elements(results)
yield from self._output_elements(results)
class TwoInputFunctionOperation(FunctionOperation):
def __init__(self,
serialized_fns,
input_data_converter1: DataConverter,
input_data_converter2: DataConverter,
output_data_converter: DataConverter,
runtime_context,
function_context,
timer_context,
side_output_context,
job_parameters,
keyed_state_backend,
operator_state_backend):
operations = (
[extract_process_function(
serialized_fn,
runtime_context,
function_context,
timer_context,
side_output_context,
job_parameters,
keyed_state_backend,
operator_state_backend)
for serialized_fn in serialized_fns])
super(TwoInputFunctionOperation, self).__init__(operations, output_data_converter)
self._input_data_converter1 = input_data_converter1
self._input_data_converter2 = input_data_converter2
self._main_operation = operations[0]
self._other_operations = operations[1:]
def process_element1(self, value):
results = self._main_operation.process_element1(
self._input_data_converter1.to_internal(value))
results = self._process_elements(results)
yield from self._output_elements(results)
def process_element2(self, value):
results = self._main_operation.process_element2(
self._input_data_converter2.to_internal(value))
results = self._process_elements(results)
yield from self._output_elements(results)