.. ################################################################################
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
################################################################################
=====================
Dependency Management
=====================
Python API programs often need external dependencies. For example, users
may need to use third-party Python libraries in Python user-defined functions.
In addition, in scenarios such as machine learning prediction, users may want to load a machine
learning model inside the Python user-defined functions.
When a PyFlink job is executed locally, users can install the third-party Python libraries into
the local Python environment, download the machine learning model to the local machine, and so on.
However, this approach does not work well when users want to submit PyFlink jobs to remote clusters.
The following sections introduce the options PyFlink provides for these requirements.
.. note::
Both the Python DataStream API and the Python Table API provide
APIs for each kind of dependency. If a single job mixes the Python DataStream API and the Python Table API,
you should specify the dependencies via the Python DataStream API so that they take effect for
both the Python DataStream API and the Python Table API.
JAR Dependencies
================
If third-party JARs are used, you can specify the JARs in the Python Table API as follows:
.. code-block:: python
# Specify a list of jar URLs via "pipeline.jars". The jars are separated by ";"
# and will be uploaded to the cluster.
# NOTE: Only local file URLs (start with "file://") are supported.
table_env.get_config().set("pipeline.jars", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
# It looks like the following on Windows:
table_env.get_config().set("pipeline.jars", "file:///E:/my/jar/path/connector.jar;file:///E:/my/jar/path/udf.jar")
# Specify a list of URLs via "pipeline.classpaths". The URLs are separated by ";"
# and will be added to the classpath during job execution.
# NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the URLs are accessible on both the client and the cluster.
table_env.get_config().set("pipeline.classpaths", "file:///my/jar/path/connector.jar;file:///my/jar/path/udf.jar")
or in the Python DataStream API as follows:
.. code-block:: python
# Use the add_jars() to add local jars and the jars will be uploaded to the cluster.
# NOTE: Only local file URLs (start with "file://") are supported.
stream_execution_environment.add_jars("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar")
# It looks like the following on Windows:
stream_execution_environment.add_jars("file:///E:/my/jar/path/connector1.jar", "file:///E:/my/jar/path/connector2.jar")
# Use the add_classpaths() to add the dependent jars URLs into the classpath.
# The URLs will also be added to the classpath of both the client and the cluster.
# NOTE: The paths must specify a protocol (e.g. file://) and users should ensure that the
# URLs are accessible on both the client and the cluster.
stream_execution_environment.add_classpaths("file:///my/jar/path/connector1.jar", "file:///my/jar/path/connector2.jar")
or through the :flinkdoc:`command line arguments <docs/deployment/cli/#submitting-pyflink-jobs>` ``--jarfile`` when submitting the job.
.. note::
Only one JAR file can be specified with the command
line argument ``--jarfile``, so you need to build a fat JAR if there are multiple JAR files.
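
The ``pipeline.jars`` value shown earlier is a ``;``-separated list of ``file://`` URLs.
The following is a minimal sketch of building such a value from local paths using only the
Python standard library. The helper name ``to_pipeline_jars`` is hypothetical and not part of PyFlink.

.. code-block:: python

    from pathlib import Path


    def to_pipeline_jars(jar_paths):
        """Join local jar paths into a ';'-separated list of file:// URLs.

        Hypothetical helper for illustration; it is not a PyFlink API.
        """
        # Path.as_uri() requires an absolute path, so make each path absolute first.
        urls = [Path(p).absolute().as_uri() for p in jar_paths]
        return ";".join(urls)


    jars = to_pipeline_jars(["/my/jar/path/connector.jar", "/my/jar/path/udf.jar"])
    print(jars)

The resulting string could then be passed to ``table_env.get_config().set("pipeline.jars", jars)``.
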
Python Dependencies
===================
Python libraries
----------------
You may want to use third-party Python libraries in Python user-defined functions.
There are multiple ways to specify the Python libraries.
You can specify them inside the code using the Python Table API as follows:
.. code-block:: python
table_env.add_python_file(file_path)
or using the Python DataStream API as follows:
.. code-block:: python
stream_execution_environment.add_python_file(file_path)
You could also specify the Python libraries using configuration
``python.files`` (see :doc:`configuration`)
or via :flinkdoc:`command line arguments <docs/deployment/cli/#submitting-pyflink-jobs>` ``-pyfs`` or ``--pyFiles``
when submitting the job.
.. note::
The Python libraries could be local files or
local directories. They will be added to the PYTHONPATH of the Python UDF worker.
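
To illustrate what being on the PYTHONPATH means for a user-defined function, the sketch below
simulates it locally with ``sys.path``. It does not use PyFlink, and the module name
``my_helpers`` and the helper ``import_from_directory`` are hypothetical.

.. code-block:: python

    import importlib
    import sys
    import tempfile
    from pathlib import Path


    def import_from_directory(lib_dir, module_name):
        """Import a module from lib_dir by temporarily adding it to sys.path."""
        sys.path.insert(0, str(lib_dir))
        try:
            importlib.invalidate_caches()
            return importlib.import_module(module_name)
        finally:
            sys.path.remove(str(lib_dir))


    with tempfile.TemporaryDirectory() as tmp:
        # A stand-in for a library directory shipped with the job.
        Path(tmp, "my_helpers.py").write_text("def add_one(x):\n    return x + 1\n")
        helpers = import_from_directory(tmp, "my_helpers")
        result = helpers.add_one(41)

    print(result)

In a real job, the directory would be passed to ``add_python_file`` and the import would happen
inside the user-defined function.
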
requirements.txt
----------------
You can also specify a ``requirements.txt`` file which defines the third-party Python dependencies.
These Python dependencies will be installed into the working directory and added to the PYTHONPATH of
the Python UDF worker.
You can prepare the ``requirements.txt`` file manually as follows:
.. code-block:: shell
echo numpy==1.16.5 >> requirements.txt
echo pandas==1.0.0 >> requirements.txt
or using ``pip freeze`` which lists all the packages installed in the current Python environment:
.. code-block:: shell
pip freeze > requirements.txt
The content of the requirements.txt file may look like the following:
.. code-block:: text
numpy==1.16.5
pandas==1.0.0
You can edit it manually, for example by removing unnecessary entries or adding extra entries.
The ``requirements.txt`` file can then be specified inside the code using the Python Table API as follows:
.. code-block:: python
# requirements_cache_dir is optional
table_env.set_python_requirements(
requirements_file_path="/path/to/requirements.txt",
requirements_cache_dir="cached_dir")
or using the Python DataStream API as follows:
.. code-block:: python
# requirements_cache_dir is optional
stream_execution_environment.set_python_requirements(
requirements_file_path="/path/to/requirements.txt",
requirements_cache_dir="cached_dir")
.. note::
For dependencies that cannot be accessed from
the cluster, a directory which contains the installation packages of these dependencies can be
specified using the parameter ``requirements_cache_dir``. It will be uploaded to the cluster to
support offline installation. You can prepare the ``requirements_cache_dir`` as follows:
.. code-block:: shell
pip download -d cached_dir -r requirements.txt --no-binary :all:
.. note::
Please make sure that the prepared packages match
the platform of the cluster, and the Python version used.
You could also specify the ``requirements.txt`` file using configuration
``python.requirements`` (see :doc:`configuration`)
or via :flinkdoc:`command line arguments <docs/deployment/cli/#submitting-pyflink-jobs>`
``-pyreq`` or ``--pyRequirements`` when submitting the job.
.. note::
The packages specified in the
``requirements.txt`` file are installed using pip, so please make sure that pip (version >= 20.3)
and setuptools (version >= 37.0.0) are available.
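
The version requirements above can be checked before submitting a job. The following is a
minimal sketch using only the standard library. The helper names are hypothetical, and the
comparison looks only at the leading numeric part of a version string (pre-release suffixes
are ignored), which is an assumption made to keep the example short.

.. code-block:: python

    import re
    from importlib import metadata

    # Minimum versions taken from the note above.
    MINIMUMS = {"pip": "20.3", "setuptools": "37.0.0"}


    def version_key(version):
        """Turn the leading numeric part of a version into a 3-tuple of ints."""
        match = re.match(r"\d+(?:\.\d+)*", version)
        numbers = [int(n) for n in match.group(0).split(".")[:3]] if match else []
        return tuple(numbers + [0] * (3 - len(numbers)))


    def meets_minimum(installed, minimum):
        return version_key(installed) >= version_key(minimum)


    def check_environment():
        """Report, per package, whether it is installed and new enough."""
        report = {}
        for name, minimum in MINIMUMS.items():
            try:
                installed = metadata.version(name)
            except metadata.PackageNotFoundError:
                report[name] = False
            else:
                report[name] = meets_minimum(installed, minimum)
        return report


    print(check_environment())

Note that this checks the interpreter running the script, so it should be run with the same
Python environment that the Python UDF worker will use.
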
Archives
--------
You may also want to specify archive files. The archive files could be used to specify custom
Python virtual environments, data files, etc.
You can specify the archive files inside the code using the Python Table API as follows:
.. code-block:: python
table_env.add_python_archive(archive_path="/path/to/archive_file", target_dir=None)
or using the Python DataStream API as follows:
.. code-block:: python
stream_execution_environment.add_python_archive(archive_path="/path/to/archive_file", target_dir=None)
.. note::
The parameter ``target_dir`` is optional. If specified,
the archive file will be extracted to a directory named ``target_dir`` during execution.
Otherwise, the archive file will be extracted to a directory with the same name as the archive file.
Suppose you have specified the archive file as follows:
.. code-block:: python
table_env.add_python_archive("/path/to/py_env.zip", "myenv")
Then, you can access the content of the archive file in Python user-defined functions as follows:
.. code-block:: python
def my_udf():
    with open("myenv/py_env/data/data.txt") as f:
        ...
If you have not specified the parameter ``target_dir``:
.. code-block:: python
table_env.add_python_archive("/path/to/py_env.zip")
You can then access the content of the archive file in Python user-defined functions as follows:
.. code-block:: python
def my_udf():
    with open("py_env.zip/py_env/data/data.txt") as f:
        ...
.. note::
The archive file will be extracted to the working
directory of the Python UDF worker, so you can access the files inside the archive file using
relative paths.
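
The directory layout described above can be reproduced locally. The sketch below builds a small
``py_env.zip`` and extracts it once with and once without a target directory name. It only
simulates the layout with the standard library and is not how PyFlink performs the extraction.
The helper ``extraction_dir_name`` and the ``work`` directory are hypothetical.

.. code-block:: python

    import shutil
    import tempfile
    from pathlib import Path


    def extraction_dir_name(archive_path, target_dir=None):
        """Directory name the archive is extracted to, following the rule above."""
        return target_dir if target_dir else Path(archive_path).name


    contents = {}
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        # Build py_env/data/data.txt and pack it into py_env.zip.
        data_file = root / "src" / "py_env" / "data" / "data.txt"
        data_file.parent.mkdir(parents=True)
        data_file.write_text("hello")
        archive = shutil.make_archive(
            str(root / "py_env"), "zip", root_dir=str(root / "src"), base_dir="py_env")

        # Stand-in for the working directory of the Python UDF worker.
        work_dir = root / "work"
        for target_dir in ("myenv", None):
            dir_name = extraction_dir_name(archive, target_dir)
            shutil.unpack_archive(archive, str(work_dir / dir_name))
            # Relative path as it would be used inside a user-defined function.
            relative = Path(dir_name) / "py_env" / "data" / "data.txt"
            contents[relative.as_posix()] = (work_dir / relative).read_text()

    print(contents)

The two keys correspond to the two relative paths used in the ``my_udf`` examples.
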
You could also specify the archive files using configuration
``python.archives`` (see :doc:`configuration`)
or via :flinkdoc:`command line arguments <docs/deployment/cli/#submitting-pyflink-jobs>`
``-pyarch`` or ``--pyArchives`` when submitting the job.
.. note::
If the archive file contains a Python virtual environment,
please make sure that the Python virtual environment matches the platform that the cluster is running on.
.. note::
Currently, only zip files (e.g., zip, jar, whl, egg)
and tar files (e.g., tar, tar.gz, tgz) are supported.
Python interpreter
------------------
You can specify the path of the Python interpreter used to execute the Python worker.
You can specify the Python interpreter inside the code using the Python Table API as follows:
.. code-block:: python
table_env.get_config().set_python_executable("/path/to/python")
or using the Python DataStream API as follows:
.. code-block:: python
stream_execution_environment.set_python_executable("/path/to/python")
You can also use a Python interpreter that is contained in an archive file:
.. code-block:: python
# Python Table API
table_env.add_python_archive("/path/to/py_env.zip", "venv")
table_env.get_config().set_python_executable("venv/py_env/bin/python")
# Python DataStream API
stream_execution_environment.add_python_archive("/path/to/py_env.zip", "venv")
stream_execution_environment.set_python_executable("venv/py_env/bin/python")
You could also specify the Python interpreter using configuration
``python.executable`` (see :doc:`configuration`)
or via :flinkdoc:`command line arguments <docs/deployment/cli/#submitting-pyflink-jobs>`
``-pyexec`` or ``--pyExecutable`` when submitting the job.
.. note::
If the path of the Python interpreter refers to a file inside a
Python archive, a relative path should be used instead of an absolute path.
Python interpreter of client
----------------------------
Python is needed on the client side to parse the Python user-defined functions when
compiling the job.
You can specify a custom Python interpreter for the client side by activating
it in the current session:
.. code-block:: shell
source my_env/bin/activate
or specify it using the configuration
``python.client.executable`` (see :doc:`configuration`), the
:flinkdoc:`command line arguments <docs/deployment/cli/#submitting-pyflink-jobs>` ``-pyclientexec`` or ``--pyClientExecutable``,
or the environment variable ``PYFLINK_CLIENT_EXECUTABLE`` (see :doc:`environment_variables`).
How to specify Python Dependencies in Java/Scala Program
=========================================================
Python user-defined functions can also be used in Java Table API programs or pure SQL programs.
The following code shows a simple example of how to use a Python user-defined function in a
Java Table API program:
.. code-block:: java
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
TableEnvironment tEnv = TableEnvironment.create(
EnvironmentSettings.inBatchMode());
tEnv.getConfig().set(CoreOptions.DEFAULT_PARALLELISM, 1);
// register the Python UDF
tEnv.executeSql("create temporary system function add_one as 'add_one.add_one' language python");
tEnv.createTemporaryView("source", tEnv.fromValues(1L, 2L, 3L).as("a"));
// use Python UDF in the Java Table API program
tEnv.executeSql("select add_one(a) as a from source").collect();
You can refer to the SQL statement about :flinkdoc:`CREATE FUNCTION <docs/sql/reference/ddl/create/#create-function>`
for more details on how to create Python user-defined functions using SQL statements.
The Python dependencies can then be specified via the Python config options (see :doc:`configuration`),
such as **python.archives**, **python.files**, **python.requirements**, **python.client.executable**
and **python.executable**, or through :flinkdoc:`command line arguments <docs/deployment/cli/#usage>`
when submitting the job.
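
As a rough illustration of the values these options take, the sketch below assembles them into
a plain dictionary. It assumes the formats given in the configuration reference: comma-separated
lists for ``python.files`` and ``python.archives``, and ``#`` separating a requirements file from
its cache directory and an archive from its target directory. The function name and the path
``/path/to/add_one.py`` are hypothetical, and the function is not part of PyFlink.

.. code-block:: python

    def python_dependency_options(files=(), requirements=None, cache_dir=None,
                                  archives=(), executable=None):
        """Build a dict of python.* config options (an illustrative sketch).

        archives is an iterable of (archive_path, target_dir) pairs;
        target_dir may be None.
        """
        options = {}
        if files:
            options["python.files"] = ",".join(files)
        if requirements:
            # '#' separates the requirements file from the optional cache directory.
            options["python.requirements"] = (
                f"{requirements}#{cache_dir}" if cache_dir else requirements)
        if archives:
            # '#' separates each archive from its optional target directory.
            options["python.archives"] = ",".join(
                f"{path}#{target}" if target else path for path, target in archives)
        if executable:
            options["python.executable"] = executable
        return options


    options = python_dependency_options(
        files=["/path/to/add_one.py"],
        requirements="/path/to/requirements.txt",
        cache_dir="cached_dir",
        archives=[("/path/to/py_env.zip", "venv")],
        executable="venv/py_env/bin/python",
    )
    for key, value in options.items():
        print(f"{key}={value}")

The interpreter path is relative because it points into the extracted ``venv`` archive directory.
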