| ################################################################################ |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ################################################################################ |
| import typing |
| import uuid |
| from enum import Enum |
| from typing import Callable, Union, List, cast, Optional, overload |
| |
| from pyflink.util.java_utils import get_j_env_configuration |
| |
| from pyflink.common import typeinfo, ExecutionConfig, Row |
| from pyflink.common.typeinfo import RowTypeInfo, Types, TypeInformation, _from_java_type |
| from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner |
| from pyflink.datastream.connectors import Sink |
| from pyflink.datastream.functions import (_get_python_env, FlatMapFunction, MapFunction, Function, |
| FunctionWrapper, SinkFunction, FilterFunction, |
| KeySelector, ReduceFunction, CoMapFunction, |
| CoFlatMapFunction, Partitioner, RuntimeContext, |
| ProcessFunction, KeyedProcessFunction, |
| KeyedCoProcessFunction, WindowFunction, |
| ProcessWindowFunction, InternalWindowFunction, |
| InternalIterableWindowFunction, |
| InternalIterableProcessWindowFunction, CoProcessFunction, |
| InternalSingleValueWindowFunction, |
| InternalSingleValueProcessWindowFunction, |
| PassThroughWindowFunction, AggregateFunction, |
| NullByteKeySelector, AllWindowFunction, |
| InternalIterableAllWindowFunction, |
| ProcessAllWindowFunction, |
| InternalIterableProcessAllWindowFunction, |
| BroadcastProcessFunction, |
| KeyedBroadcastProcessFunction, |
| InternalSingleValueAllWindowFunction, |
| PassThroughAllWindowFunction, |
| InternalSingleValueProcessAllWindowFunction) |
| from pyflink.datastream.output_tag import OutputTag |
| from pyflink.datastream.slot_sharing_group import SlotSharingGroup |
| from pyflink.datastream.state import (ListStateDescriptor, StateDescriptor, ReducingStateDescriptor, |
| AggregatingStateDescriptor, MapStateDescriptor, ReducingState) |
| from pyflink.datastream.utils import convert_to_python_obj |
| from pyflink.datastream.window import (CountTumblingWindowAssigner, CountSlidingWindowAssigner, |
| CountWindowSerializer, TimeWindowSerializer, Trigger, |
| WindowAssigner, WindowOperationDescriptor, |
| GlobalWindowSerializer, MergingWindowAssigner) |
| from pyflink.java_gateway import get_gateway |
| from pyflink.util.java_utils import to_jarray |
| |
| __all__ = ['CloseableIterator', 'DataStream', 'KeyedStream', 'ConnectedStreams', 'WindowedStream', |
| 'DataStreamSink', 'CloseableIterator', 'BroadcastStream', 'BroadcastConnectedStream'] |
| |
| WINDOW_STATE_NAME = 'window-contents' |
| |
| |
| class DataStream(object): |
| """ |
| A DataStream represents a stream of elements of the same type. A DataStream can be transformed |
| into another DataStream by applying a transformation as for example: |
| |
| :: |
| >>> DataStream.map(MapFunctionImpl()) |
| >>> DataStream.filter(FilterFunctionImpl()) |
| """ |
| |
| def __init__(self, j_data_stream): |
| self._j_data_stream = j_data_stream |
| |
| def get_name(self) -> str: |
| """ |
| Gets the name of the current data stream. This name is used by the visualization and logging |
| during runtime. |
| |
| :return: Name of the stream. |
| """ |
| return self._j_data_stream.getName() |
| |
| def name(self, name: str) -> 'DataStream': |
| """ |
| Sets the name of the current data stream. This name is used by the visualization and logging |
| during runtime. |
| |
| :param name: Name of the stream. |
| :return: The named operator. |
| """ |
| self._j_data_stream.name(name) |
| return self |
| |
| def uid(self, uid: str) -> 'DataStream': |
| """ |
| Sets an ID for this operator. The specified ID is used to assign the same operator ID across |
| job submissions (for example when starting a job from a savepoint). |
| |
| Important: this ID needs to be unique per transformation and job. Otherwise, job submission |
| will fail. |
| |
| :param uid: The unique user-specified ID of this transformation. |
| :return: The operator with the specified ID. |
| """ |
| self._j_data_stream.uid(uid) |
| return self |
| |
| def set_uid_hash(self, uid_hash: str) -> 'DataStream': |
| """ |
Sets a user-provided hash for this operator. This will be used AS IS to create the
JobVertexID. The user-provided hash is an alternative to the generated hashes, and is used
when identifying an operator through the default hash mechanism fails (e.g. because of
changes between Flink versions).
| |
Important: this should be used as a workaround or for troubleshooting. The provided hash
needs to be unique per transformation and job. Otherwise, job submission will fail.
Furthermore, you cannot assign a user-specified hash to intermediate nodes in an operator
chain, and trying to do so will make your job fail.
| |
| A use case for this is in migration between Flink versions or changing the jobs in a way |
| that changes the automatically generated hashes. In this case, providing the previous hashes |
| directly through this method (e.g. obtained from old logs) can help to reestablish a lost |
| mapping from states to their target operator. |
| |
| :param uid_hash: The user provided hash for this operator. This will become the jobVertexID, |
| which is shown in the logs and web ui. |
| :return: The operator with the user provided hash. |
| """ |
| self._j_data_stream.setUidHash(uid_hash) |
| return self |
| |
| def set_parallelism(self, parallelism: int) -> 'DataStream': |
| """ |
| Sets the parallelism for this operator. |
| |
:param parallelism: The parallelism for this operator.
| :return: The operator with set parallelism. |
| """ |
| self._j_data_stream.setParallelism(parallelism) |
| return self |
| |
| def set_max_parallelism(self, max_parallelism: int) -> 'DataStream': |
| """ |
| Sets the maximum parallelism of this operator. |
| |
| The maximum parallelism specifies the upper bound for dynamic scaling. It also defines the |
| number of key groups used for partitioned state. |
| |
| :param max_parallelism: Maximum parallelism. |
| :return: The operator with set maximum parallelism. |
| """ |
| self._j_data_stream.setMaxParallelism(max_parallelism) |
| return self |
| |
| def get_type(self) -> TypeInformation: |
| """ |
| Gets the type of the stream. |
| |
| :return: The type of the DataStream. |
| """ |
| return typeinfo._from_java_type(self._j_data_stream.getType()) |
| |
| def get_execution_environment(self): |
| """ |
| Returns the StreamExecutionEnvironment that was used to create this DataStream. |
| |
| :return: The Execution Environment. |
| """ |
| from pyflink.datastream import StreamExecutionEnvironment |
| return StreamExecutionEnvironment( |
| j_stream_execution_environment=self._j_data_stream.getExecutionEnvironment()) |
| |
| def get_execution_config(self) -> ExecutionConfig: |
| return ExecutionConfig(j_execution_config=self._j_data_stream.getExecutionConfig()) |
| |
| def force_non_parallel(self) -> 'DataStream': |
| """ |
Sets the parallelism and maximum parallelism of this operator to one, and marks this
operator so that it cannot be set to a degree of parallelism other than one.
| |
| :return: The operator with only one parallelism. |
| """ |
| self._j_data_stream.forceNonParallel() |
| return self |
| |
| def set_buffer_timeout(self, timeout_millis: int) -> 'DataStream': |
| """ |
Sets the buffering timeout for data produced by this operation. The timeout defines how long
data may linger in a partially full buffer before being sent over the network.
| |
| Lower timeouts lead to lower tail latencies, but may affect throughput. Timeouts of 1 ms |
| still sustain high throughput, even for jobs with high parallelism. |
| |
| A value of '-1' means that the default buffer timeout should be used. A value of '0' |
| indicates that no buffering should happen, and all records/events should be immediately sent |
| through the network, without additional buffering. |
| |
| :param timeout_millis: The maximum time between two output flushes. |
| :return: The operator with buffer timeout set. |
| """ |
| self._j_data_stream.setBufferTimeout(timeout_millis) |
| return self |
| |
| def start_new_chain(self) -> 'DataStream': |
| """ |
Starts a new task chain beginning at this operator. This operator will not be chained
(thread co-located for increased performance) to any previous tasks, even if possible.
| |
| :return: The operator with chaining set. |
| """ |
| self._j_data_stream.startNewChain() |
| return self |
| |
| def disable_chaining(self) -> 'DataStream': |
| """ |
| Turns off chaining for this operator so thread co-location will not be used as an |
| optimization. |
Chaining can be turned off for the whole job by
StreamExecutionEnvironment.disableOperatorChaining(), however this is not advised for
performance considerations.
| |
| :return: The operator with chaining disabled. |
| """ |
| self._j_data_stream.disableChaining() |
| return self |
| |
| def slot_sharing_group(self, slot_sharing_group: Union[str, SlotSharingGroup]) -> 'DataStream': |
| """ |
| Sets the slot sharing group of this operation. Parallel instances of operations that are in |
| the same slot sharing group will be co-located in the same TaskManager slot, if possible. |
| |
| Operations inherit the slot sharing group of input operations if all input operations are in |
| the same slot sharing group and no slot sharing group was explicitly specified. |
| |
| Initially an operation is in the default slot sharing group. An operation can be put into |
| the default group explicitly by setting the slot sharing group to 'default'. |
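
Example (an illustrative sketch; ds is assumed to be an existing DataStream and "group_a"
an arbitrary group name chosen for illustration):
::

>>> ds.map(lambda x: x).slot_sharing_group("group_a")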
| |
:param slot_sharing_group: The name of the slot sharing group, or a SlotSharingGroup which
contains the name and its resource spec.
| :return: This operator. |
| """ |
| if isinstance(slot_sharing_group, SlotSharingGroup): |
| self._j_data_stream.slotSharingGroup(slot_sharing_group.get_java_slot_sharing_group()) |
| else: |
| self._j_data_stream.slotSharingGroup(slot_sharing_group) |
| return self |
| |
| def set_description(self, description: str) -> 'DataStream': |
| """ |
| Sets the description for this operator. |
| |
| Description is used in json plan and web ui, but not in logging and metrics where only |
| name is available. Description is expected to provide detailed information about the |
| operator, while name is expected to be more simple, providing summary information only, |
| so that we can have more user-friendly logging messages and metric tags without losing |
| useful messages for debugging. |
| |
| :param description: The description for this operator. |
| :return: The operator with new description. |
| |
| .. versionadded:: 1.15.0 |
| """ |
| self._j_data_stream.setDescription(description) |
| return self |
| |
| def map(self, func: Union[Callable, MapFunction], output_type: TypeInformation = None) \ |
| -> 'DataStream': |
| """ |
| Applies a Map transformation on a DataStream. The transformation calls a MapFunction for |
| each element of the DataStream. Each MapFunction call returns exactly one element. |
| |
Note that if the user does not specify the output data type, the output data will be
serialized as a pickled primitive byte array.
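
Example (a minimal sketch, assuming ds is an existing DataStream of ints):
::

>>> ds.map(lambda x: x + 1, output_type=Types.INT())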
| |
| :param func: The MapFunction that is called for each element of the DataStream. |
| :param output_type: The type information of the MapFunction output data. |
| :return: The transformed DataStream. |
| """ |
| if not isinstance(func, MapFunction) and not callable(func): |
| raise TypeError("The input must be a MapFunction or a callable function") |
| |
| class MapProcessFunctionAdapter(ProcessFunction): |
| |
| def __init__(self, map_func): |
| if isinstance(map_func, MapFunction): |
| self._open_func = map_func.open |
| self._close_func = map_func.close |
| self._map_func = map_func.map |
| else: |
| self._open_func = None |
| self._close_func = None |
| self._map_func = map_func |
| |
| def open(self, runtime_context: RuntimeContext): |
| if self._open_func: |
| self._open_func(runtime_context) |
| |
| def close(self): |
| if self._close_func: |
| self._close_func() |
| |
| def process_element(self, value, ctx: 'ProcessFunction.Context'): |
| yield self._map_func(value) |
| |
| return self.process(MapProcessFunctionAdapter(func), output_type) \ |
| .name("Map") |
| |
| def flat_map(self, |
| func: Union[Callable, FlatMapFunction], |
| output_type: TypeInformation = None) -> 'DataStream': |
| """ |
| Applies a FlatMap transformation on a DataStream. The transformation calls a FlatMapFunction |
| for each element of the DataStream. Each FlatMapFunction call can return any number of |
| elements including none. |
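
Example (a minimal sketch, assuming ds is an existing DataStream of strings):
::

>>> ds.flat_map(lambda line: line.split(' '), output_type=Types.STRING())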
| |
| :param func: The FlatMapFunction that is called for each element of the DataStream. |
| :param output_type: The type information of output data. |
| :return: The transformed DataStream. |
| """ |
| |
| if not isinstance(func, FlatMapFunction) and not callable(func): |
| raise TypeError("The input must be a FlatMapFunction or a callable function") |
| |
| class FlatMapProcessFunctionAdapter(ProcessFunction): |
| |
| def __init__(self, flat_map_func): |
| if isinstance(flat_map_func, FlatMapFunction): |
| self._open_func = flat_map_func.open |
| self._close_func = flat_map_func.close |
| self._flat_map_func = flat_map_func.flat_map |
| else: |
| self._open_func = None |
| self._close_func = None |
| self._flat_map_func = flat_map_func |
| |
| def open(self, runtime_context: RuntimeContext): |
| if self._open_func: |
| self._open_func(runtime_context) |
| |
| def close(self): |
| if self._close_func: |
| self._close_func() |
| |
| def process_element(self, value, ctx: 'ProcessFunction.Context'): |
| yield from self._flat_map_func(value) |
| |
| return self.process(FlatMapProcessFunctionAdapter(func), output_type) \ |
| .name("FlatMap") |
| |
| def key_by(self, |
| key_selector: Union[Callable, KeySelector], |
| key_type: TypeInformation = None) -> 'KeyedStream': |
| """ |
| Creates a new KeyedStream that uses the provided key for partitioning its operator states. |
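
Example (a minimal sketch, assuming ds is an existing DataStream of (str, int) tuples):
::

>>> ds.key_by(lambda x: x[0], key_type=Types.STRING())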
| |
| :param key_selector: The KeySelector to be used for extracting the key for partitioning. |
| :param key_type: The type information describing the key type. |
:return: The DataStream with partitioned state (i.e. a KeyedStream).
| """ |
| |
| if not isinstance(key_selector, KeySelector) and not callable(key_selector): |
| raise TypeError("Parameter key_selector should be type of KeySelector or a callable " |
| "function.") |
| |
| class AddKey(ProcessFunction): |
| |
| def __init__(self, key_selector): |
| if isinstance(key_selector, KeySelector): |
| self._key_selector_open_func = key_selector.open |
| self._key_selector_close_func = key_selector.close |
| self._get_key_func = key_selector.get_key |
| else: |
| self._key_selector_open_func = None |
| self._key_selector_close_func = None |
| self._get_key_func = key_selector |
| |
| def open(self, runtime_context: RuntimeContext): |
| if self._key_selector_open_func: |
| self._key_selector_open_func(runtime_context) |
| |
| def close(self): |
| if self._key_selector_close_func: |
| self._key_selector_close_func() |
| |
| def process_element(self, value, ctx: 'ProcessFunction.Context'): |
| yield Row(self._get_key_func(value), value) |
| |
| output_type_info = typeinfo._from_java_type( |
| self._j_data_stream.getTransformation().getOutputType()) |
| if key_type is None: |
| key_type = Types.PICKLED_BYTE_ARRAY() |
| |
| gateway = get_gateway() |
| stream_with_key_info = self.process( |
| AddKey(key_selector), |
| output_type=Types.ROW([key_type, output_type_info])) |
| stream_with_key_info.name(gateway.jvm.org.apache.flink.python.util.PythonConfigUtil |
| .STREAM_KEY_BY_MAP_OPERATOR_NAME) |
| |
| JKeyByKeySelector = gateway.jvm.KeyByKeySelector |
| key_stream = KeyedStream( |
| stream_with_key_info._j_data_stream.keyBy( |
| JKeyByKeySelector(), |
| Types.ROW([key_type]).get_java_type_info()), output_type_info, |
| self) |
| return key_stream |
| |
| def filter(self, func: Union[Callable, FilterFunction]) -> 'DataStream': |
| """ |
Applies a Filter transformation on a DataStream. The transformation calls a FilterFunction
for each element of the DataStream and retains only those elements for which the function
returns true. Elements for which the function returns false are filtered out.
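
Example (a minimal sketch, assuming ds is an existing DataStream of ints):
::

>>> ds.filter(lambda x: x > 0)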
| |
| :param func: The FilterFunction that is called for each element of the DataStream. |
| :return: The filtered DataStream. |
| """ |
| |
| if not isinstance(func, FilterFunction) and not callable(func): |
| raise TypeError("The input must be a FilterFunction or a callable function") |
| |
| class FilterProcessFunctionAdapter(ProcessFunction): |
| |
| def __init__(self, filter_func): |
| if isinstance(filter_func, FilterFunction): |
| self._open_func = filter_func.open |
| self._close_func = filter_func.close |
| self._filter_func = filter_func.filter |
| else: |
| self._open_func = None |
| self._close_func = None |
| self._filter_func = filter_func |
| |
| def open(self, runtime_context: RuntimeContext): |
| if self._open_func: |
| self._open_func(runtime_context) |
| |
| def close(self): |
| if self._close_func: |
| self._close_func() |
| |
| def process_element(self, value, ctx: 'ProcessFunction.Context'): |
| if self._filter_func(value): |
| yield value |
| |
| output_type = typeinfo._from_java_type( |
| self._j_data_stream.getTransformation().getOutputType()) |
| return self.process(FilterProcessFunctionAdapter(func), output_type=output_type) \ |
| .name("Filter") |
| |
| def window_all(self, window_assigner: WindowAssigner) -> 'AllWindowedStream': |
| """ |
Windows this data stream to an AllWindowedStream, which evaluates windows over a
non-key-grouped stream. Elements are put into windows by a WindowAssigner. The grouping of
elements is done by window.
| |
| A Trigger can be defined to specify when windows are evaluated. However, WindowAssigners |
| have a default Trigger that is used if a Trigger is not specified. |
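
Example (an illustrative sketch; ds is assumed to be an existing DataStream, and
TumblingEventTimeWindows and Time are assumed to be imported from pyflink.datastream.window
and pyflink.common respectively):
::

>>> ds.window_all(TumblingEventTimeWindows.of(Time.seconds(5)))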
| |
| :param window_assigner: The WindowAssigner that assigns elements to windows. |
| :return: The trigger windows data stream. |
| |
| .. versionadded:: 1.16.0 |
| """ |
| return AllWindowedStream(self, window_assigner) |
| |
| def union(self, *streams: 'DataStream') -> 'DataStream': |
| """ |
| Creates a new DataStream by merging DataStream outputs of the same type with each other. The |
| DataStreams merged using this operator will be transformed simultaneously. |
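
Example (a minimal sketch, assuming ds1, ds2 and ds3 are existing DataStreams of the same
type):
::

>>> united_stream = ds1.union(ds2, ds3)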
| |
:param streams: The DataStreams to union with this DataStream.
| :return: The DataStream. |
| """ |
| j_data_streams = [] |
| for data_stream in streams: |
| if isinstance(data_stream, KeyedStream): |
| j_data_streams.append(data_stream._values()._j_data_stream) |
| else: |
| j_data_streams.append(data_stream._j_data_stream) |
| gateway = get_gateway() |
| JDataStream = gateway.jvm.org.apache.flink.streaming.api.datastream.DataStream |
| j_data_stream_arr = get_gateway().new_array(JDataStream, len(j_data_streams)) |
| for i in range(len(j_data_streams)): |
| j_data_stream_arr[i] = j_data_streams[i] |
| j_united_stream = self._j_data_stream.union(j_data_stream_arr) |
| return DataStream(j_data_stream=j_united_stream) |
| |
| @overload |
| def connect(self, ds: 'DataStream') -> 'ConnectedStreams': |
| pass |
| |
| @overload |
| def connect(self, ds: 'BroadcastStream') -> 'BroadcastConnectedStream': |
| pass |
| |
| def connect(self, ds: Union['DataStream', 'BroadcastStream']) \ |
| -> Union['ConnectedStreams', 'BroadcastConnectedStream']: |
| """ |
| If ds is a :class:`DataStream`, creates a new :class:`ConnectedStreams` by connecting |
| DataStream outputs of (possible) different types with each other. The DataStreams connected |
| using this operator can be used with CoFunctions to apply joint transformations. |
| |
| If ds is a :class:`BroadcastStream`, creates a new :class:`BroadcastConnectedStream` by |
| connecting the current :class:`DataStream` with a :class:`BroadcastStream`. The latter can |
| be created using the :meth:`broadcast` method. The resulting stream can be further processed |
| using the :meth:`BroadcastConnectedStream.process` method. |
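
Example (an illustrative sketch; ds1 and ds2 are assumed to be existing DataStreams and
MyCoMapFunction a user-defined CoMapFunction):
::

>>> connected_streams = ds1.connect(ds2)
>>> result = connected_streams.map(MyCoMapFunction())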
| |
| :param ds: The DataStream or BroadcastStream with which this stream will be connected. |
| :return: The ConnectedStreams or BroadcastConnectedStream. |
| |
| .. versionchanged:: 1.16.0 |
| Support connect BroadcastStream |
| """ |
| if isinstance(ds, BroadcastStream): |
| return BroadcastConnectedStream( |
| self, ds, cast(BroadcastStream, ds).broadcast_state_descriptors |
| ) |
| return ConnectedStreams(self, ds) |
| |
| def shuffle(self) -> 'DataStream': |
| """ |
| Sets the partitioning of the DataStream so that the output elements are shuffled uniformly |
| randomly to the next operation. |
| |
| :return: The DataStream with shuffle partitioning set. |
| """ |
| return DataStream(self._j_data_stream.shuffle()) |
| |
| def project(self, *field_indexes: int) -> 'DataStream': |
| """ |
| Initiates a Project transformation on a Tuple DataStream. |
| |
| Note that only Tuple DataStreams can be projected. |
| |
| :param field_indexes: The field indexes of the input tuples that are retained. The order of |
| fields in the output tuple corresponds to the order of field indexes. |
| :return: The projected DataStream. |
| """ |
| if not isinstance(self.get_type(), typeinfo.TupleTypeInfo): |
| raise Exception('Only Tuple DataStreams can be projected.') |
| |
| gateway = get_gateway() |
| j_index_arr = gateway.new_array(gateway.jvm.int, len(field_indexes)) |
| for i in range(len(field_indexes)): |
| j_index_arr[i] = field_indexes[i] |
| return DataStream(self._j_data_stream.project(j_index_arr)) |
| |
| def rescale(self) -> 'DataStream': |
| """ |
| Sets the partitioning of the DataStream so that the output elements are distributed evenly |
| to a subset of instances of the next operation in a round-robin fashion. |
| |
| The subset of downstream operations to which the upstream operation sends elements depends |
| on the degree of parallelism of both the upstream and downstream operation. For example, if |
| the upstream operation has parallelism 2 and the downstream operation has parallelism 4, |
| then one upstream operation would distribute elements to two downstream operations. If, on |
| the other hand, the downstream operation has parallelism 4 then two upstream operations will |
| distribute to one downstream operation while the other two upstream operations will |
| distribute to the other downstream operations. |
| |
In cases where the different parallelisms are not multiples of each other, one or several
downstream operations will have a differing number of inputs from upstream operations.
| |
| :return: The DataStream with rescale partitioning set. |
| """ |
| return DataStream(self._j_data_stream.rescale()) |
| |
| def rebalance(self) -> 'DataStream': |
| """ |
| Sets the partitioning of the DataStream so that the output elements are distributed evenly |
| to instances of the next operation in a round-robin fashion. |
| |
| :return: The DataStream with rebalance partition set. |
| """ |
| return DataStream(self._j_data_stream.rebalance()) |
| |
| def forward(self) -> 'DataStream': |
| """ |
| Sets the partitioning of the DataStream so that the output elements are forwarded to the |
| local sub-task of the next operation. |
| |
| :return: The DataStream with forward partitioning set. |
| """ |
| return DataStream(self._j_data_stream.forward()) |
| |
| @overload |
| def broadcast(self) -> 'DataStream': |
| pass |
| |
| @overload |
| def broadcast(self, broadcast_state_descriptor: MapStateDescriptor, |
| *other_broadcast_state_descriptors: MapStateDescriptor) -> 'BroadcastStream': |
| pass |
| |
| def broadcast(self, broadcast_state_descriptor: Optional[MapStateDescriptor] = None, |
| *other_broadcast_state_descriptors: MapStateDescriptor) \ |
| -> Union['DataStream', 'BroadcastStream']: |
| """ |
| Sets the partitioning of the DataStream so that the output elements are broadcasted to every |
| parallel instance of the next operation. |
| |
If :class:`~state.MapStateDescriptor` s are passed in, it returns a
:class:`BroadcastStream` with :class:`~state.BroadcastState` s implicitly created from the
specified descriptors.
| |
| Example: |
| :: |
| |
| >>> map_state_desc1 = MapStateDescriptor("state1", Types.INT(), Types.INT()) |
| >>> map_state_desc2 = MapStateDescriptor("state2", Types.INT(), Types.STRING()) |
| >>> broadcast_stream = ds1.broadcast(map_state_desc1, map_state_desc2) |
| >>> broadcast_connected_stream = ds2.connect(broadcast_stream) |
| |
| :param broadcast_state_descriptor: the first MapStateDescriptor describing BroadcastState. |
| :param other_broadcast_state_descriptors: the rest of MapStateDescriptors describing |
| BroadcastStates, if any. |
| :return: The DataStream with broadcast partitioning set or a BroadcastStream which can be |
| used in :meth:`connect` to create a BroadcastConnectedStream for further processing of |
| the elements. |
| |
| .. versionchanged:: 1.16.0 |
| Support return BroadcastStream |
| """ |
| if broadcast_state_descriptor is not None: |
| args = [broadcast_state_descriptor] |
| args.extend(other_broadcast_state_descriptors) |
| for arg in args: |
| if not isinstance(arg, MapStateDescriptor): |
| raise TypeError("broadcast_state_descriptor must be MapStateDescriptor") |
| broadcast_state_descriptors = [arg for arg in args] # type: List[MapStateDescriptor] |
| return BroadcastStream(cast(DataStream, self.broadcast()), broadcast_state_descriptors) |
| return DataStream(self._j_data_stream.broadcast()) |
| |
| def process(self, func: ProcessFunction, output_type: TypeInformation = None) -> 'DataStream': |
| """ |
| Applies the given ProcessFunction on the input stream, thereby creating a transformed output |
| stream. |
| |
| The function will be called for every element in the input streams and can produce zero or |
| more output elements. |
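
Example (an illustrative sketch; ds is assumed to be an existing DataStream and
MyProcessFunction a user-defined ProcessFunction):
::

>>> ds.process(MyProcessFunction(), output_type=Types.STRING())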
| |
| :param func: The ProcessFunction that is called for each element in the stream. |
| :param output_type: TypeInformation for the result type of the function. |
| :return: The transformed DataStream. |
| """ |
| |
| from pyflink.fn_execution import flink_fn_execution_pb2 |
| j_python_data_stream_function_operator, j_output_type_info = \ |
| _get_one_input_stream_operator( |
| self, |
| func, |
| flink_fn_execution_pb2.UserDefinedDataStreamFunction.PROCESS, # type: ignore |
| output_type) |
| return DataStream(self._j_data_stream.transform( |
| "PROCESS", |
| j_output_type_info, |
| j_python_data_stream_function_operator)) |
| |
| def assign_timestamps_and_watermarks(self, watermark_strategy: WatermarkStrategy) -> \ |
| 'DataStream': |
| """ |
Assigns timestamps to the elements in the data stream and generates watermarks to signal
event time progress. The given :class:`WatermarkStrategy` is used to create a
TimestampAssigner and a WatermarkGenerator.
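
Example (an illustrative sketch; ds is assumed to be an existing DataStream and
MyTimestampAssigner a user-defined TimestampAssigner):
::

>>> strategy = WatermarkStrategy.for_monotonous_timestamps()
>>> ds.assign_timestamps_and_watermarks(
...     strategy.with_timestamp_assigner(MyTimestampAssigner()))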
| |
| :param watermark_strategy: The strategy to generate watermarks based on event timestamps. |
| :return: The stream after the transformation, with assigned timestamps and watermarks. |
| """ |
| if watermark_strategy._timestamp_assigner is not None: |
| # in case users have specified custom TimestampAssigner, we need to extract and |
| # generate watermark according to the specified TimestampAssigner. |
| |
| class TimestampAssignerProcessFunctionAdapter(ProcessFunction): |
| |
| def __init__(self, timestamp_assigner: TimestampAssigner): |
| self._extract_timestamp_func = timestamp_assigner.extract_timestamp |
| |
| def process_element(self, value, ctx: 'ProcessFunction.Context'): |
| yield value, self._extract_timestamp_func(value, ctx.timestamp()) |
| |
| # step 1: extract the timestamp according to the specified TimestampAssigner |
| timestamped_data_stream = self.process( |
| TimestampAssignerProcessFunctionAdapter(watermark_strategy._timestamp_assigner), |
| Types.TUPLE([self.get_type(), Types.LONG()])) |
| timestamped_data_stream.name("Extract-Timestamp") |
| |
| # step 2: assign timestamp and watermark |
| gateway = get_gateway() |
| JCustomTimestampAssigner = gateway.jvm.org.apache.flink.streaming.api.functions.python \ |
| .eventtime.CustomTimestampAssigner |
| j_watermarked_data_stream = ( |
| timestamped_data_stream._j_data_stream.assignTimestampsAndWatermarks( |
| watermark_strategy._j_watermark_strategy.withTimestampAssigner( |
| JCustomTimestampAssigner()))) |
| |
| # step 3: remove the timestamp field which is added in step 1 |
| JRemoveTimestampMapFunction = gateway.jvm.org.apache.flink.streaming.api.functions \ |
| .python.eventtime.RemoveTimestampMapFunction |
| result = DataStream(j_watermarked_data_stream.map( |
| JRemoveTimestampMapFunction(), self._j_data_stream.getType())) |
| result.name("Remove-Timestamp") |
| return result |
| else: |
# if the user does not specify a TimestampAssigner, directly apply the Java watermark
# strategy.
| return DataStream(self._j_data_stream.assignTimestampsAndWatermarks( |
| watermark_strategy._j_watermark_strategy)) |
| |
| def partition_custom(self, partitioner: Union[Callable, Partitioner], |
| key_selector: Union[Callable, KeySelector]) -> 'DataStream': |
| """ |
| Partitions a DataStream on the key returned by the selector, using a custom partitioner. |
| This method takes the key selector to get the key to partition on, and a partitioner that |
| accepts the key type. |
| |
| Note that this method works only on single field keys, i.e. the selector cannot return |
| tuples of fields. |
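
Example (a minimal sketch, assuming ds is an existing DataStream of (int, str) tuples):
::

>>> ds.partition_custom(lambda key, num_partitions: key % num_partitions,
...                     lambda record: record[0])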
| |
| :param partitioner: The partitioner to assign partitions to keys. |
| :param key_selector: The KeySelector with which the DataStream is partitioned. |
| :return: The partitioned DataStream. |
| """ |
| |
| if not isinstance(partitioner, Partitioner) and not callable(partitioner): |
| raise TypeError("Parameter partitioner should be type of Partitioner or a callable " |
| "function.") |
| |
| if not isinstance(key_selector, KeySelector) and not callable(key_selector): |
| raise TypeError("Parameter key_selector should be type of KeySelector or a callable " |
| "function.") |
| |
| gateway = get_gateway() |
| |
| class CustomPartitioner(ProcessFunction): |
| """ |
A wrapper class for the partition_custom map function. It indicates that this is a
partition-custom operation, for which the PythonPartitionCustomOperator needs to be
applied to run the map function.
| """ |
| |
| def __init__(self, partitioner, key_selector): |
| if isinstance(partitioner, Partitioner): |
| self._partitioner_open_func = partitioner.open |
| self._partitioner_close_func = partitioner.close |
| self._partition_func = partitioner.partition |
| else: |
| self._partitioner_open_func = None |
| self._partitioner_close_func = None |
| self._partition_func = partitioner |
| |
| if isinstance(key_selector, KeySelector): |
| self._key_selector_open_func = key_selector.open |
| self._key_selector_close_func = key_selector.close |
| self._get_key_func = key_selector.get_key |
| else: |
| self._key_selector_open_func = None |
| self._key_selector_close_func = None |
| self._get_key_func = key_selector |
| |
| def open(self, runtime_context: RuntimeContext): |
| if self._partitioner_open_func: |
| self._partitioner_open_func(runtime_context) |
| if self._key_selector_open_func: |
| self._key_selector_open_func(runtime_context) |
| |
| self.num_partitions = int(runtime_context.get_job_parameter( |
| "NUM_PARTITIONS", "-1")) |
| if self.num_partitions <= 0: |
| raise ValueError( |
| "The partition number should be a positive value, got %s" |
| % self.num_partitions) |
| |
| def close(self): |
| if self._partitioner_close_func: |
| self._partitioner_close_func() |
| if self._key_selector_close_func: |
| self._key_selector_close_func() |
| |
| def process_element(self, value, ctx: 'ProcessFunction.Context'): |
| partition = self._partition_func(self._get_key_func(value), self.num_partitions) |
| yield Row(partition, value) |
| |
| original_type_info = self.get_type() |
| stream_with_partition_info = self.process( |
| CustomPartitioner(partitioner, key_selector), |
| output_type=Types.ROW([Types.INT(), original_type_info])) |
| |
| stream_with_partition_info.name( |
| gateway.jvm.org.apache.flink.python.util.PythonConfigUtil |
| .STREAM_PARTITION_CUSTOM_MAP_OPERATOR_NAME) |
| |
| JPartitionCustomKeySelector = gateway.jvm.PartitionCustomKeySelector |
| JIdParitioner = gateway.jvm.org.apache.flink.api.java.functions.IdPartitioner |
| partitioned_stream_with_partition_info = DataStream( |
| stream_with_partition_info._j_data_stream.partitionCustom( |
| JIdParitioner(), JPartitionCustomKeySelector())) |
| |
| partitioned_stream = partitioned_stream_with_partition_info.map( |
| lambda x: x[1], original_type_info) |
| partitioned_stream.name(gateway.jvm.org.apache.flink.python.util.PythonConfigUtil |
| .KEYED_STREAM_VALUE_OPERATOR_NAME) |
| return DataStream(partitioned_stream._j_data_stream) |
| |
| def add_sink(self, sink_func: SinkFunction) -> 'DataStreamSink': |
| """ |
| Adds the given sink to this DataStream. Only streams with sinks added will be executed once |
| the StreamExecutionEnvironment.execute() method is called. |
| |
| :param sink_func: The SinkFunction object. |
| :return: The closed DataStream. |
| """ |
| return DataStreamSink(self._j_data_stream.addSink(sink_func.get_java_function())) |
| |
| def sink_to(self, sink: Sink) -> 'DataStreamSink': |
| """ |
| Adds the given sink to this DataStream. Only streams with sinks added will be |
| executed once the |
| :func:`~pyflink.datastream.stream_execution_environment.StreamExecutionEnvironment.execute` |
| method is called. |
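
Example (an illustrative sketch; ds is assumed to be an existing DataStream of strings,
'/tmp/output' is a placeholder path, and FileSink and Encoder are assumed to be imported
from pyflink.datastream.connectors.file_system and pyflink.common.serialization):
::

>>> sink = FileSink.for_row_format('/tmp/output', Encoder.simple_string_encoder()).build()
>>> ds.sink_to(sink)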
| |
| :param sink: The user defined sink. |
| :return: The closed DataStream. |
| """ |
| ds = self |
| |
| from pyflink.datastream.connectors.base import SupportsPreprocessing |
| if isinstance(sink, SupportsPreprocessing) and sink.get_transformer() is not None: |
| ds = sink.get_transformer().apply(self) |
| |
| return DataStreamSink(ds._j_data_stream.sinkTo(sink.get_java_function())) |
| |
| def execute_and_collect(self, job_execution_name: str = None, limit: int = None) \ |
| -> Union['CloseableIterator', list]: |
| """ |
| Triggers the distributed execution of the streaming dataflow and returns an iterator over |
| the elements of the given DataStream. |
| |
| The DataStream application is executed in the regular distributed manner on the target |
| environment, and the events from the stream are polled back to this application process and |
| thread through Flink's REST API. |
| |
| The returned iterator must be closed to free all cluster resources. |
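
Example (a minimal sketch; ds is assumed to be an existing DataStream; when limit is set,
a list is returned instead of an iterator):
::

>>> results = ds.execute_and_collect(limit=10)
>>> for value in results:
...     print(value)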
| |
| :param job_execution_name: The name of the job execution. |
| :param limit: The limit for the collected elements. |
| """ |
| JPythonConfigUtil = get_gateway().jvm.org.apache.flink.python.util.PythonConfigUtil |
| JPythonConfigUtil.configPythonOperator(self._j_data_stream.getExecutionEnvironment()) |
| self._apply_chaining_optimization() |
| if job_execution_name is None and limit is None: |
| return CloseableIterator(self._j_data_stream.executeAndCollect(), self.get_type()) |
| elif job_execution_name is not None and limit is None: |
| return CloseableIterator(self._j_data_stream.executeAndCollect(job_execution_name), |
| self.get_type()) |
| if job_execution_name is None and limit is not None: |
| return list(map(lambda data: convert_to_python_obj(data, self.get_type()), |
| self._j_data_stream.executeAndCollect(limit))) |
| else: |
| return list(map(lambda data: convert_to_python_obj(data, self.get_type()), |
| self._j_data_stream.executeAndCollect(job_execution_name, limit))) |
| |
| def print(self, sink_identifier: str = None) -> 'DataStreamSink': |
| """ |
| Writes a DataStream to the standard output stream (stdout). |
| For each element of the DataStream the object string is written. |
| |
| NOTE: This will print to stdout on the machine where the code is executed, i.e. the Flink |
| worker, and is not fault tolerant. |
| |
| :param sink_identifier: The string to prefix the output with. |
| :return: The closed DataStream. |
| """ |
| if sink_identifier is not None: |
| j_data_stream_sink = self._align_output_type()._j_data_stream.print(sink_identifier) |
| else: |
| j_data_stream_sink = self._align_output_type()._j_data_stream.print() |
| return DataStreamSink(j_data_stream_sink) |
| |
| def get_side_output(self, output_tag: OutputTag) -> 'DataStream': |
| """ |
| Gets the :class:`DataStream` that contains the elements that are emitted from an operation |
| into the side output with the given :class:`OutputTag`. |
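
Example (an illustrative sketch; ds is assumed to be an existing DataStream and
MyProcessFunction a user-defined ProcessFunction that emits to the given OutputTag):
::

>>> output_tag = OutputTag("side-output", Types.STRING())
>>> main_stream = ds.process(MyProcessFunction(), output_type=Types.INT())
>>> side_stream = main_stream.get_side_output(output_tag)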
| |
| :param output_tag: output tag for the side stream |
| :return: The DataStream with specified output tag |
| |
| .. versionadded:: 1.16.0 |
| """ |
| ds = DataStream(self._j_data_stream.getSideOutput(output_tag.get_java_output_tag())) |
| return ds.map(lambda i: i, output_type=output_tag.type_info) |
| |
| def cache(self) -> 'CachedDataStream': |
| """ |
Cache the intermediate result of the transformation. Only bounded streams are supported, and
currently only blocking mode is supported. The cache is generated lazily the first time the
intermediate result is computed. The cache will be cleared when the
StreamExecutionEnvironment is closed.
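
Example (an illustrative sketch; ds is assumed to be a bounded DataStream and env the
StreamExecutionEnvironment that created it):
::

>>> cached_stream = ds.map(lambda x: x * 2).cache()
>>> cached_stream.print()
>>> env.execute()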
| |
:return: The cached DataStream that can be used in a later job to reuse the cached
intermediate result.
| |
| .. versionadded:: 1.16.0 |
| """ |
| return CachedDataStream(self._j_data_stream.cache()) |
| |
| def _apply_chaining_optimization(self): |
| """ |
| Chain the Python operators if possible. |
| """ |
| gateway = get_gateway() |
| JPythonOperatorChainingOptimizer = gateway.jvm.org.apache.flink.python.chain. \ |
| PythonOperatorChainingOptimizer |
| j_transformation = JPythonOperatorChainingOptimizer.apply( |
| self._j_data_stream.getExecutionEnvironment(), |
| self._j_data_stream.getTransformation()) |
| self._j_data_stream = gateway.jvm.org.apache.flink.streaming.api.datastream.DataStream( |
| self._j_data_stream.getExecutionEnvironment(), j_transformation) |
| |
| def _align_output_type(self) -> 'DataStream': |
| """ |
Transforms the pickled Python object into a string if the output type is PickledByteArrayInfo.
| """ |
| from py4j.java_gateway import get_java_class |
| |
| gateway = get_gateway() |
| ExternalTypeInfo_CLASS = get_java_class( |
| gateway.jvm.org.apache.flink.table.runtime.typeutils.ExternalTypeInfo) |
| RowTypeInfo_CLASS = get_java_class( |
| gateway.jvm.org.apache.flink.api.java.typeutils.RowTypeInfo) |
| output_type_info_class = self._j_data_stream.getTransformation().getOutputType().getClass() |
| if output_type_info_class.isAssignableFrom( |
| Types.PICKLED_BYTE_ARRAY().get_java_type_info() |
| .getClass()): |
| def python_obj_to_str_map_func(value): |
| if not isinstance(value, (str, bytes)): |
| value = str(value) |
| return value |
| |
| transformed_data_stream = DataStream( |
| self.map(python_obj_to_str_map_func, |
| output_type=Types.STRING())._j_data_stream) |
| return transformed_data_stream |
| elif (output_type_info_class.isAssignableFrom(ExternalTypeInfo_CLASS) or |
| output_type_info_class.isAssignableFrom(RowTypeInfo_CLASS)): |
| def python_obj_to_str_map_func(value): |
| assert isinstance(value, Row) |
| return '{}[{}]'.format(value.get_row_kind(), |
| ','.join([str(item) for item in value._values])) |
| |
| transformed_data_stream = DataStream( |
| self.map(python_obj_to_str_map_func, |
| output_type=Types.STRING())._j_data_stream) |
| return transformed_data_stream |
| else: |
| return self |
| |
| |
| class DataStreamSink(object): |
| """ |
| A Stream Sink. This is used for emitting elements from a streaming topology. |
| """ |
| |
| def __init__(self, j_data_stream_sink): |
| """ |
| The constructor of DataStreamSink. |
| |
| :param j_data_stream_sink: A DataStreamSink java object. |
| """ |
| self._j_data_stream_sink = j_data_stream_sink |
| |
| def name(self, name: str) -> 'DataStreamSink': |
| """ |
Sets the name of this sink. This name is used by the visualization and logging during
runtime.
| |
| :param name: The name of this sink. |
| :return: The named sink. |
| """ |
| self._j_data_stream_sink.name(name) |
| return self |
| |
| def uid(self, uid: str) -> 'DataStreamSink': |
| """ |
| Sets an ID for this operator. The specified ID is used to assign the same operator ID across |
| job submissions (for example when starting a job from a savepoint). |
| |
| Important: this ID needs to be unique per transformation and job. Otherwise, job submission |
| will fail. |
| |
| :param uid: The unique user-specified ID of this transformation. |
| :return: The operator with the specified ID. |
| """ |
| self._j_data_stream_sink.uid(uid) |
| return self |
| |
| def set_uid_hash(self, uid_hash: str) -> 'DataStreamSink': |
| """ |
Sets a user-provided hash for this operator. This will be used AS IS to create the
JobVertexID. The user-provided hash is an alternative to the generated hashes, and is used
when identifying an operator through the default hash mechanism fails (e.g. because of
changes between Flink versions).
| |
Important: this should be used as a workaround or for troubleshooting. The provided hash
needs to be unique per transformation and job. Otherwise, job submission will fail.
Furthermore, you cannot assign a user-specified hash to intermediate nodes in an operator
chain, and trying to do so will make your job fail.
| |
| A use case for this is in migration between Flink versions or changing the jobs in a way |
| that changes the automatically generated hashes. In this case, providing the previous hashes |
| directly through this method (e.g. obtained from old logs) can help to reestablish a lost |
| mapping from states to their target operator. |
| |
| :param uid_hash: The user provided hash for this operator. This will become the jobVertexID, |
| which is shown in the logs and web ui. |
| :return: The operator with the user provided hash. |
| """ |
| self._j_data_stream_sink.setUidHash(uid_hash) |
| return self |
| |
| def set_parallelism(self, parallelism: int) -> 'DataStreamSink': |
| """ |
| Sets the parallelism for this operator. |
| |
:param parallelism: The parallelism for this operator.
| :return: The operator with set parallelism. |
| """ |
| self._j_data_stream_sink.setParallelism(parallelism) |
| return self |
| |
| def set_description(self, description: str) -> 'DataStreamSink': |
| """ |
| Sets the description for this sink. |
| |
| Description is used in json plan and web ui, but not in logging and metrics where only |
| name is available. Description is expected to provide detailed information about the sink, |
| while name is expected to be more simple, providing summary information only, so that we can |
| have more user-friendly logging messages and metric tags without losing useful messages for |
| debugging. |
| |
| :param description: The description for this sink. |
| :return: The sink with new description. |
| |
| .. versionadded:: 1.15.0 |
| """ |
| self._j_data_stream_sink.setDescription(description) |
| return self |
| |
| def disable_chaining(self) -> 'DataStreamSink': |
| """ |
| Turns off chaining for this operator so thread co-location will not be used as an |
| optimization. |
Chaining can be turned off for the whole job by
StreamExecutionEnvironment.disableOperatorChaining(), however this is not advised for
performance considerations.
| |
| :return: The operator with chaining disabled. |
| """ |
| self._j_data_stream_sink.disableChaining() |
| return self |
| |
| def slot_sharing_group(self, slot_sharing_group: Union[str, SlotSharingGroup]) \ |
| -> 'DataStreamSink': |
| """ |
| Sets the slot sharing group of this operation. Parallel instances of operations that are in |
| the same slot sharing group will be co-located in the same TaskManager slot, if possible. |
| |
| Operations inherit the slot sharing group of input operations if all input operations are in |
| the same slot sharing group and no slot sharing group was explicitly specified. |
| |
| Initially an operation is in the default slot sharing group. An operation can be put into |
| the default group explicitly by setting the slot sharing group to 'default'. |
| |
:param slot_sharing_group: The name of the slot sharing group, or a SlotSharingGroup which
contains the name and its resource spec.
| :return: This operator. |
| """ |
| if isinstance(slot_sharing_group, SlotSharingGroup): |
| self._j_data_stream_sink.slotSharingGroup( |
| slot_sharing_group.get_java_slot_sharing_group()) |
| else: |
| self._j_data_stream_sink.slotSharingGroup(slot_sharing_group) |
| return self |
| |
| |
| class KeyedStream(DataStream): |
| """ |
| A KeyedStream represents a DataStream on which operator state is partitioned by key using a |
| provided KeySelector. Typical operations supported by a DataStream are also possible on a |
| KeyedStream, with the exception of partitioning methods such as shuffle, forward and keyBy. |
| |
Reduce-style operations, such as reduce and sum, work on elements that have the same key.
| """ |
| |
| def __init__(self, j_keyed_stream, original_data_type_info, origin_stream: DataStream): |
| """ |
| Constructor of KeyedStream. |
| |
| :param j_keyed_stream: A java KeyedStream object. |
| :param original_data_type_info: Original data typeinfo. |
| :param origin_stream: The DataStream before key by. |
| """ |
| super(KeyedStream, self).__init__(j_data_stream=j_keyed_stream) |
| self._original_data_type_info = original_data_type_info |
| self._origin_stream = origin_stream |
| |
| def map(self, func: Union[Callable, MapFunction], output_type: TypeInformation = None) \ |
| -> 'DataStream': |
| """ |
| Applies a Map transformation on a KeyedStream. The transformation calls a MapFunction for |
| each element of the DataStream. Each MapFunction call returns exactly one element. |
| |
Note that if the user does not specify the output data type, the output data will be
serialized as a pickled primitive byte array.
| |
| :param func: The MapFunction that is called for each element of the DataStream. |
| :param output_type: The type information of the MapFunction output data. |
| :return: The transformed DataStream. |
| """ |
| if not isinstance(func, MapFunction) and not callable(func): |
| raise TypeError("The input must be a MapFunction or a callable function") |
| |
| class MapKeyedProcessFunctionAdapter(KeyedProcessFunction): |
| |
| def __init__(self, map_func): |
| if isinstance(map_func, MapFunction): |
| self._open_func = map_func.open |
| self._close_func = map_func.close |
| self._map_func = map_func.map |
| else: |
| self._open_func = None |
| self._close_func = None |
| self._map_func = map_func |
| |
| def open(self, runtime_context: RuntimeContext): |
| if self._open_func: |
| self._open_func(runtime_context) |
| |
| def close(self): |
| if self._close_func: |
| self._close_func() |
| |
| def process_element(self, value, ctx: 'KeyedProcessFunction.Context'): |
| yield self._map_func(value) |
| |
| return self.process(MapKeyedProcessFunctionAdapter(func), output_type) \ |
| .name("Map") # type: ignore |
| |
| def flat_map(self, |
| func: Union[Callable, FlatMapFunction], |
| output_type: TypeInformation = None) -> 'DataStream': |
| """ |
| Applies a FlatMap transformation on a KeyedStream. The transformation calls a |
| FlatMapFunction for each element of the DataStream. Each FlatMapFunction call can return |
| any number of elements including none. |
| |
| :param func: The FlatMapFunction that is called for each element of the DataStream. |
| :param output_type: The type information of output data. |
| :return: The transformed DataStream. |
| """ |
| |
| if not isinstance(func, FlatMapFunction) and not callable(func): |
| raise TypeError("The input must be a FlatMapFunction or a callable function") |
| |
| class FlatMapKeyedProcessFunctionAdapter(KeyedProcessFunction): |
| |
| def __init__(self, flat_map_func): |
| if isinstance(flat_map_func, FlatMapFunction): |
| self._open_func = flat_map_func.open |
| self._close_func = flat_map_func.close |
| self._flat_map_func = flat_map_func.flat_map |
| else: |
| self._open_func = None |
| self._close_func = None |
| self._flat_map_func = flat_map_func |
| |
| def open(self, runtime_context: RuntimeContext): |
| if self._open_func: |
| self._open_func(runtime_context) |
| |
| def close(self): |
| if self._close_func: |
| self._close_func() |
| |
| def process_element(self, value, ctx: 'KeyedProcessFunction.Context'): |
| yield from self._flat_map_func(value) |
| |
| return self.process(FlatMapKeyedProcessFunctionAdapter(func), output_type) \ |
| .name("FlatMap") |
| |
| def reduce(self, func: Union[Callable, ReduceFunction]) -> 'DataStream': |
| """ |
Applies a reduce transformation on the grouped data stream grouped by the given
key position. The `ReduceFunction` will receive input values based on the key value.
Only input values with the same key will go to the same reducer.
| |
| Example: |
| :: |
| |
>>> ds = env.from_collection([(1, 'a'), (2, 'a'), (3, 'a'), (4, 'b')])
>>> ds.key_by(lambda x: x[1]).reduce(lambda a, b: (a[0] + b[0], b[1]))
| |
| :param func: The ReduceFunction that is called for each element of the DataStream. |
| :return: The transformed DataStream. |
| """ |
| |
| if not isinstance(func, ReduceFunction) and not callable(func): |
| raise TypeError("The input must be a ReduceFunction or a callable function") |
| |
| output_type = _from_java_type(self._original_data_type_info.get_java_type_info()) |
| |
| gateway = get_gateway() |
| j_conf = get_j_env_configuration(self._j_data_stream.getExecutionEnvironment()) |
| python_execution_mode = ( |
| j_conf.getString( |
| gateway.jvm.org.apache.flink.python.PythonOptions.PYTHON_EXECUTION_MODE)) |
| |
| class ReduceProcessKeyedProcessFunctionAdapter(KeyedProcessFunction): |
| |
| def __init__(self, reduce_function): |
| if isinstance(reduce_function, ReduceFunction): |
| self._open_func = reduce_function.open |
| self._close_func = reduce_function.close |
| self._reduce_function = reduce_function.reduce |
| else: |
| self._open_func = None |
| self._close_func = None |
| self._reduce_function = reduce_function |
| self._reduce_state = None # type: ReducingState |
| self._in_batch_execution_mode = True |
| |
| def open(self, runtime_context: RuntimeContext): |
| if self._open_func: |
| self._open_func(runtime_context) |
| |
| self._reduce_state = runtime_context.get_reducing_state( |
| ReducingStateDescriptor( |
| "_reduce_state" + str(uuid.uuid4()), |
| self._reduce_function, |
| output_type)) |
| |
| if python_execution_mode == "process": |
| from pyflink.fn_execution.datastream.process.runtime_context import ( |
| StreamingRuntimeContext) |
| self._in_batch_execution_mode = ( |
| cast(StreamingRuntimeContext, runtime_context)._in_batch_execution_mode) |
| else: |
| self._in_batch_execution_mode = runtime_context.get_job_parameter( |
| "inBatchExecutionMode", "false") == "true" |
| |
| def close(self): |
| if self._close_func: |
| self._close_func() |
| |
| def process_element(self, value, ctx: 'KeyedProcessFunction.Context'): |
| if self._in_batch_execution_mode: |
| reduce_value = self._reduce_state.get() |
| if reduce_value is None: |
| # register a timer for emitting the result at the end when this is the |
| # first input for this key |
| ctx.timer_service().register_event_time_timer(0x7fffffffffffffff) |
| self._reduce_state.add(value) |
| else: |
| self._reduce_state.add(value) |
| # only emitting the result when all the data for a key is received |
| yield self._reduce_state.get() |
| |
| def on_timer(self, timestamp: int, ctx: 'KeyedProcessFunction.OnTimerContext'): |
| current_value = self._reduce_state.get() |
| if current_value is not None: |
| yield current_value |
| |
| return self.process(ReduceProcessKeyedProcessFunctionAdapter(func), output_type) \ |
| .name("Reduce") |
| |
| def filter(self, func: Union[Callable, FilterFunction]) -> 'DataStream': |
| if not isinstance(func, FilterFunction) and not callable(func): |
| raise TypeError("The input must be a FilterFunction or a callable function") |
| |
| class FilterKeyedProcessFunctionAdapter(KeyedProcessFunction): |
| |
| def __init__(self, filter_func): |
| if isinstance(filter_func, FilterFunction): |
| self._open_func = filter_func.open |
| self._close_func = filter_func.close |
| self._filter_func = filter_func.filter |
| else: |
| self._open_func = None |
| self._close_func = None |
| self._filter_func = filter_func |
| |
| def open(self, runtime_context: RuntimeContext): |
| if self._open_func: |
| self._open_func(runtime_context) |
| |
| def close(self): |
| if self._close_func: |
| self._close_func() |
| |
| def process_element(self, value, ctx: 'KeyedProcessFunction.Context'): |
| if self._filter_func(value): |
| yield value |
| |
| return self.process(FilterKeyedProcessFunctionAdapter(func), self._original_data_type_info)\ |
| .name("Filter") |
| |
| class AccumulateType(Enum): |
| MIN = 1 |
| MAX = 2 |
| MIN_BY = 3 |
| MAX_BY = 4 |
| SUM = 5 |
| |
| def _accumulate(self, position: Union[int, str], acc_type: AccumulateType): |
| """ |
| The base method is used for operators such as min, max, min_by, max_by, sum. |
| """ |
| if not isinstance(position, int) and not isinstance(position, str): |
| raise TypeError("The field position must be of int or str type to locate the value to " |
| "calculate for min, max, min_by, max_by and sum." |
| "The given type is: %s" % type(position)) |
| |
| class AccumulateReduceFunction(ReduceFunction): |
| |
| def __init__(self, position, agg_type): |
| self._pos = position |
| self._agg_type = agg_type |
| self._reduce_func = None |
| |
| def reduce(self, value1, value2): |
| |
| def init_reduce_func(value_to_check): |
| if acc_type == KeyedStream.AccumulateType.MIN_BY: |
| # Logic for min_by operator. |
| def reduce_func(v1, v2): |
| if isinstance(value_to_check, (tuple, list, Row)): |
| return v2 if v2[self._pos] < v1[self._pos] else v1 |
| else: |
| return v2 if v2 < v1 else v1 |
| self._reduce_func = reduce_func |
| |
| elif acc_type == KeyedStream.AccumulateType.MAX_BY: |
| # Logic for max_by operator. |
| def reduce_func(v1, v2): |
| if isinstance(value_to_check, (tuple, list, Row)): |
| return v2 if v2[self._pos] > v1[self._pos] else v1 |
| else: |
| return v2 if v2 > v1 else v1 |
| self._reduce_func = reduce_func |
| |
| # for MIN / MAX / SUM |
| elif isinstance(value_to_check, tuple): |
| def reduce_func(v1, v2): |
| v1_list = list(v1) |
| if acc_type == KeyedStream.AccumulateType.MIN: |
| # Logic for min operator with tuple type input. |
| v1_list[self._pos] = v2[self._pos] \ |
| if v2[self._pos] < v1[self._pos] else v1[self._pos] |
| elif acc_type == KeyedStream.AccumulateType.MAX: |
| # Logic for max operator with tuple type input. |
| v1_list[self._pos] = v2[self._pos] \ |
| if v2[self._pos] > v1[self._pos] else v1[self._pos] |
| else: |
| # Logic for sum operator with tuple type input. |
| v1_list[self._pos] = v1[self._pos] + v2[self._pos] |
| return tuple(v1_list) |
| return tuple(v1_list) |
| |
| self._reduce_func = reduce_func |
| |
| elif isinstance(value_to_check, (list, Row)): |
| def reduce_func(v1, v2): |
| if acc_type == KeyedStream.AccumulateType.MIN: |
| # Logic for min operator with List and Row types input. |
| v1[self._pos] = v2[self._pos] \ |
| if v2[self._pos] < v1[self._pos] else v1[self._pos] |
| elif acc_type == KeyedStream.AccumulateType.MAX: |
| # Logic for max operator with List and Row types input. |
| v1[self._pos] = v2[self._pos] \ |
| if v2[self._pos] > v1[self._pos] else v1[self._pos] |
| else: |
| # Logic for sum operator with List and Row types input. |
| v1[self._pos] = v1[self._pos] + v2[self._pos] |
| return v1 |
| |
| self._reduce_func = reduce_func |
| |
| else: |
| if self._pos != 0: |
| raise TypeError( |
| "The %s field selected on a basic type. A field expression " |
| "on a basic type can only select the 0th field (which means " |
| "selecting the entire basic type)." % self._pos) |
| |
| def reduce_func(v1, v2): |
| if acc_type == KeyedStream.AccumulateType.MIN: |
| # Logic for min operator with basic type input. |
| return v2 if v2 < v1 else v1 |
| elif acc_type == KeyedStream.AccumulateType.MAX: |
| # Logic for max operator with basic type input. |
| return v2 if v2 > v1 else v1 |
| else: |
| # Logic for sum operator with basic type input. |
| return v1 + v2 |
| |
| self._reduce_func = reduce_func |
| |
| if not self._reduce_func: |
| init_reduce_func(value2) |
| return self._reduce_func(value1, value2) |
| |
| return self.reduce(AccumulateReduceFunction(position, acc_type)) |
| |
| def sum(self, position_to_sum: Union[int, str] = 0) -> 'DataStream': |
| """ |
| Applies an aggregation that gives a rolling sum of the data stream at the given position |
| grouped by the given key. An independent aggregate is kept per key. |
| |
| Example(Tuple data to sum): |
| :: |
| |
| >>> ds = env.from_collection([('a', 1), ('a', 2), ('b', 1), ('b', 5)]) |
| >>> ds.key_by(lambda x: x[0]).sum(1) |
| |
| Example(Row data to sum): |
| :: |
| |
| >>> ds = env.from_collection([('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)], |
| ... type_info=Types.ROW([Types.STRING(), Types.INT()])) |
| >>> ds.key_by(lambda x: x[0]).sum(1) |
| |
| Example(Row data with fields name to sum): |
| :: |
| |
| >>> ds = env.from_collection( |
| ... [('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)], |
| ... type_info=Types.ROW_NAMED(["key", "value"], [Types.STRING(), Types.INT()]) |
| ... ) |
| >>> ds.key_by(lambda x: x[0]).sum("value") |
| |
| :param position_to_sum: The field position in the data points to sum. The type can be int |
| (field position) or str (field name). This is applicable to Tuple types, List types, |
| Row types, and basic types (which is considered as having one field). |
| :return: The transformed DataStream. |
| |
| .. versionadded:: 1.16.0 |
| """ |
| return self._accumulate(position_to_sum, KeyedStream.AccumulateType.SUM) |
| |
| def min(self, position_to_min: Union[int, str] = 0) -> 'DataStream': |
| """ |
| Applies an aggregation that gives the current minimum of the data stream at the given |
| position by the given key. An independent aggregate is kept per key. |
| |
| Example(Tuple data): |
| :: |
| |
| >>> ds = env.from_collection([('a', 1), ('a', 2), ('b', 1), ('b', 5)]) |
| >>> ds.key_by(lambda x: x[0]).min(1) |
| |
| Example(Row data): |
| :: |
| |
| >>> ds = env.from_collection([('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)], |
| ... type_info=Types.ROW([Types.STRING(), Types.INT()])) |
| >>> ds.key_by(lambda x: x[0]).min(1) |
| |
| Example(Row data with fields name): |
| :: |
| |
| >>> ds = env.from_collection( |
| ... [('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)], |
| ... type_info=Types.ROW_NAMED(["key", "value"], [Types.STRING(), Types.INT()]) |
| ... ) |
| >>> ds.key_by(lambda x: x[0]).min("value") |
| |
| :param position_to_min: The field position in the data points to minimize. The type can be |
| int (field position) or str (field name). This is applicable to |
| Tuple types, List types, Row types, and basic types (which is |
| considered as having one field). |
| :return: The transformed DataStream. |
| |
| .. versionadded:: 1.16.0 |
| """ |
| return self._accumulate(position_to_min, KeyedStream.AccumulateType.MIN) |
| |
| def max(self, position_to_max: Union[int, str] = 0) -> 'DataStream': |
| """ |
| Applies an aggregation that gives the current maximum of the data stream at the given |
| position by the given key. An independent aggregate is kept per key. |
| |
| Example(Tuple data): |
| :: |
| |
| >>> ds = env.from_collection([('a', 1), ('a', 2), ('b', 1), ('b', 5)]) |
| >>> ds.key_by(lambda x: x[0]).max(1) |
| |
| Example(Row data): |
| :: |
| |
| >>> ds = env.from_collection([('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)], |
| ... type_info=Types.ROW([Types.STRING(), Types.INT()])) |
| >>> ds.key_by(lambda x: x[0]).max(1) |
| |
| Example(Row data with fields name): |
| :: |
| |
| >>> ds = env.from_collection( |
| ... [('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)], |
| ... type_info=Types.ROW_NAMED(["key", "value"], [Types.STRING(), Types.INT()]) |
| ... ) |
| >>> ds.key_by(lambda x: x[0]).max("value") |
| |
| :param position_to_max: The field position in the data points to maximize. The type can be |
| int (field position) or str (field name). This is applicable to |
| Tuple types, List types, Row types, and basic types (which is |
| considered as having one field). |
| :return: The transformed DataStream. |
| |
| .. versionadded:: 1.16.0 |
| """ |
| return self._accumulate(position_to_max, KeyedStream.AccumulateType.MAX) |
| |
| def min_by(self, position_to_min_by: Union[int, str] = 0) -> 'DataStream': |
| """ |
| Applies an aggregation that gives the current element with the minimum value at the |
| given position by the given key. An independent aggregate is kept per key. |
| If multiple elements have the minimum value at the given position, |
| the operator returns the first one by default. |
| |
| Example(Tuple data): |
| :: |
| |
| >>> ds = env.from_collection([('a', 1), ('a', 2), ('b', 1), ('b', 5)]) |
| >>> ds.key_by(lambda x: x[0]).min_by(1) |
| |
| Example(Row data): |
| :: |
| |
| >>> ds = env.from_collection([('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)], |
| ... type_info=Types.ROW([Types.STRING(), Types.INT()])) |
| >>> ds.key_by(lambda x: x[0]).min_by(1) |
| |
| Example(Row data with fields name): |
| :: |
| |
| >>> ds = env.from_collection( |
| ... [('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)], |
| ... type_info=Types.ROW_NAMED(["key", "value"], [Types.STRING(), Types.INT()]) |
| ... ) |
| >>> ds.key_by(lambda x: x[0]).min_by("value") |
| |
| :param position_to_min_by: The field position in the data points to minimize. The type can |
| be int (field position) or str (field name). This is applicable |
| to Tuple types, List types, Row types, and basic types (which is |
| considered as having one field). |
| :return: The transformed DataStream. |
| |
| .. versionadded:: 1.16.0 |
| """ |
| return self._accumulate(position_to_min_by, KeyedStream.AccumulateType.MIN_BY) |
| |
| def max_by(self, position_to_max_by: Union[int, str] = 0) -> 'DataStream': |
| """ |
| Applies an aggregation that gives the current element with the maximum value at the |
| given position by the given key. An independent aggregate is kept per key. |
| If multiple elements have the maximum value at the given position, |
| the operator returns the first one by default. |
| |
| Example(Tuple data): |
| :: |
| |
| >>> ds = env.from_collection([('a', 1), ('a', 2), ('b', 1), ('b', 5)]) |
| >>> ds.key_by(lambda x: x[0]).max_by(1) |
| |
| Example(Row data): |
| :: |
| |
| >>> ds = env.from_collection([('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)], |
| ... type_info=Types.ROW([Types.STRING(), Types.INT()])) |
| >>> ds.key_by(lambda x: x[0]).max_by(1) |
| |
| Example(Row data with fields name): |
| :: |
| |
| >>> ds = env.from_collection( |
| ... [('a', 1), ('a', 2), ('a', 3), ('b', 1), ('b', 2)], |
| ... type_info=Types.ROW_NAMED(["key", "value"], [Types.STRING(), Types.INT()]) |
| ... ) |
| >>> ds.key_by(lambda x: x[0]).max_by("value") |
| |
| :param position_to_max_by: The field position in the data points to maximize. The type can |
| be int (field position) or str (field name). This is applicable |
| to Tuple types, List types, Row types, and basic types (which is |
| considered as having one field). |
| :return: The transformed DataStream. |
| |
| .. versionadded:: 1.16.0 |
| """ |
| return self._accumulate(position_to_max_by, KeyedStream.AccumulateType.MAX_BY) |
| |
| def add_sink(self, sink_func: SinkFunction) -> 'DataStreamSink': |
| return self._values().add_sink(sink_func) |
| |
| def key_by(self, key_selector: Union[Callable, KeySelector], |
| key_type: TypeInformation = None) -> 'KeyedStream': |
| return self._origin_stream.key_by(key_selector, key_type) |
| |
| def process(self, func: KeyedProcessFunction, # type: ignore |
| output_type: TypeInformation = None) -> 'DataStream': |
| """ |
| Applies the given KeyedProcessFunction on the input stream, thereby creating a transformed |
| output stream. |
| |
| The function will be called for every element in the input stream and can produce zero or |
| more output elements. |
| |
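| Example(illustrative sketch; ``MyKeyedProcessFunction`` is a hypothetical user-defined |
| function shown only to sketch the call pattern): |
| :: |
| |
| >>> class MyKeyedProcessFunction(KeyedProcessFunction):  # hypothetical |
| ... def process_element(self, value, ctx): |
| ... yield value[0], value[1] + 1 |
| >>> ds.key_by(lambda x: x[0]) \\ |
| ... .process(MyKeyedProcessFunction(), |
| ... output_type=Types.TUPLE([Types.STRING(), Types.INT()])) |
| |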
| :param func: The KeyedProcessFunction that is called for each element in the stream. |
| :param output_type: TypeInformation for the result type of the function. |
| :return: The transformed DataStream. |
| """ |
| if not isinstance(func, KeyedProcessFunction): |
| raise TypeError("KeyedProcessFunction is required for KeyedStream.") |
| |
| from pyflink.fn_execution import flink_fn_execution_pb2 |
| j_python_data_stream_function_operator, j_output_type_info = \ |
| _get_one_input_stream_operator( |
| self, |
| func, |
| flink_fn_execution_pb2.UserDefinedDataStreamFunction.KEYED_PROCESS, # type: ignore |
| output_type) |
| return DataStream(self._j_data_stream.transform( |
| "KEYED PROCESS", |
| j_output_type_info, |
| j_python_data_stream_function_operator)) |
| |
| def window(self, window_assigner: WindowAssigner) -> 'WindowedStream': |
| """ |
| Windows this data stream to a WindowedStream, which evaluates windows over a key |
| grouped stream. Elements are put into windows by a WindowAssigner. The grouping of |
| elements is done both by key and by window. |
| |
| A Trigger can be defined to specify when windows are evaluated. However, WindowAssigners |
| have a default Trigger that is used if a Trigger is not specified. |
| |
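| Example(illustrative sketch; assumes 5-second event-time tumbling windows over tuple data): |
| :: |
| |
| >>> ds.key_by(lambda x: x[0]) \\ |
| ... .window(TumblingEventTimeWindows.of(Time.seconds(5))) \\ |
| ... .reduce(lambda a, b: (a[0], a[1] + b[1])) |
| |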
| :param window_assigner: The WindowAssigner that assigns elements to windows. |
| :return: The trigger windows data stream. |
| """ |
| return WindowedStream(self, window_assigner) |
| |
| def count_window(self, size: int, slide: int = 0): |
| """ |
| Windows this KeyedStream into tumbling or sliding count windows. |
| |
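| Example(illustrative sketch): |
| :: |
| |
| >>> ds.key_by(lambda x: x[0]).count_window(10)  # tumbling count window of 10 elements |
| >>> ds.key_by(lambda x: x[0]).count_window(10, 5)  # sliding count window, slide of 5 |
| |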
| :param size: The size of the windows in number of elements. |
| :param slide: The slide interval in number of elements. |
| |
| .. versionadded:: 1.16.0 |
| """ |
| if slide == 0: |
| return WindowedStream(self, CountTumblingWindowAssigner(size)) |
| else: |
| return WindowedStream(self, CountSlidingWindowAssigner(size, slide)) |
| |
| def union(self, *streams) -> 'DataStream': |
| return self._values().union(*streams) |
| |
| @overload |
| def connect(self, ds: 'DataStream') -> 'ConnectedStreams': |
| pass |
| |
| @overload |
| def connect(self, ds: 'BroadcastStream') -> 'BroadcastConnectedStream': |
| pass |
| |
| def connect(self, ds: Union['DataStream', 'BroadcastStream']) \ |
| -> Union['ConnectedStreams', 'BroadcastConnectedStream']: |
| """ |
| If ds is a :class:`DataStream`, creates a new :class:`ConnectedStreams` by connecting |
| DataStream outputs of (possibly) different types with each other. The DataStreams connected |
| using this operator can be used with CoFunctions to apply joint transformations. |
| |
| If ds is a :class:`BroadcastStream`, creates a new :class:`BroadcastConnectedStream` by |
| connecting the current :class:`DataStream` with a :class:`BroadcastStream`. The latter can |
| be created using the :meth:`broadcast` method. The resulting stream can be further processed |
| using the :meth:`BroadcastConnectedStream.process` method. |
| |
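| Example(illustrative sketch; ``other_ds`` is assumed to be another DataStream and |
| ``state_desc`` a MapStateDescriptor defined elsewhere): |
| :: |
| |
| >>> keyed = ds.key_by(lambda x: x[0]) |
| >>> connected = keyed.connect(other_ds) |
| >>> broadcast_connected = keyed.connect(other_ds.broadcast(state_desc)) |
| |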
| :param ds: The DataStream or BroadcastStream with which this stream will be connected. |
| :return: The ConnectedStreams or BroadcastConnectedStream. |
| |
| .. versionchanged:: 1.16.0 |
| Support for connecting with a BroadcastStream. |
| """ |
| return super().connect(ds) |
| |
| def shuffle(self) -> 'DataStream': |
| raise Exception('Cannot override partitioning for KeyedStream.') |
| |
| def project(self, *field_indexes) -> 'DataStream': |
| return self._values().project(*field_indexes) |
| |
| def rescale(self) -> 'DataStream': |
| raise Exception('Cannot override partitioning for KeyedStream.') |
| |
| def rebalance(self) -> 'DataStream': |
| raise Exception('Cannot override partitioning for KeyedStream.') |
| |
| def forward(self) -> 'DataStream': |
| raise Exception('Cannot override partitioning for KeyedStream.') |
| |
| def broadcast(self, *args): |
| """ |
| Not supported, partitioning for KeyedStream cannot be overridden. |
| """ |
| raise Exception('Cannot override partitioning for KeyedStream.') |
| |
| def partition_custom(self, partitioner: Union[Callable, Partitioner], |
| key_selector: Union[Callable, KeySelector]) -> 'DataStream': |
| raise Exception('Cannot override partitioning for KeyedStream.') |
| |
| def print(self, sink_identifier=None): |
| return self._values().print(sink_identifier) |
| |
| def _values(self) -> 'DataStream': |
| """ |
| Since the Python KeyedStream is represented as Row(key_value, original_data), this method is |
| used to extract the original_data. |
| """ |
| transformed_stream = self.map(lambda x: x, output_type=self._original_data_type_info) |
| transformed_stream.name(get_gateway().jvm.org.apache.flink.python.util.PythonConfigUtil |
| .KEYED_STREAM_VALUE_OPERATOR_NAME) |
| return DataStream(transformed_stream._j_data_stream) |
| |
| def set_parallelism(self, parallelism: int): |
| raise Exception("Set parallelism for KeyedStream is not supported.") |
| |
| def name(self, name: str): |
| raise Exception("Set name for KeyedStream is not supported.") |
| |
| def get_name(self) -> str: |
| raise Exception("Get name of KeyedStream is not supported.") |
| |
| def uid(self, uid: str): |
| raise Exception("Set uid for KeyedStream is not supported.") |
| |
| def set_uid_hash(self, uid_hash: str): |
| raise Exception("Set uid hash for KeyedStream is not supported.") |
| |
| def set_max_parallelism(self, max_parallelism: int): |
| raise Exception("Set max parallelism for KeyedStream is not supported.") |
| |
| def force_non_parallel(self): |
| raise Exception("Set force non-parallel for KeyedStream is not supported.") |
| |
| def set_buffer_timeout(self, timeout_millis: int): |
| raise Exception("Set buffer timeout for KeyedStream is not supported.") |
| |
| def start_new_chain(self) -> 'DataStream': |
| raise Exception("Start new chain for KeyedStream is not supported.") |
| |
| def disable_chaining(self) -> 'DataStream': |
| raise Exception("Disable chaining for KeyedStream is not supported.") |
| |
| def slot_sharing_group(self, slot_sharing_group: Union[str, SlotSharingGroup]) -> 'DataStream': |
| raise Exception("Setting slot sharing group for KeyedStream is not supported.") |
| |
| def cache(self) -> 'CachedDataStream': |
| raise Exception("Cache for KeyedStream is not supported.") |
| |
| |
| class CachedDataStream(DataStream): |
| """ |
| CachedDataStream represents a DataStream whose intermediate result will be cached the first |
| time it is computed. The cached intermediate result can then be reused in later jobs that use |
| the same CachedDataStream, avoiding re-computation of the intermediate result. |
| """ |
| |
| def __init__(self, j_data_stream): |
| super(CachedDataStream, self).__init__(j_data_stream) |
| |
| def invalidate(self): |
| """ |
| Invalidate the cached intermediate result of this DataStream to release the physical |
| resources. Users are not required to invoke this method unless they explicitly want to |
| release the resources earlier. The cache will be recreated if it is used again after being |
| invalidated. |
| |
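| Example(illustrative sketch): |
| :: |
| |
| >>> cached_ds = ds.cache() |
| >>> # ... run jobs that reuse cached_ds ... |
| >>> cached_ds.invalidate() |
| |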
| .. versionadded:: 1.16.0 |
| """ |
| self._j_data_stream.invalidate() |
| |
| def set_parallelism(self, parallelism: int): |
| raise Exception("Set parallelism for CachedDataStream is not supported.") |
| |
| def name(self, name: str): |
| raise Exception("Set name for CachedDataStream is not supported.") |
| |
| def get_name(self) -> str: |
| raise Exception("Get name of CachedDataStream is not supported.") |
| |
| def uid(self, uid: str): |
| raise Exception("Set uid for CachedDataStream is not supported.") |
| |
| def set_uid_hash(self, uid_hash: str): |
| raise Exception("Set uid hash for CachedDataStream is not supported.") |
| |
| def set_max_parallelism(self, max_parallelism: int): |
| raise Exception("Set max parallelism for CachedDataStream is not supported.") |
| |
| def force_non_parallel(self): |
| raise Exception("Set force non-parallel for CachedDataStream is not supported.") |
| |
| def set_buffer_timeout(self, timeout_millis: int): |
| raise Exception("Set buffer timeout for CachedDataStream is not supported.") |
| |
| def start_new_chain(self) -> 'DataStream': |
| raise Exception("Start new chain for CachedDataStream is not supported.") |
| |
| def disable_chaining(self) -> 'DataStream': |
| raise Exception("Disable chaining for CachedDataStream is not supported.") |
| |
| def slot_sharing_group(self, slot_sharing_group: Union[str, SlotSharingGroup]) -> 'DataStream': |
| raise Exception("Setting slot sharing group for CachedDataStream is not supported.") |
| |
| |
| class WindowedStream(object): |
| """ |
| A WindowedStream represents a data stream where elements are grouped by key, and for each |
| key, the stream of elements is split into windows based on a WindowAssigner. Window emission |
| is triggered based on a Trigger. |
| |
| The windows are conceptually evaluated for each key individually, meaning windows can trigger |
| at different points for each key. |
| |
| Note that the WindowedStream is purely an API construct: during runtime the WindowedStream will |
| be collapsed together with the KeyedStream and the operation over the window into one single |
| operation. |
| """ |
| |
| def __init__(self, keyed_stream: KeyedStream, window_assigner: WindowAssigner): |
| self._keyed_stream = keyed_stream |
| self._window_assigner = window_assigner |
| self._allowed_lateness = 0 |
| self._late_data_output_tag = None # type: Optional[OutputTag] |
| self._window_trigger = None # type: Trigger |
| |
| def get_execution_environment(self): |
| return self._keyed_stream.get_execution_environment() |
| |
| def get_input_type(self): |
| return _from_java_type(self._keyed_stream._original_data_type_info.get_java_type_info()) |
| |
| def trigger(self, trigger: Trigger) -> 'WindowedStream': |
| """ |
| Sets the Trigger that should be used to trigger window emission. |
| """ |
| self._window_trigger = trigger |
| return self |
| |
| def allowed_lateness(self, time_ms: int) -> 'WindowedStream': |
| """ |
| Sets the time by which elements are allowed to be late. Elements that arrive behind the |
| watermark by more than the specified time will be dropped. By default, the allowed lateness |
| is 0. |
| |
| Setting an allowed lateness is only valid for event-time windows. |
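| |
| Example(illustrative sketch; allows elements to be up to 1 second (1000 ms) late): |
| :: |
| |
| >>> ds.key_by(lambda x: x[0]) \\ |
| ... .window(TumblingEventTimeWindows.of(Time.seconds(5))) \\ |
| ... .allowed_lateness(1000) |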
| """ |
| self._allowed_lateness = time_ms |
| return self |
| |
| def side_output_late_data(self, output_tag: OutputTag) -> 'WindowedStream': |
| """ |
| Send late arriving data to the side output identified by the given :class:`OutputTag`. Data |
| is considered late after the watermark has passed the end of the window plus the allowed |
| lateness set using :func:`allowed_lateness`. |
| |
| You can get the stream of late data using :func:`~DataStream.get_side_output` on the |
| :class:`DataStream` resulting from the windowed operation with the same :class:`OutputTag`. |
| |
| Example: |
| :: |
| |
| >>> tag = OutputTag("late-data", Types.TUPLE([Types.INT(), Types.STRING()])) |
| >>> main_stream = ds.key_by(lambda x: x[1]) \\ |
| ... .window(TumblingEventTimeWindows.of(Time.seconds(5))) \\ |
| ... .side_output_late_data(tag) \\ |
| ... .reduce(lambda a, b: (a[0] + b[0], b[1])) |
| >>> late_stream = main_stream.get_side_output(tag) |
| |
| .. versionadded:: 1.16.0 |
| """ |
| self._late_data_output_tag = output_tag |
| return self |
| |
| def reduce(self, |
| reduce_function: Union[Callable, ReduceFunction], |
| window_function: Union[WindowFunction, ProcessWindowFunction] = None, |
| output_type: TypeInformation = None) -> DataStream: |
| """ |
| Applies a reduce function to the window. The window function is called for each evaluation |
| of the window for each key individually. The output of the reduce function is interpreted as |
| a regular non-windowed stream. |
| |
| This window will try and incrementally aggregate data as much as the window policies |
| permit. For example, tumbling time windows can aggregate the data, meaning that only one |
| element per key is stored. Sliding time windows will aggregate on the granularity of the |
| slide interval, so a few elements are stored per key (one per slide interval). Custom |
| windows may not be able to incrementally aggregate, or may need to store extra values in an |
| aggregation tree. |
| |
| Example: |
| :: |
| |
| >>> ds.key_by(lambda x: x[1]) \\ |
| ... .window(TumblingEventTimeWindows.of(Time.seconds(5))) \\ |
| ... .reduce(lambda a, b: (a[0] + b[0], b[1])) |
| |
| :param reduce_function: The reduce function. |
| :param window_function: The window function. |
| :param output_type: Type information for the result type of the window function. |
| :return: The data stream that is the result of applying the reduce function to the window. |
| |
| .. versionadded:: 1.16.0 |
| """ |
| if window_function is None: |
| internal_window_function = InternalSingleValueWindowFunction( |
| PassThroughWindowFunction()) # type: InternalWindowFunction |
| if output_type is None: |
| output_type = self.get_input_type() |
| elif isinstance(window_function, WindowFunction): |
| internal_window_function = InternalSingleValueWindowFunction(window_function) |
| elif isinstance(window_function, ProcessWindowFunction): |
| internal_window_function = InternalSingleValueProcessWindowFunction(window_function) |
| else: |
| raise TypeError("window_function should be a WindowFunction or ProcessWindowFunction") |
| |
| reducing_state_descriptor = ReducingStateDescriptor(WINDOW_STATE_NAME, |
| reduce_function, |
| self.get_input_type()) |
| func_desc = type(reduce_function).__name__ |
| if window_function is not None: |
| func_desc = "%s, %s" % (func_desc, type(window_function).__name__) |
| |
| return self._get_result_data_stream(internal_window_function, |
| reducing_state_descriptor, |
| func_desc, |
| output_type) |
| |
| def aggregate(self, |
| aggregate_function: AggregateFunction, |
| window_function: Union[WindowFunction, ProcessWindowFunction] = None, |
| accumulator_type: TypeInformation = None, |
| output_type: TypeInformation = None) -> DataStream: |
| """ |
| Applies the given window function to each window. The window function is called for each |
| evaluation of the window for each key individually. The output of the window function is |
| interpreted as a regular non-windowed stream. |
| |
| Arriving data is incrementally aggregated using the given aggregate function. This means |
| that the window function typically has only a single value to process when called. |
| |
| Example: |
| :: |
| |
| >>> class AverageAggregate(AggregateFunction): |
| ... def create_accumulator(self) -> Tuple[int, int]: |
| ... return 0, 0 |
| ... |
| ... def add(self, value: Tuple[str, int], accumulator: Tuple[int, int]) \\ |
| ... -> Tuple[int, int]: |
| ... return accumulator[0] + value[1], accumulator[1] + 1 |
| ... |
| ... def get_result(self, accumulator: Tuple[int, int]) -> float: |
| ... return accumulator[0] / accumulator[1] |
| ... |
| ... def merge(self, a: Tuple[int, int], b: Tuple[int, int]) -> Tuple[int, int]: |
| ... return a[0] + b[0], a[1] + b[1] |
| >>> ds.key_by(lambda x: x[1]) \\ |
| ... .window(TumblingEventTimeWindows.of(Time.seconds(5))) \\ |
| ... .aggregate(AverageAggregate(), |
| ... accumulator_type=Types.TUPLE([Types.LONG(), Types.LONG()]), |
| ... output_type=Types.DOUBLE()) |
| |
| :param aggregate_function: The aggregation function that is used for incremental |
| aggregation. |
| :param window_function: The window function. |
| :param accumulator_type: Type information for the internal accumulator type of the |
| aggregation function. |
| :param output_type: Type information for the result type of the window function. |
| :return: The data stream that is the result of applying the window function to the window. |
| |
| .. versionadded:: 1.16.0 |
| """ |
| if window_function is None: |
| internal_window_function = InternalSingleValueWindowFunction( |
| PassThroughWindowFunction()) # type: InternalWindowFunction |
| elif isinstance(window_function, WindowFunction): |
| internal_window_function = InternalSingleValueWindowFunction(window_function) |
| elif isinstance(window_function, ProcessWindowFunction): |
| internal_window_function = InternalSingleValueProcessWindowFunction(window_function) |
| else: |
| raise TypeError("window_function should be a WindowFunction or ProcessWindowFunction") |
| |
| if accumulator_type is None: |
| accumulator_type = Types.PICKLED_BYTE_ARRAY() |
| elif isinstance(accumulator_type, list): |
| accumulator_type = RowTypeInfo(accumulator_type) |
| |
| aggregating_state_descriptor = AggregatingStateDescriptor(WINDOW_STATE_NAME, |
| aggregate_function, |
| accumulator_type) |
| func_desc = type(aggregate_function).__name__ |
| if window_function is not None: |
| func_desc = "%s, %s" % (func_desc, type(window_function).__name__) |
| return self._get_result_data_stream(internal_window_function, |
| aggregating_state_descriptor, |
| func_desc, |
| output_type) |
| |
| def apply(self, |
| window_function: WindowFunction, output_type: TypeInformation = None) -> DataStream: |
| """ |
| Applies the given window function to each window. The window function is called for each |
| evaluation of the window for each key individually. The output of the window function is |
| interpreted as a regular non-windowed stream. |
| |
| Note that this function requires that all data in the windows is buffered until the window |
| is evaluated, as the function provides no means of incremental aggregation. |
| |
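| Example(illustrative sketch; ``SumWindowFunction`` is a hypothetical user-defined |
| WindowFunction): |
| :: |
| |
| >>> class SumWindowFunction(WindowFunction):  # hypothetical |
| ... def apply(self, key, window, inputs): |
| ... yield key, sum([e[1] for e in inputs]) |
| >>> ds.key_by(lambda x: x[0]) \\ |
| ... .window(TumblingEventTimeWindows.of(Time.seconds(5))) \\ |
| ... .apply(SumWindowFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) |
| |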
| :param window_function: The window function. |
| :param output_type: Type information for the result type of the window function. |
| :return: The data stream that is the result of applying the window function to the window. |
| """ |
| internal_window_function = InternalIterableWindowFunction( |
| window_function) # type: InternalWindowFunction |
| list_state_descriptor = ListStateDescriptor(WINDOW_STATE_NAME, self.get_input_type()) |
| func_desc = type(window_function).__name__ |
| return self._get_result_data_stream(internal_window_function, |
| list_state_descriptor, |
| func_desc, |
| output_type) |
| |
| def process(self, |
| process_window_function: ProcessWindowFunction, |
| output_type: TypeInformation = None) -> DataStream: |
| """ |
| Applies the given window function to each window. The window function is called for each |
| evaluation of the window for each key individually. The output of the window function is |
| interpreted as a regular non-windowed stream. |
| |
| Note that this function requires that all data in the windows is buffered until the window |
| is evaluated, as the function provides no means of incremental aggregation. |
| |
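| Example(illustrative sketch; ``CountWindowProcessFunction`` is a hypothetical user-defined |
| ProcessWindowFunction): |
| :: |
| |
| >>> class CountWindowProcessFunction(ProcessWindowFunction):  # hypothetical |
| ... def process(self, key, context, elements): |
| ... yield key, len([e for e in elements]) |
| >>> ds.key_by(lambda x: x[0]) \\ |
| ... .window(TumblingEventTimeWindows.of(Time.seconds(5))) \\ |
| ... .process(CountWindowProcessFunction(), Types.TUPLE([Types.STRING(), Types.INT()])) |
| |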
| :param process_window_function: The window function. |
| :param output_type: Type information for the result type of the window function. |
| :return: The data stream that is the result of applying the window function to the window. |
| """ |
| internal_window_function = InternalIterableProcessWindowFunction( |
| process_window_function) # type: InternalWindowFunction |
| list_state_descriptor = ListStateDescriptor(WINDOW_STATE_NAME, self.get_input_type()) |
| func_desc = type(process_window_function).__name__ |
| return self._get_result_data_stream(internal_window_function, |
| list_state_descriptor, |
| func_desc, |
| output_type) |
| |
| def _get_result_data_stream(self, |
| internal_window_function: InternalWindowFunction, |
| window_state_descriptor: StateDescriptor, |
| func_desc: str, |
| output_type: TypeInformation): |
| if self._window_trigger is None: |
| self._window_trigger = self._window_assigner.get_default_trigger( |
| self.get_execution_environment()) |
| window_serializer = self._window_assigner.get_window_serializer() |
| window_operation_descriptor = WindowOperationDescriptor( |
| self._window_assigner, |
| self._window_trigger, |
| self._allowed_lateness, |
| self._late_data_output_tag, |
| window_state_descriptor, |
| window_serializer, |
| internal_window_function) |
| |
| from pyflink.fn_execution import flink_fn_execution_pb2 |
| j_python_data_stream_function_operator, j_output_type_info = \ |
| _get_one_input_stream_operator( |
| self._keyed_stream, |
| window_operation_descriptor, |
| flink_fn_execution_pb2.UserDefinedDataStreamFunction.WINDOW, # type: ignore |
| output_type) |
| |
| op_name = window_operation_descriptor.generate_op_name() |
| op_desc = window_operation_descriptor.generate_op_desc("Window", func_desc) |
| return DataStream(self._keyed_stream._j_data_stream.transform( |
| op_name, |
| j_output_type_info, |
| j_python_data_stream_function_operator)).set_description(op_desc) |
| |
| |
| class AllWindowedStream(object): |
| """ |
| An AllWindowedStream represents a data stream where the stream of elements is split into windows |
| based on a WindowAssigner. Window emission is triggered based on a Trigger. |
| |
| If an Evictor is specified it will be used to evict elements from the window after evaluation |
| was triggered by the Trigger but before the actual evaluation of the window. |
| When using an evictor, window performance will degrade significantly, since pre-aggregation of |
| window results cannot be used. |
| |
| Note that the AllWindowedStream is purely an API construct: during runtime the AllWindowedStream |
| will be collapsed together with the operation over the window into one single operation. |
| """ |
| |
| def __init__(self, data_stream: DataStream, window_assigner: WindowAssigner): |
| self._keyed_stream = data_stream.key_by(NullByteKeySelector()) |
| self._window_assigner = window_assigner |
| self._allowed_lateness = 0 |
| self._late_data_output_tag = None # type: Optional[OutputTag] |
| self._window_trigger = None # type: Trigger |
| |
| def get_execution_environment(self): |
| return self._keyed_stream.get_execution_environment() |
| |
| def get_input_type(self): |
| return _from_java_type(self._keyed_stream._original_data_type_info.get_java_type_info()) |
| |
| def trigger(self, trigger: Trigger) -> 'AllWindowedStream': |
| """ |
| Sets the Trigger that should be used to trigger window emission. |
| """ |
| if isinstance(self._window_assigner, MergingWindowAssigner) \ |
| and (trigger.can_merge() is not True): |
| raise TypeError("A merging window assigner cannot be used with a trigger that does " |
| "not support merging.") |
| |
| self._window_trigger = trigger |
| return self |
| |
| def allowed_lateness(self, time_ms: int) -> 'AllWindowedStream': |
| """ |
| Sets the time by which elements are allowed to be late. Elements that arrive behind the |
| watermark by more than the specified time will be dropped. By default, the allowed lateness |
| is 0. |
| |
| Setting an allowed lateness is only valid for event-time windows. |
| """ |
| self._allowed_lateness = time_ms |
| return self |
| |
| def side_output_late_data(self, output_tag: OutputTag) -> 'AllWindowedStream': |
| """ |
| Send late arriving data to the side output identified by the given :class:`OutputTag`. Data |
| is considered late after the watermark has passed the end of the window plus the allowed |
| lateness set using :func:`allowed_lateness`. |
| |
| You can get the stream of late data using :func:`~DataStream.get_side_output` on the |
| :class:`DataStream` resulting from the windowed operation with the same :class:`OutputTag`. |
| |
| Example: |
| :: |
| |
| >>> tag = OutputTag("late-data", Types.TUPLE([Types.INT(), Types.STRING()])) |
| >>> main_stream = ds.window_all(TumblingEventTimeWindows.of(Time.seconds(5))) \\ |
| ... .side_output_late_data(tag) \\ |
| ... .process(MyProcessAllWindowFunction(), |
| ... Types.TUPLE([Types.LONG(), Types.LONG(), Types.INT()])) |
| >>> late_stream = main_stream.get_side_output(tag) |
| """ |
| self._late_data_output_tag = output_tag |
| return self |
| |
| def reduce(self, |
| reduce_function: Union[Callable, ReduceFunction], |
| window_function: Union[AllWindowFunction, ProcessAllWindowFunction] = None, |
| output_type: TypeInformation = None) -> DataStream: |
| """ |
| Applies the given window function to each window. The window function is called for each |
| evaluation of the window. The output of the window function is interpreted as a regular |
| non-windowed stream. |
| |
| Arriving data is incrementally aggregated using the given reducer. |
| |
| Example: |
| :: |
| |
| >>> ds.window_all(TumblingEventTimeWindows.of(Time.seconds(5))) \\ |
| ... .reduce(lambda a, b: (a[0] + b[0], b[1])) |
| |
| :param reduce_function: The reduce function. |
| :param window_function: The window function. |
| :param output_type: Type information for the result type of the window function. |
| :return: The data stream that is the result of applying the reduce function to the window. |
| |
| .. versionadded:: 1.16.0 |
| """ |
| if window_function is None: |
| internal_window_function = InternalSingleValueAllWindowFunction( |
| PassThroughAllWindowFunction()) # type: InternalWindowFunction |
| if output_type is None: |
| output_type = self.get_input_type() |
| elif isinstance(window_function, AllWindowFunction): |
| internal_window_function = InternalSingleValueAllWindowFunction(window_function) |
| elif isinstance(window_function, ProcessAllWindowFunction): |
| internal_window_function = InternalSingleValueProcessAllWindowFunction(window_function) |
| else: |
| raise TypeError("window_function should be a AllWindowFunction or " |
| "ProcessAllWindowFunction") |
| |
| reducing_state_descriptor = ReducingStateDescriptor(WINDOW_STATE_NAME, |
| reduce_function, |
| self.get_input_type()) |
| func_desc = type(reduce_function).__name__ |
| if window_function is not None: |
| func_desc = "%s, %s" % (func_desc, type(window_function).__name__) |
| |
| return self._get_result_data_stream(internal_window_function, |
| reducing_state_descriptor, |
| func_desc, |
| output_type) |
| |
| def aggregate(self, |
| aggregate_function: AggregateFunction, |
| window_function: Union[AllWindowFunction, ProcessAllWindowFunction] = None, |
| accumulator_type: TypeInformation = None, |
| output_type: TypeInformation = None) -> DataStream: |
| """ |
| Applies the given window function to each window. The window function is called for each |
| evaluation of the window. The output of the window function is interpreted as a regular |
| non-windowed stream. |
| |
| Arriving data is incrementally aggregated using the given aggregate function. This means |
| that the window function typically has only a single value to process when called. |
| |
| Example: |
| :: |
| |
| >>> class AverageAggregate(AggregateFunction): |
| ... def create_accumulator(self) -> Tuple[int, int]: |
| ... return 0, 0 |
| ... |
| ... def add(self, value: Tuple[str, int], accumulator: Tuple[int, int]) \\ |
| ... -> Tuple[int, int]: |
| ... return accumulator[0] + value[1], accumulator[1] + 1 |
| ... |
| ... def get_result(self, accumulator: Tuple[int, int]) -> float: |
| ... return accumulator[0] / accumulator[1] |
| ... |
| ... def merge(self, a: Tuple[int, int], b: Tuple[int, int]) -> Tuple[int, int]: |
| ... return a[0] + b[0], a[1] + b[1] |
| ... |
| >>> ds.window_all(TumblingEventTimeWindows.of(Time.seconds(5))) \\ |
| ... .aggregate(AverageAggregate(), |
| ... accumulator_type=Types.TUPLE([Types.LONG(), Types.LONG()]), |
| ... output_type=Types.DOUBLE()) |
| |
| :param aggregate_function: The aggregation function that is used for incremental |
| aggregation. |
| :param window_function: The window function. |
| :param accumulator_type: Type information for the internal accumulator type of the |
| aggregation function. |
| :param output_type: Type information for the result type of the window function. |
| :return: The data stream that is the result of applying the window function to the window. |
| |
| .. versionadded:: 1.16.0 |
| """ |
| if window_function is None: |
| internal_window_function = InternalSingleValueAllWindowFunction( |
| PassThroughAllWindowFunction()) # type: InternalWindowFunction |
| elif isinstance(window_function, AllWindowFunction): |
| internal_window_function = InternalSingleValueAllWindowFunction(window_function) |
| elif isinstance(window_function, ProcessAllWindowFunction): |
| internal_window_function = InternalSingleValueProcessAllWindowFunction(window_function) |
| else: |
| raise TypeError("window_function should be a AllWindowFunction or " |
| "ProcessAllWindowFunction") |
| |
| if accumulator_type is None: |
| accumulator_type = Types.PICKLED_BYTE_ARRAY() |
| elif isinstance(accumulator_type, list): |
| accumulator_type = RowTypeInfo(accumulator_type) |
| |
| aggregating_state_descriptor = AggregatingStateDescriptor(WINDOW_STATE_NAME, |
| aggregate_function, |
| accumulator_type) |
| func_desc = type(aggregate_function).__name__ |
| if window_function is not None: |
| func_desc = "%s, %s" % (func_desc, type(window_function).__name__) |
| return self._get_result_data_stream(internal_window_function, |
| aggregating_state_descriptor, |
| func_desc, |
| output_type) |
| |
| def apply(self, |
| window_function: AllWindowFunction, |
| output_type: TypeInformation = None) -> DataStream: |
| """ |
| Applies the given window function to each window. The window function is called for each |
| evaluation of the window. The output of the window function is interpreted as a regular |
| non-windowed stream. |
| |
| Note that this function requires that all data in the windows is buffered until the window |
| is evaluated, as the function provides no means of incremental aggregation. |
| |
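| Example(illustrative sketch; ``SumAllWindowFunction`` is a hypothetical user-defined |
| AllWindowFunction): |
| :: |
| |
| >>> class SumAllWindowFunction(AllWindowFunction):  # hypothetical |
| ... def apply(self, window, inputs): |
| ... yield sum([e[1] for e in inputs]) |
| >>> ds.window_all(TumblingEventTimeWindows.of(Time.seconds(5))) \\ |
| ... .apply(SumAllWindowFunction(), Types.INT()) |
| |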
| :param window_function: The window function. |
| :param output_type: Type information for the result type of the window function. |
| :return: The data stream that is the result of applying the window function to the window. |
| """ |
| internal_window_function = InternalIterableAllWindowFunction( |
| window_function) # type: InternalWindowFunction |
| list_state_descriptor = ListStateDescriptor(WINDOW_STATE_NAME, self.get_input_type()) |
| func_desc = type(window_function).__name__ |
| return self._get_result_data_stream(internal_window_function, |
| list_state_descriptor, |
| func_desc, |
| output_type) |
| |
| def process(self, |
| process_window_function: ProcessAllWindowFunction, |
| output_type: TypeInformation = None) -> DataStream: |
| """ |
| Applies the given window function to each window. The window function is called for each |
| evaluation of the window. The output of the window function is interpreted as a regular |
| non-windowed stream. |
| |
| Note that this function requires that all data in the windows is buffered until the window |
| is evaluated, as the function provides no means of incremental aggregation. |
| |
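| Example(illustrative sketch; ``CountAllWindowProcessFunction`` is a hypothetical |
| user-defined ProcessAllWindowFunction): |
| :: |
| |
| >>> class CountAllWindowProcessFunction(ProcessAllWindowFunction):  # hypothetical |
| ... def process(self, context, elements): |
| ... yield len([e for e in elements]) |
| >>> ds.window_all(TumblingEventTimeWindows.of(Time.seconds(5))) \\ |
| ... .process(CountAllWindowProcessFunction(), Types.INT()) |
| |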
| :param process_window_function: The window function. |
| :param output_type: Type information for the result type of the window function. |
| :return: The data stream that is the result of applying the window function to the window. |
| """ |
| internal_window_function = InternalIterableProcessAllWindowFunction( |
| process_window_function) # type: InternalWindowFunction |
| list_state_descriptor = ListStateDescriptor(WINDOW_STATE_NAME, self.get_input_type()) |
| func_desc = type(process_window_function).__name__ |
| return self._get_result_data_stream(internal_window_function, |
| list_state_descriptor, |
| func_desc, |
| output_type) |
| |
| def _get_result_data_stream(self, |
| internal_window_function: InternalWindowFunction, |
| window_state_descriptor: StateDescriptor, |
| func_desc: str, |
| output_type: TypeInformation): |
| if self._window_trigger is None: |
| self._window_trigger = self._window_assigner.get_default_trigger( |
| self.get_execution_environment()) |
| window_serializer = self._window_assigner.get_window_serializer() |
| window_operation_descriptor = WindowOperationDescriptor( |
| self._window_assigner, |
| self._window_trigger, |
| self._allowed_lateness, |
| self._late_data_output_tag, |
| window_state_descriptor, |
| window_serializer, |
| internal_window_function) |
| |
| from pyflink.fn_execution import flink_fn_execution_pb2 |
| j_python_data_stream_function_operator, j_output_type_info = \ |
| _get_one_input_stream_operator( |
| self._keyed_stream, |
| window_operation_descriptor, |
| flink_fn_execution_pb2.UserDefinedDataStreamFunction.WINDOW, # type: ignore |
| output_type) |
| |
| op_name = window_operation_descriptor.generate_op_name() |
| op_desc = window_operation_descriptor.generate_op_desc("AllWindow", func_desc) |
| return DataStream(self._keyed_stream._j_data_stream.transform( |
| op_name, |
| j_output_type_info, |
| j_python_data_stream_function_operator)).set_description(op_desc) |
| |
| |
| class ConnectedStreams(object): |
| """ |
| ConnectedStreams represent two connected streams of (possibly) different data types. |
| Connected streams are useful for cases where operations on one stream directly |
| affect the operations on the other stream, usually via shared state between the streams. |
| |
| An example for the use of connected streams would be to apply rules that change over time |
| onto another stream. One of the connected streams has the rules, the other stream the |
| elements to apply the rules to. The operation on the connected stream maintains the |
| current set of rules in the state. It may receive either a rule update and update the state |
| or a data element and apply the rules in the state to the element. |
| |
| The connected stream can be conceptually viewed as a union stream of an Either type that |
| holds either the first stream's type or the second stream's type. |
| """ |
| |
| def __init__(self, stream1: DataStream, stream2: DataStream): |
| self.stream1 = stream1 |
| self.stream2 = stream2 |
| |
| def key_by(self, key_selector1: Union[Callable, KeySelector], |
| key_selector2: Union[Callable, KeySelector], |
| key_type: TypeInformation = None) -> 'ConnectedStreams': |
| """ |
| KeyBy operation for connected data streams. Assigns keys to the elements of |
| input1 and input2 using key_selector1 and key_selector2 with explicit type information |
| for the common key type. |
| |
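| Example(illustrative sketch; ``ds1`` and ``ds2`` are assumed to carry tuples whose first |
| field is the common key): |
| :: |
| |
| >>> keyed_connected = ds1.connect(ds2) \\ |
| ... .key_by(lambda a: a[0], lambda b: b[0], key_type=Types.STRING()) |
| |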
| :param key_selector1: The `KeySelector` used for grouping the first input. |
| :param key_selector2: The `KeySelector` used for grouping the second input. |
| :param key_type: The type information of the common key type |
| :return: The partitioned `ConnectedStreams` |
| """ |
| |
| ds1 = self.stream1 |
| ds2 = self.stream2 |
| if isinstance(self.stream1, KeyedStream): |
| ds1 = self.stream1._origin_stream |
| if isinstance(self.stream2, KeyedStream): |
| ds2 = self.stream2._origin_stream |
| return ConnectedStreams( |
| ds1.key_by(key_selector1, key_type), |
| ds2.key_by(key_selector2, key_type)) |
| |
| def map(self, func: CoMapFunction, output_type: TypeInformation = None) -> 'DataStream': |
| """ |
| Applies a CoMap transformation on a `ConnectedStreams` and maps the output to a common |
| type. The transformation calls `CoMapFunction.map1` for each element of the first |
| input and `CoMapFunction.map2` for each element of the second input. Each CoMapFunction |
| call returns exactly one element. |
| |
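| Example(illustrative sketch; ``MyCoMapFunction`` is a hypothetical user-defined |
| CoMapFunction): |
| :: |
| |
| >>> class MyCoMapFunction(CoMapFunction):  # hypothetical |
| ... def map1(self, value): |
| ... return ('first', value) |
| ... def map2(self, value): |
| ... return ('second', value) |
| >>> ds1.connect(ds2).map(MyCoMapFunction()) |
| |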
| :param func: The CoMapFunction used to jointly transform the two input DataStreams |
| :param output_type: `TypeInformation` for the result type of the function. |
| :return: The transformed `DataStream` |
| """ |
| if not isinstance(func, CoMapFunction): |
| raise TypeError("The input function must be a CoMapFunction!") |
| |
| if self._is_keyed_stream(): |
| class CoMapKeyedCoProcessFunctionAdapter(KeyedCoProcessFunction): |
| def __init__(self, co_map_func: CoMapFunction): |
| self._open_func = co_map_func.open |
| self._close_func = co_map_func.close |
| self._map1_func = co_map_func.map1 |
| self._map2_func = co_map_func.map2 |
| |
| def open(self, runtime_context: RuntimeContext): |
| self._open_func(runtime_context) |
| |
| def close(self): |
| self._close_func() |
| |
| def process_element1(self, value, ctx: 'KeyedCoProcessFunction.Context'): |
| result = self._map1_func(value) |
| if result is not None: |
| yield result |
| |
| def process_element2(self, value, ctx: 'KeyedCoProcessFunction.Context'): |
| result = self._map2_func(value) |
| if result is not None: |
| yield result |
| |
| return self.process(CoMapKeyedCoProcessFunctionAdapter(func), output_type) \ |
| .name("Co-Map") |
| else: |
| class CoMapCoProcessFunctionAdapter(CoProcessFunction): |
| def __init__(self, co_map_func: CoMapFunction): |
| self._open_func = co_map_func.open |
| self._close_func = co_map_func.close |
| self._map1_func = co_map_func.map1 |
| self._map2_func = co_map_func.map2 |
| |
| def open(self, runtime_context: RuntimeContext): |
| self._open_func(runtime_context) |
| |
| def close(self): |
| self._close_func() |
| |
| def process_element1(self, value, ctx: 'CoProcessFunction.Context'): |
| result = self._map1_func(value) |
| if result is not None: |
| yield result |
| |
| def process_element2(self, value, ctx: 'CoProcessFunction.Context'): |
| result = self._map2_func(value) |
| if result is not None: |
| yield result |
| |
| return self.process(CoMapCoProcessFunctionAdapter(func), output_type) \ |
| .name("Co-Map") |
| |
| def flat_map(self, func: CoFlatMapFunction, output_type: TypeInformation = None) \ |
| -> 'DataStream': |
| """ |
| Applies a CoFlatMap transformation on a `ConnectedStreams` and maps the output to a |
| common type. The transformation calls `CoFlatMapFunction.flat_map1` for each element |
| of the first input and `CoFlatMapFunction.flat_map2` for each element of the second |
| input. Each CoFlatMapFunction call returns any number of elements, including none. |
| |
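| Example(illustrative sketch; ``MyCoFlatMapFunction`` is a hypothetical user-defined |
| CoFlatMapFunction): |
| :: |
| |
| >>> class MyCoFlatMapFunction(CoFlatMapFunction):  # hypothetical |
| ... def flat_map1(self, value): |
| ... yield value |
| ... def flat_map2(self, value): |
| ... yield value |
| ... yield value |
| >>> ds1.connect(ds2).flat_map(MyCoFlatMapFunction()) |
| |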
| :param func: The CoFlatMapFunction used to jointly transform the two input DataStreams |
| :param output_type: `TypeInformation` for the result type of the function. |
| :return: The transformed `DataStream` |
| """ |
| |
| if not isinstance(func, CoFlatMapFunction): |
| raise TypeError("The input must be a CoFlatMapFunction!") |
| |
| if self._is_keyed_stream(): |
| class FlatMapKeyedCoProcessFunctionAdapter(KeyedCoProcessFunction): |
| |
| def __init__(self, co_flat_map_func: CoFlatMapFunction): |
| self._open_func = co_flat_map_func.open |
| self._close_func = co_flat_map_func.close |
| self._flat_map1_func = co_flat_map_func.flat_map1 |
| self._flat_map2_func = co_flat_map_func.flat_map2 |
| |
| def open(self, runtime_context: RuntimeContext): |
| self._open_func(runtime_context) |
| |
| def close(self): |
| self._close_func() |
| |
| def process_element1(self, value, ctx: 'KeyedCoProcessFunction.Context'): |
| result = self._flat_map1_func(value) |
| if result: |
| yield from result |
| |
| def process_element2(self, value, ctx: 'KeyedCoProcessFunction.Context'): |
| result = self._flat_map2_func(value) |
| if result: |
| yield from result |
| |
| return self.process(FlatMapKeyedCoProcessFunctionAdapter(func), output_type) \ |
| .name("Co-Flat Map") |
| else: |
| class FlatMapCoProcessFunctionAdapter(CoProcessFunction): |
| |
| def __init__(self, co_flat_map_func: CoFlatMapFunction): |
| self._open_func = co_flat_map_func.open |
| self._close_func = co_flat_map_func.close |
| self._flat_map1_func = co_flat_map_func.flat_map1 |
| self._flat_map2_func = co_flat_map_func.flat_map2 |
| |
| def open(self, runtime_context: RuntimeContext): |
| self._open_func(runtime_context) |
| |
| def close(self): |
| self._close_func() |
| |
| def process_element1(self, value, ctx: 'CoProcessFunction.Context'): |
| result = self._flat_map1_func(value) |
| if result: |
| yield from result |
| |
| def process_element2(self, value, ctx: 'CoProcessFunction.Context'): |
| result = self._flat_map2_func(value) |
| if result: |
| yield from result |
| |
| return self.process(FlatMapCoProcessFunctionAdapter(func), output_type) \ |
| .name("Co-Flat Map") |
| |
| def process(self, |
| func: Union[CoProcessFunction, KeyedCoProcessFunction], |
| output_type: TypeInformation = None) -> 'DataStream': |
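| """ |
| Applies the given CoProcessFunction (for non-keyed ConnectedStreams) or |
| KeyedCoProcessFunction (when both inputs are keyed) on the connected input streams, |
| thereby creating a transformed output stream. |
| |
| Example(illustrative sketch; ``MyCoProcessFunction`` is a hypothetical user-defined |
| CoProcessFunction): |
| :: |
| |
| >>> class MyCoProcessFunction(CoProcessFunction):  # hypothetical |
| ... def process_element1(self, value, ctx): |
| ... yield value |
| ... def process_element2(self, value, ctx): |
| ... yield value |
| >>> ds1.connect(ds2).process(MyCoProcessFunction()) |
| |
| :param func: The CoProcessFunction or KeyedCoProcessFunction that is called for each |
| element in the stream. |
| :param output_type: `TypeInformation` for the result type of the function. |
| :return: The transformed `DataStream`. |
| """ |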
| if not isinstance(func, CoProcessFunction) and not isinstance(func, KeyedCoProcessFunction): |
| raise TypeError("The input must be a CoProcessFunction or KeyedCoProcessFunction!") |
| |
| from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction |
| if self._is_keyed_stream(): |
| func_type = UserDefinedDataStreamFunction.KEYED_CO_PROCESS # type: ignore |
| func_name = "Keyed Co-Process" |
| else: |
| func_type = UserDefinedDataStreamFunction.CO_PROCESS # type: ignore |
| func_name = "Co-Process" |
| |
| j_connected_stream = self.stream1._j_data_stream.connect(self.stream2._j_data_stream) |
| j_operator, j_output_type = _get_two_input_stream_operator( |
| self, |
| func, |
| func_type, |
| output_type) |
| return DataStream(j_connected_stream.transform(func_name, j_output_type, j_operator)) |
| |
| def _is_keyed_stream(self): |
| return isinstance(self.stream1, KeyedStream) and isinstance(self.stream2, KeyedStream) |
| |
| |
| class BroadcastStream(object): |
| """ |
| A BroadcastStream is a stream with :class:`state.BroadcastState` (s). It can be created from |
| any stream using the :meth:`DataStream.broadcast` method, which implicitly creates the states |
| where the user can store elements of the created :class:`BroadcastStream` (see |
| :class:`BroadcastConnectedStream`). |
| |
| Note that no further operation can be applied to these streams. The only available option is |
| to connect them with a keyed or non-keyed stream, using :meth:`KeyedStream.connect` and |
| :meth:`DataStream.connect` respectively. Applying these methods will result in a |
| :class:`BroadcastConnectedStream` for further processing. |
| |
| .. versionadded:: 1.16.0 |
| """ |
| |
| def __init__( |
| self, |
| input_stream: Union['DataStream', 'KeyedStream'], |
| broadcast_state_descriptors: List[MapStateDescriptor], |
| ): |
| self.input_stream = input_stream |
| self.broadcast_state_descriptors = broadcast_state_descriptors |
| |
| |
| class BroadcastConnectedStream(object): |
| """ |
| A BroadcastConnectedStream represents the result of connecting a keyed or non-keyed stream, with |
| a :class:`BroadcastStream` with :class:`~state.BroadcastState` (s). As in the case of |
| :class:`ConnectedStreams` these streams are useful for cases where operations on one stream |
| directly affect the operations on the other stream, usually via shared state between the |
| streams. |
| |
| An example for the use of such connected streams would be to apply rules that change over time |
| onto another, possibly keyed stream. The stream with the broadcast state has the rules, and will |
| store them in the broadcast state, while the other stream will contain the elements to apply the |
| rules to. By broadcasting the rules, these will be available in all parallel instances, and can |
| be applied to all partitions of the other stream. |
| |
| .. versionadded:: 1.16.0 |
| """ |
| |
| def __init__( |
| self, |
| non_broadcast_stream: Union['DataStream', 'KeyedStream'], |
| broadcast_stream: 'BroadcastStream', |
| broadcast_state_descriptors: List[MapStateDescriptor], |
| ): |
| self.non_broadcast_stream = non_broadcast_stream |
| self.broadcast_stream = broadcast_stream |
| self.broadcast_state_descriptors = broadcast_state_descriptors |
| |
| @overload |
| def process( |
| self, |
| func: BroadcastProcessFunction, |
| output_type: TypeInformation = None, |
| ) -> 'DataStream': |
| pass |
| |
| @overload |
| def process( |
| self, |
| func: KeyedBroadcastProcessFunction, |
| output_type: TypeInformation = None |
| ) -> 'DataStream': |
| pass |
| |
| def process( |
| self, |
| func: Union[BroadcastProcessFunction, KeyedBroadcastProcessFunction], |
| output_type: TypeInformation = None, |
| ) -> 'DataStream': |
| """ |
| Assumes as inputs a :class:`BroadcastStream` and a :class:`DataStream` or |
| :class:`KeyedStream` and applies the given :class:`BroadcastProcessFunction` or |
| :class:`KeyedBroadcastProcessFunction` on them, thereby creating a transformed output |
| stream. |
| |
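| Example(illustrative sketch; ``MyBroadcastProcessFunction`` and ``rule_stream`` are |
| hypothetical, and ``state_desc`` is assumed to be the MapStateDescriptor used to create |
| the broadcast stream): |
| :: |
| |
| >>> state_desc = MapStateDescriptor("rules", Types.STRING(), Types.STRING()) |
| >>> class MyBroadcastProcessFunction(BroadcastProcessFunction):  # hypothetical |
| ... def process_element(self, value, ctx): |
| ... rule = ctx.get_broadcast_state(state_desc).get(value[0]) |
| ... yield value, rule |
| ... def process_broadcast_element(self, value, ctx): |
| ... ctx.get_broadcast_state(state_desc).put(value[0], value[1]) |
| >>> ds.connect(rule_stream.broadcast(state_desc)) \\ |
| ... .process(MyBroadcastProcessFunction()) |
| |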
| :param func: The :class:`BroadcastProcessFunction` that is called for each element in the |
| non-broadcasted :class:`DataStream`, or the :class:`KeyedBroadcastProcessFunction` that |
| is called for each element in the non-broadcasted :class:`KeyedStream`. |
| :param output_type: The type of the output elements, should be |
| :class:`common.TypeInformation`, a list (implicit :class:`RowTypeInfo`), or None |
| (implicit :meth:`Types.PICKLED_BYTE_ARRAY`). |
| :return: The transformed :class:`DataStream`. |
| """ |
| if isinstance(func, BroadcastProcessFunction) and self._is_keyed_stream(): |
| raise TypeError("BroadcastProcessFunction should be applied to non-keyed DataStream") |
| if isinstance(func, KeyedBroadcastProcessFunction) and (not self._is_keyed_stream()): |
| raise TypeError("KeyedBroadcastProcessFunction should be applied to keyed DataStream") |
| |
| j_input_transformation1 = self.non_broadcast_stream._j_data_stream.getTransformation() |
| j_input_transformation2 = ( |
| self.broadcast_stream.input_stream._j_data_stream.getTransformation() |
| ) |
| |
| if output_type is None: |
| output_type_info = Types.PICKLED_BYTE_ARRAY() # type: TypeInformation |
| elif isinstance(output_type, list): |
| output_type_info = RowTypeInfo(output_type) |
| elif isinstance(output_type, TypeInformation): |
| output_type_info = output_type |
| else: |
| raise TypeError("output_type must be None, list or TypeInformation") |
| j_output_type = output_type_info.get_java_type_info() |
| |
| from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction |
| jvm = get_gateway().jvm |
| JPythonConfigUtil = jvm.org.apache.flink.python.util.PythonConfigUtil |
| |
| if self._is_keyed_stream(): |
| func_type = UserDefinedDataStreamFunction.KEYED_CO_BROADCAST_PROCESS # type: ignore |
| func_name = "Keyed-Co-Process-Broadcast" |
| else: |
| func_type = UserDefinedDataStreamFunction.CO_BROADCAST_PROCESS # type: ignore |
| func_name = "Co-Process-Broadcast" |
| |
| j_state_names = to_jarray( |
| jvm.String, [i.get_name() for i in self.broadcast_state_descriptors] |
| ) |
| j_state_descriptors = JPythonConfigUtil.convertStateNamesToStateDescriptors(j_state_names) |
| j_conf = get_j_env_configuration( |
| self.broadcast_stream.input_stream._j_data_stream.getExecutionEnvironment()) |
| j_data_stream_python_function_info = _create_j_data_stream_python_function_info( |
| func, func_type |
| ) |
| j_env = ( |
| self.non_broadcast_stream.get_execution_environment()._j_stream_execution_environment |
| ) |
| |
| if self._is_keyed_stream(): |
| JTransformation = jvm.org.apache.flink.streaming.api.transformations.python \ |
| .PythonKeyedBroadcastStateTransformation |
| j_transformation = JTransformation( |
| func_name, |
| j_conf, |
| j_data_stream_python_function_info, |
| j_input_transformation1, |
| j_input_transformation2, |
| j_state_descriptors, |
| self.non_broadcast_stream._j_data_stream.getKeyType(), |
| self.non_broadcast_stream._j_data_stream.getKeySelector(), |
| j_output_type, |
| j_env.getParallelism(), |
| ) |
| else: |
| JTransformation = jvm.org.apache.flink.streaming.api.transformations.python \ |
| .PythonBroadcastStateTransformation |
| j_transformation = JTransformation( |
| func_name, |
| j_conf, |
| j_data_stream_python_function_info, |
| j_input_transformation1, |
| j_input_transformation2, |
| j_state_descriptors, |
| j_output_type, |
| j_env.getParallelism(), |
| ) |
| j_env.addOperator(j_transformation) |
| j_data_stream = JPythonConfigUtil.createSingleOutputStreamOperator(j_env, j_transformation) |
| |
| return DataStream(j_data_stream) |
| |
| def _is_keyed_stream(self): |
| return isinstance(self.non_broadcast_stream, KeyedStream) |
| |
| |
| def _get_one_input_stream_operator(data_stream: DataStream, |
| func: Union[Function, |
| FunctionWrapper, |
| WindowOperationDescriptor], |
| func_type: int, |
| output_type: Union[TypeInformation, List] = None): |
| """ |
| Create a Java one input stream operator. |
| |
| :param func: a function object that implements the Function interface. |
| :param func_type: function type, supports MAP, FLAT_MAP, etc. |
| :param output_type: the data type of the function output data. |
| :return: A Java operator which is responsible for executing the user-defined Python function. |
| """ |
| |
| gateway = get_gateway() |
| |
| j_input_types = data_stream._j_data_stream.getTransformation().getOutputType() |
| if output_type is None: |
| output_type_info = Types.PICKLED_BYTE_ARRAY() # type: TypeInformation |
| elif isinstance(output_type, list): |
| output_type_info = RowTypeInfo(output_type) |
| else: |
| output_type_info = output_type |
| |
| j_data_stream_python_function_info = _create_j_data_stream_python_function_info(func, func_type) |
| j_output_type_info = output_type_info.get_java_type_info() |
| j_conf = get_j_env_configuration(data_stream._j_data_stream.getExecutionEnvironment()) |
| python_execution_mode = ( |
| j_conf.getString(gateway.jvm.org.apache.flink.python.PythonOptions.PYTHON_EXECUTION_MODE)) |
| |
| from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction |
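|     # In 'thread' execution mode the Python function runs embedded in the JVM (the Embedded* |
|     # operators); any other mode runs it in a separate Python worker process (the External* |
|     # operators). |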
| if func_type == UserDefinedDataStreamFunction.PROCESS: # type: ignore |
| if python_execution_mode == 'thread': |
| JDataStreamPythonFunctionOperator = gateway.jvm.EmbeddedPythonProcessOperator |
| else: |
| JDataStreamPythonFunctionOperator = gateway.jvm.ExternalPythonProcessOperator |
| elif func_type == UserDefinedDataStreamFunction.KEYED_PROCESS: # type: ignore |
| if python_execution_mode == 'thread': |
| JDataStreamPythonFunctionOperator = gateway.jvm.EmbeddedPythonKeyedProcessOperator |
| else: |
| JDataStreamPythonFunctionOperator = gateway.jvm.ExternalPythonKeyedProcessOperator |
| elif func_type == UserDefinedDataStreamFunction.WINDOW: # type: ignore |
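|         # The namespace serializer has to match the window type produced by the window |
|         # assigner; unknown window types fall back to the generic ByteArrayWrapperSerializer. |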
| window_serializer = typing.cast(WindowOperationDescriptor, func).window_serializer |
| if isinstance(window_serializer, TimeWindowSerializer): |
| j_namespace_serializer = \ |
| gateway.jvm.org.apache.flink.table.runtime.operators.window.TimeWindow.Serializer() |
| elif isinstance(window_serializer, CountWindowSerializer): |
| j_namespace_serializer = \ |
| gateway.jvm.org.apache.flink.table.runtime.operators.window.CountWindow.Serializer() |
| elif isinstance(window_serializer, GlobalWindowSerializer): |
| j_namespace_serializer = \ |
| gateway.jvm.org.apache.flink.streaming.api.windowing.windows.GlobalWindow \ |
| .Serializer() |
| else: |
| j_namespace_serializer = \ |
| gateway.jvm.org.apache.flink.streaming.api.utils.ByteArrayWrapperSerializer() |
| if python_execution_mode == 'thread': |
| JDataStreamPythonWindowFunctionOperator = gateway.jvm.EmbeddedPythonWindowOperator |
| else: |
| JDataStreamPythonWindowFunctionOperator = gateway.jvm.ExternalPythonKeyedProcessOperator |
| |
| j_python_function_operator = JDataStreamPythonWindowFunctionOperator( |
| j_conf, |
| j_data_stream_python_function_info, |
| j_input_types, |
| j_output_type_info, |
| j_namespace_serializer) |
| return j_python_function_operator, j_output_type_info |
| else: |
| raise TypeError("Unsupported function type: %s" % func_type) |
| |
| j_python_function_operator = JDataStreamPythonFunctionOperator( |
| j_conf, |
| j_data_stream_python_function_info, |
| j_input_types, |
| j_output_type_info) |
| |
| return j_python_function_operator, j_output_type_info |
| |
| |
| def _get_two_input_stream_operator(connected_streams: ConnectedStreams, |
| func: Union[Function, FunctionWrapper], |
| func_type: int, |
| type_info: TypeInformation): |
| """ |
| Create a Java two input stream operator. |
| |
| :param func: a function object that implements the Function interface. |
| :param func_type: function type, supports MAP, FLAT_MAP, etc. |
| :param type_info: the data type of the function output data. |
| :return: A Java operator which is responsible for execution user defined python function. |
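| |
|     Example (an illustrative sketch of how a caller such as :func:`ConnectedStreams.process` is |
|     expected to use this helper; ``func`` and ``output_type`` stand for the caller's arguments): |
|     :: |
| |
|         j_operator, j_output_type_info = _get_two_input_stream_operator( |
|             self, func, UserDefinedDataStreamFunction.CO_PROCESS, output_type) |
|         j_connected_stream = self.stream1._j_data_stream.connect(self.stream2._j_data_stream) |
|         return DataStream(j_connected_stream.transform( |
|             "Co-Process", j_output_type_info, j_operator)) |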
| """ |
| gateway = get_gateway() |
| |
| j_input_types1 = connected_streams.stream1._j_data_stream.getTransformation().getOutputType() |
| j_input_types2 = connected_streams.stream2._j_data_stream.getTransformation().getOutputType() |
| |
| if type_info is None: |
| output_type_info = Types.PICKLED_BYTE_ARRAY() # type: TypeInformation |
| elif isinstance(type_info, list): |
| output_type_info = RowTypeInfo(type_info) |
| else: |
| output_type_info = type_info |
| |
| j_data_stream_python_function_info = _create_j_data_stream_python_function_info(func, func_type) |
| j_output_type_info = output_type_info.get_java_type_info() |
| j_conf = get_j_env_configuration( |
| connected_streams.stream1._j_data_stream.getExecutionEnvironment()) |
| python_execution_mode = ( |
| j_conf.getString(gateway.jvm.org.apache.flink.python.PythonOptions.PYTHON_EXECUTION_MODE)) |
| |
| from pyflink.fn_execution.flink_fn_execution_pb2 import UserDefinedDataStreamFunction |
| if func_type == UserDefinedDataStreamFunction.CO_PROCESS: # type: ignore |
| if python_execution_mode == 'thread': |
| JTwoInputPythonFunctionOperator = gateway.jvm.EmbeddedPythonCoProcessOperator |
| else: |
| JTwoInputPythonFunctionOperator = gateway.jvm.ExternalPythonCoProcessOperator |
| elif func_type == UserDefinedDataStreamFunction.KEYED_CO_PROCESS: # type: ignore |
| if python_execution_mode == 'thread': |
| JTwoInputPythonFunctionOperator = gateway.jvm.EmbeddedPythonKeyedCoProcessOperator |
| else: |
| JTwoInputPythonFunctionOperator = gateway.jvm.ExternalPythonKeyedCoProcessOperator |
| else: |
| raise TypeError("Unsupported function type: %s" % func_type) |
| |
| j_python_data_stream_function_operator = JTwoInputPythonFunctionOperator( |
| j_conf, |
| j_data_stream_python_function_info, |
| j_input_types1, |
| j_input_types2, |
| j_output_type_info) |
| |
| return j_python_data_stream_function_operator, j_output_type_info |
| |
| |
| def _create_j_data_stream_python_function_info( |
|     func: Union[Function, FunctionWrapper, WindowOperationDescriptor], func_type: int |
| ): |
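|     """ |
|     Serialize the Python function with cloudpickle and wrap it, together with the Python |
|     execution environment, into a Java DataStreamPythonFunctionInfo that also records the |
|     function type. |
|     """ |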
| gateway = get_gateway() |
| |
| import cloudpickle |
| serialized_func = cloudpickle.dumps(func) |
| |
| j_data_stream_python_function = gateway.jvm.DataStreamPythonFunction( |
| bytearray(serialized_func), _get_python_env() |
| ) |
| return gateway.jvm.DataStreamPythonFunctionInfo(j_data_stream_python_function, func_type) |
| |
| |
| class CloseableIterator(object): |
| """ |
|     Representing an Iterator that is also auto closeable. It can be used as a context manager |
|     so that the underlying Java iterator is closed automatically once iteration is finished. |
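| |
|     Example (illustrative; assumes ``ds`` is a :class:`DataStream` whose results are collected |
|     back to the client): |
|     :: |
| |
|         with ds.execute_and_collect() as results: |
|             for result in results: |
|                 print(result) |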
| """ |
| |
| def __init__(self, j_closeable_iterator, type_info: TypeInformation = None): |
| self._j_closeable_iterator = j_closeable_iterator |
| self._type_info = type_info |
| |
| def __iter__(self): |
| return self |
| |
| def __next__(self): |
| return self.next() |
| |
| def __enter__(self): |
| return self |
| |
| def __exit__(self, exc_type, exc_val, exc_tb): |
| self.close() |
| |
| def next(self): |
| if not self._j_closeable_iterator.hasNext(): |
| raise StopIteration('No more data.') |
| return convert_to_python_obj(self._j_closeable_iterator.next(), self._type_info) |
| |
| def close(self): |
| self._j_closeable_iterator.close() |