blob: 75fc1051165e16a95957afcfead66cf805869db0 [file] [log] [blame]
.. ################################################################################
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
################################################################################
=========
Operators
=========
Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into
sophisticated dataflow topologies.
DataStream Transformations
==========================
DataStream programs in Flink are regular programs that implement transformations on data streams (e.g., mapping,
filtering, reducing). Please see :flinkdoc:`operators <docs/dev/datastream/operators/overview/>`
for an overview of the available transformations in Python DataStream API.
Functions
=========
Transformations accept user-defined functions as input to define the functionality of the transformations.
The following section describes different ways of defining Python user-defined functions in Python DataStream API.
Implementing Function Interfaces
----------------------------------
Different Function interfaces are provided for different transformations in the Python DataStream API. For example,
``MapFunction`` is provided for the ``map`` transformation, ``FilterFunction`` is provided for the ``filter`` transformation, etc.
Users can implement the corresponding Function interface according to the type of the transformation. Take MapFunction for
instance:
.. code-block:: python
# Implementing MapFunction
class MyMapFunction(MapFunction):
def map(self, value):
return value + 1
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(MyMapFunction(), output_type=Types.INT())
Lambda Function
----------------
As shown in the following example, the transformations can also accept a lambda function to define the functionality of the transformation:
.. code-block:: python
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(lambda x: x + 1, output_type=Types.INT())
.. note::
``ConnectedStream.map()`` and ``ConnectedStream.flat_map()`` do not support
lambda function and must accept ``CoMapFunction`` and ``CoFlatMapFunction`` separately.
Python Function
----------------
Users could also use Python function to define the functionality of the transformation:
.. code-block:: python
def my_map_func(value):
return value + 1
data_stream = env.from_collection([1, 2, 3, 4, 5], type_info=Types.INT())
mapped_stream = data_stream.map(my_map_func, output_type=Types.INT())
Output Type
===========
Users could specify the output type information of the transformation explicitly in Python DataStream API. If not
specified, the output type will be ``Types.PICKLED_BYTE_ARRAY`` by default, and the result data will be serialized using pickle serializer.
For more details about the pickle serializer, please refer to :ref:`pickle-serialization` in the :doc:`data_types` page.
Generally, the output type needs to be specified in the following scenarios.
Convert DataStream into Table
------------------------------
.. code-block:: python
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment
def data_stream_api_demo():
env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(stream_execution_environment=env)
t_env.execute_sql("""
CREATE TABLE my_source (
a INT,
b VARCHAR
) WITH (
'connector' = 'datagen',
'number-of-rows' = '10'
)
""")
ds = t_env.to_append_stream(
t_env.from_path('my_source'),
Types.ROW([Types.INT(), Types.STRING()]))
def split(s):
splits = s[1].split("|")
for sp in splits:
yield s[0], sp
ds = ds.map(lambda i: (i[0] + 1, i[1])) \
.flat_map(split, Types.TUPLE([Types.INT(), Types.STRING()])) \
.key_by(lambda i: i[1]) \
.reduce(lambda i, j: (i[0] + j[0], i[1]))
t_env.execute_sql("""
CREATE TABLE my_sink (
a INT,
b VARCHAR
) WITH (
'connector' = 'print'
)
""")
table = t_env.from_data_stream(ds)
table_result = table.execute_insert("my_sink")
# 1) wait for job finishes and only used in local execution, otherwise, it may happen that the script exits with the job is still running
# 2) should be removed when submitting the job to a remote cluster such as YARN, standalone, K8s etc in detach mode
table_result.wait()
if __name__ == '__main__':
data_stream_api_demo()
The output type must be specified for the flat_map operation in the above example which will be used as
the output type of the reduce operation implicitly. The reason is that
``t_env.from_data_stream(ds)`` requires the output type of ``ds`` must be a composite type.
Write DataStream to Sink
-------------------------
.. code-block:: python
from pyflink.common.typeinfo import Types
def split(s):
splits = s[1].split("|")
for sp in splits:
yield s[0], sp
ds.map(lambda i: (i[0] + 1, i[1]), Types.TUPLE([Types.INT(), Types.STRING()])) \
.sink_to(...)
Generally, the output type needs to be specified for the map operation in the above example if the sink only accepts special kinds of data, e.g. Row, etc.
Operator Chaining
=================
By default, multiple non-shuffle Python functions will be chained together to avoid the serialization and
deserialization and improve the performance. There are also cases where you may want to disable
the chaining, e.g., there is a ``flatmap`` function which will produce a large number of elements for
each input element and disabling the chaining allows to process its output in a different parallelism.
Operator chaining could be disabled in one of the following ways:
- Disable chaining with following operators by adding a
:flinkdoc:`key_by <docs/dev/datastream/operators/overview/#keyby>` operation,
:flinkdoc:`shuffle <docs/dev/datastream/operators/overview/#random-partitioning>` operation,
:flinkdoc:`rescale <docs/dev/datastream/operators/overview/#rescaling>` operation,
:flinkdoc:`rebalance <docs/dev/datastream/operators/overview/#rescaling>` operation or
:flinkdoc:`partition_custom <docs/dev/datastream/operators/overview/#custom-partitioning>` operation
after the current operator.
- Disable chaining with preceding operators by applying a
:flinkdoc:`start_new_chain <docs/dev/datastream/operators/overview/#start-new-chain>` operation for the current operator.
- Disable chaining with preceding and following operators by applying a
:flinkdoc:`disable_chaining <docs/dev/datastream/operators/overview/#disable-chaining>` operation for the current operator.
- Disable chaining of two operators by setting different parallelisms or different
:flinkdoc:`slot sharing group <docs/dev/datastream/operators/overview/#set-slot-sharing-group>` for them.
- You could also disable all the operator chaining via configuration
``python.operator-chaining.enabled`` (see :doc:`../configuration`).
Bundling Python Functions
==========================
To run Python functions in any non-local mode, it is strongly recommended
bundling your Python functions definitions using the config option ``python-files`` (see :doc:`../configuration`),
if your Python functions live outside the file where the ``main()`` function is defined.
Otherwise, you may run into ``ModuleNotFoundError: No module named 'my_function'``
if you define Python functions in a file called ``my_function.py``.
Loading resources in Python Functions
======================================
There are scenarios when you want to load some resources in Python functions first,
then running computation over and over again, without having to re-load the resources.
For example, you may want to load a large deep learning model only once,
then run batch prediction against the model multiple times.
Overriding the ``open`` method inherited from the base class ``Function`` is exactly what you need.
.. code-block:: python
class Predict(MapFunction):
def open(self, runtime_context: RuntimeContext):
import pickle
with open("resources.zip/resources/model.pkl", "rb") as f:
self.model = pickle.load(f)
def eval(self, x):
return self.model.predict(x)