| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################ |
| # cython: language_level = 3 |
| # cython: infer_types = True |
| # cython: profile=True |
| # cython: boundscheck=False, wraparound=False, initializedcheck=False, cdivision=True |
| from libc.stdint cimport * |
| |
| from apache_beam.coders.coder_impl cimport OutputStream as BOutputStream |
| from apache_beam.utils cimport windowed_value |
| from apache_beam.utils.windowed_value cimport WindowedValue |
| |
| from pyflink.fn_execution.coder_impl_fast cimport InputStreamWrapper |
| |
| from apache_beam.runners.worker.bundle_processor import DataOutputOperation |
| from pyflink.fn_execution.beam.beam_coder_impl_fast import FlinkLengthPrefixCoderBeamWrapper |
| from pyflink.fn_execution.table.operations import BundleOperation |
| from pyflink.fn_execution.profiler import Profiler |
| |
| |
| cdef class InputProcessor: |
| |
| cpdef has_next(self): |
| pass |
| |
| cpdef next(self): |
| pass |
| |
| |
| cdef class NetworkInputProcessor(InputProcessor): |
| |
| def __init__(self, InputStreamWrapper input_stream_wrapper): |
| self._input_stream_wrapper = input_stream_wrapper |
| |
| cpdef has_next(self): |
| return self._input_stream_wrapper.has_next() |
| |
| cpdef next(self): |
| return self._input_stream_wrapper.next() |
| |
| |
| cdef class IntermediateInputProcessor(InputProcessor): |
| |
| def __init__(self, input_values): |
| self._input_values = input_values |
| self._next_value = None |
| |
| cpdef has_next(self): |
| try: |
| self._next_value = next(self._input_values) |
| except StopIteration: |
| self._next_value = None |
| |
| return self._next_value is not None |
| |
| cpdef next(self): |
| return self._next_value |
| |
| |
| cdef class OutputProcessor: |
| |
| cpdef process_outputs(self, WindowedValue windowed_value, results): |
| pass |
| |
| cpdef close(self): |
| pass |
| |
| cdef class NetworkOutputProcessor(OutputProcessor): |
| |
| def __init__(self, consumer): |
| assert isinstance(consumer, DataOutputOperation) |
| self._consumer = consumer |
| self._value_coder_impl = consumer.windowed_coder.wrapped_value_coder.get_impl()._value_coder |
| |
| cpdef process_outputs(self, WindowedValue windowed_value, results): |
| output_stream = self._consumer.output_stream |
| self._value_coder_impl.encode_to_stream(results, output_stream, True) |
| self._value_coder_impl._output_stream.maybe_flush() |
| |
| cpdef close(self): |
| self._value_coder_impl._output_stream.close() |
| |
| cdef class IntermediateOutputProcessor(OutputProcessor): |
| |
| def __init__(self, consumer): |
| self._consumer = consumer |
| |
| cpdef process_outputs(self, WindowedValue windowed_value, results): |
| self._consumer.process(windowed_value.with_value(results)) |
| |
| |
| cdef class FunctionOperation(Operation): |
| """ |
| Base class of function operation that will execute StatelessFunction or StatefulFunction for |
| each input element. |
| """ |
| |
| def __init__(self, name, spec, counter_factory, sampler, consumers, operation_cls): |
| super(FunctionOperation, self).__init__(name, spec, counter_factory, sampler) |
| consumer = consumers['output'][0] |
| if isinstance(consumer, DataOutputOperation): |
| self._output_processor = NetworkOutputProcessor(consumer) |
| |
| _value_coder_impl = consumer.windowed_coder.wrapped_value_coder.get_impl()._value_coder |
| if isinstance(_value_coder_impl, FlinkLengthPrefixCoderBeamWrapper): |
| self._is_python_coder = False |
| else: |
| self._is_python_coder = True |
| else: |
| self._output_processor = IntermediateOutputProcessor(consumer) |
| self._is_python_coder = False |
| |
| self.operation_cls = operation_cls |
| self.operation = self.generate_operation() |
| self.process_element = self.operation.process_element |
| self.operation.open() |
| if spec.serialized_fn.profile_enabled: |
| self._profiler = Profiler() |
| else: |
| self._profiler = None |
| |
| cpdef start(self): |
| with self.scoped_start_state: |
| super(FunctionOperation, self).start() |
| if self._profiler: |
| self._profiler.start() |
| |
| cpdef finish(self): |
| with self.scoped_finish_state: |
| super(FunctionOperation, self).finish() |
| self.operation.finish() |
| if self._profiler: |
| self._profiler.close() |
| |
| cpdef teardown(self): |
| with self.scoped_finish_state: |
| self.operation.close() |
| self._output_processor.close() |
| |
| cpdef process(self, WindowedValue o): |
| cdef InputStreamWrapper input_stream_wrapper |
| cdef InputProcessor input_processor |
| with self.scoped_process_state: |
| if self._is_python_coder: |
| for value in o.value: |
| self._output_processor.process_outputs(o, self.process_element(value)) |
| else: |
| if isinstance(o.value, InputStreamWrapper): |
| input_processor = NetworkInputProcessor(o.value) |
| else: |
| input_processor = IntermediateInputProcessor(o.value) |
| if isinstance(self.operation, BundleOperation): |
| while input_processor.has_next(): |
| self.process_element(input_processor.next()) |
| self._output_processor.process_outputs(o, self.operation.finish_bundle()) |
| else: |
| while input_processor.has_next(): |
| result = self.process_element(input_processor.next()) |
| self._output_processor.process_outputs(o, result) |
| |
| def progress_metrics(self): |
| metrics = super(FunctionOperation, self).progress_metrics() |
| metrics.processed_elements.measured.output_element_counts.clear() |
| tag = None |
| receiver = self.receivers[0] |
| metrics.processed_elements.measured.output_element_counts[ |
| str(tag)] = receiver.opcounter.element_counter.value() |
| return metrics |
| |
| cpdef monitoring_infos(self, transform_id, tag_to_pcollection_id): |
| """ |
| Only pass user metric to Java |
| :param tag_to_pcollection_id: useless for user metric |
| """ |
| return self.user_monitoring_infos(transform_id) |
| |
| cdef object generate_operation(self): |
| pass |
| |
| |
| cdef class StatelessFunctionOperation(FunctionOperation): |
| def __init__(self, name, spec, counter_factory, sampler, consumers, operation_cls): |
| super(StatelessFunctionOperation, self).__init__( |
| name, spec, counter_factory, sampler, consumers, operation_cls) |
| |
| cdef object generate_operation(self): |
| return self.operation_cls(self.spec.serialized_fn) |
| |
| |
| cdef class StatefulFunctionOperation(FunctionOperation): |
| def __init__(self, name, spec, counter_factory, sampler, consumers, operation_cls, |
| keyed_state_backend): |
| self._keyed_state_backend = keyed_state_backend |
| self._reusable_windowed_value = windowed_value.create(None, -1, None, None) |
| super(StatefulFunctionOperation, self).__init__( |
| name, spec, counter_factory, sampler, consumers, operation_cls) |
| |
| cdef object generate_operation(self): |
| return self.operation_cls(self.spec.serialized_fn, self._keyed_state_backend) |
| |
| cpdef void add_timer_info(self, timer_family_id, timer_info): |
| # ignore timer_family_id |
| self.operation.add_timer_info(timer_info) |
| |
| cpdef process_timer(self, tag, timer_data): |
| cdef BOutputStream output_stream |
| self._output_processor.process_outputs( |
| self._reusable_windowed_value, |
| # the field user_key holds the timer data |
| self.operation.process_timer(timer_data.user_key)) |