blob: c691cd2957d33179f7ddc35771d374379c853288 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en" dir="ltr">
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2022/05/06/exploring-the-thread-mode-in-pyflink/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="PyFlink was introduced in Flink 1.9 to bring the power of Flink to Python users and allow them to develop Flink jobs in Python. Its functionality has steadily matured over the past releases.
Before Flink 1.15, Python user-defined functions were executed in separate Python processes (based on the Apache Beam Portability Framework), which brings additional serialization/deserialization overhead as well as communication overhead.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Exploring the thread mode in PyFlink" />
<meta property="og:description" content="PyFlink was introduced in Flink 1.9 to bring the power of Flink to Python users and allow them to develop Flink jobs in Python. Its functionality has steadily matured over the past releases.
Before Flink 1.15, Python user-defined functions were executed in separate Python processes (based on the Apache Beam Portability Framework), which brings additional serialization/deserialization overhead as well as communication overhead." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2022/05/06/exploring-the-thread-mode-in-pyflink/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2022-05-06T12:00:00+00:00" />
<meta property="article:modified_time" content="2022-05-06T12:00:00+00:00" />
<title>Exploring the thread mode in PyFlink | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
// Matomo page-view tracking for analytics.apache.org.
// Tracker commands are queued on the global `_paq` array; an existing array is
// reused if another snippet already created one. The matomo.js script, injected
// asynchronously below, is expected to process this queue (see Matomo's
// JavaScript tracking client documentation).
var _paq = window._paq = window._paq || [];
_paq.push(['disableCookies']);
// Domains Matomo should treat as part of this site (the website and the
// nightly docs), per the 'setDomains' tracker command.
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
// Explicit https:// origin instead of the protocol-relative "//" form, so the
// tracker is never requested over plain http or resolved against file://.
var u="https://analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '1']);
// Inject matomo.js as an async script element placed before the first script
// element on the page, so loading it does not block parsing.
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
</head>
<body dir="ltr">
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;">
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2022/05/06/exploring-the-thread-mode-in-pyflink/">Exploring the thread mode in PyFlink</a>
</h1>
May 6, 2022 -
Xingbo Huang,
Dian Fu
<p>PyFlink was introduced in Flink 1.9 to bring the power of Flink to Python users and allow them to develop Flink jobs in Python.
Its functionality has steadily matured over the past releases.</p>
<p>Before Flink 1.15, Python user-defined functions were executed in separate Python processes (based on the <a href="https://docs.google.com/document/d/1B9NmaBSKCnMJQp-ibkxvZ_U233Su67c1eYgBhrqWP24/edit#heading=h.khjybycus70">Apache Beam Portability Framework</a>).
This brings additional serialization/deserialization overhead as well as communication overhead. In scenarios where the data size is large, e.g. image processing,
this overhead becomes non-negligible. Besides, since it involves inter-process communication, the processing latency is also non-negligible,
which is unacceptable in latency-critical scenarios, e.g. quantitative trading.</p>
<p>In Flink 1.15, we have introduced a new execution mode named &lsquo;thread&rsquo; mode (based on <a href="https://github.com/alibaba/pemja">PEMJA</a>), in which Python user-defined functions are executed in the JVM as
a thread instead of in a separate Python process. In this article, we will dig into the details of this execution mode and also share some benchmark data to
give users a basic understanding of how it works and which scenarios it’s applicable to.</p>
<h2 id="process-mode">
Process Mode
<a class="anchor" href="#process-mode">#</a>
</h2>
<center>
<img src="/img/blog/2022-05-06-pyflink-1.15-thread-mode/pyflink-architecture-overview.png" alt="Diagram of the PyFlink architecture overview"/>
<br/>
<i><small>Fig. 1 - PyFlink Architecture Overview</small></i>
</center>
<br/>
<p>Fig. 1 shows the architecture of PyFlink. As shown on the left side of Fig. 1, users can use the PyFlink APIs (Python Table API &amp; SQL or Python DataStream API) to declare the logic of jobs,
which are finally translated into a JobGraph (the DAG of the job) that Flink’s execution framework can recognize. Note that Python operators (Flink operators whose purpose is to
execute Python user-defined functions) are used to execute the Python user-defined functions.</p>
<p>The right side of Fig. 1 shows the details of the Python operators, where the Python user-defined functions are executed in separate Python processes.</p>
<p>In order to communicate with the Python worker process, a series of communication services are required between the Python operator (running in the JVM) and the Python worker (running in the Python VM).
PyFlink employs the <a href="https://docs.google.com/document/d/1B9NmaBSKCnMJQp-ibkxvZ_U233Su67c1eYgBhrqWP24/edit#heading=h.khjybycus70">Apache Beam Portability framework</a>, which provides the basic building blocks required for PyFlink, to execute Python user-defined functions.</p>
<center>
<img src="/img/blog/2022-05-06-pyflink-1.15-thread-mode/pyflink-process-mode.png" alt="Diagram of the PyFlink runtime in process mode"/>
<br/>
<i><small>Fig. 2 - PyFlink Runtime in Process Mode</small></i>
</center>
<br/>
<p>Process mode runs stably and efficiently in most scenarios and is sufficient for most users. However, in some scenarios it doesn’t work well due to the additional serialization/deserialization overhead.
One of the most typical scenarios is image processing, where the input data size is often very large. Besides, since it involves inter-process communication, the processing latency is also non-negligible,
which is unacceptable in latency-critical scenarios, e.g. quantitative trading. In order to overcome these problems, we have introduced a new execution mode (thread mode),
in which Python user-defined functions are executed in the JVM as a thread instead of in a separate Python process. In the following section, we will dig into the details of this new execution mode.</p>
<h2 id="pemja">
PEMJA
<a class="anchor" href="#pemja">#</a>
</h2>
<p>Before digging into the thread mode, let’s first introduce <a href="https://github.com/alibaba/pemja">PEMJA</a>, the library at the core of the thread mode architecture.</p>
<p>As we all know, Java Native Interface (JNI) is a standard programming interface for writing Java native methods and embedding the Java virtual machine into native applications.
What’s more, CPython provides the Python/C API to help embed Python in C applications.</p>
<p>So if we combine these two interfaces, we can embed Python in a Java application. Since this library solves the general problem of letting Python and Java call each other,
we have open sourced it as an independent project, and PyFlink has depended on <a href="https://github.com/alibaba/pemja">PEMJA</a> since Flink 1.15 to support thread mode.</p>
<h3 id="pemja-architecture">
PEMJA Architecture
<a class="anchor" href="#pemja-architecture">#</a>
</h3>
<center>
<img src="/img/blog/2022-05-06-pyflink-1.15-thread-mode/pemja.png" alt="Diagram of the PEMJA architecture"/>
<br/>
<i><small>Fig. 3 - PEMJA Architecture</small></i>
</center>
<br/>
<p>As we can see from the architecture of <a href="https://github.com/alibaba/pemja">PEMJA</a> in Fig. 3, JVM and PVM can call each other in the same process through <a href="https://github.com/alibaba/pemja">PEMJA</a> Library.</p>
<p>Firstly, <a href="https://github.com/alibaba/pemja">PEMJA</a> will start a daemon thread in JVM, which is responsible for initializing the Python Environment and creating a Python Main Interpreter owned by this process.
The reason why <a href="https://github.com/alibaba/pemja">PEMJA</a> uses a dedicated thread to initialize Python Environment is to avoid potential deadlocks in Python Interpreter.
Python Interpreter could deadlock when trying to acquire the GIL through methods such as <a href="https://docs.python.org/3/c-api/init.html#c.PyGILState_Ensure">PyGILState_*</a> in Python/C API concurrently.
Note that <a href="https://github.com/alibaba/pemja">PEMJA</a> doesn’t call those methods directly; however, third-party libraries such as <a href="https://numpy.org/">numpy</a> may call them.
To get around this, we use a dedicated thread to initialize the Python Environment.</p>
<p>Then, each Java worker thread can invoke the Python functions through the Python <a href="https://docs.python.org/3/c-api/init.html">ThreadState</a> created from Python Main Interpreter.</p>
<h3 id="comparison-with-other-solutions">
Comparison with other solutions
<a class="anchor" href="#comparison-with-other-solutions">#</a>
</h3>
<table width="95%" border="1">
<thead>
<tr>
<th style="text-align: center">Framework</th>
<th style="text-align: center">Principle</th>
<th style="text-align: center">Limitations</th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align: center"><a href="https://www.jython.org/">Jython</a></td>
<td style="text-align: center">Python compiler implemented in Java</td>
<td style="text-align: justify">
<ul>
<li>Only supports Python 2</li>
</ul>
</td>
</tr>
<tr>
<td style="text-align: center"><a href="https://www.graalvm.org/python/">GraalVM</a></td>
<td style="text-align: center">Truffle framework</td>
<td style="text-align: justify">
<ul>
<li>Compatibility issues with various libraries in the Python ecosystem</li>
<li>Works only with GraalVM</li>
</ul>
</td>
</tr>
<tr>
<td style="text-align: center"><a href="https://github.com/jpype-project/jpype">JPype</a></td>
<td style="text-align: center">JNI + Python/C API</td>
<td style="text-align: justify">
<ul>
<li>Doesn’t support calling Python from Java</li>
<li>Only supports CPython</li>
</ul>
</td>
</tr>
<tr>
<td style="text-align: center"><a href="https://github.com/ninia/jep">Jep</a></td>
<td style="text-align: center">JNI + Python/C API</td>
<td style="text-align: justify">
<ul>
<li>Difficult to integrate</li>
<li>Performance is not good enough</li>
<li>Only supports CPython</li>
</ul>
</td>
</tr>
<tr>
<td style="text-align: center"><a href="https://github.com/alibaba/pemja">PEMJA</a></td>
<td style="text-align: center">JNI + Python/C API</td>
<td style="text-align: justify">
<ul>
<li>Only supports CPython</li>
</ul>
</td>
</tr>
</tbody>
</table>
<p>In the table above, we have made a basic comparison of popular libraries for calling between Java and Python.</p>
<p><a href="https://www.jython.org/">Jython</a>: Jython is a Python interpreter implemented in Java. Because its implementation language is Java,
the interoperability between Python code and Java code is very natural.
However, Jython does not support Python 3, and it is no longer actively maintained.</p>
<p><a href="https://www.graalvm.org/python/">GraalVM</a>: GraalVM uses the Truffle framework to support interoperability between Python and Java.
However, it has the limitation that not all Python libraries are supported, since many Python libraries rely on standard CPython to implement their C extensions.
The other problem is that it only works with GraalVM, which means high migration costs.</p>
<p><a href="https://github.com/jpype-project/jpype">JPype</a>: Similar to <a href="https://github.com/alibaba/pemja">PEMJA</a>,
JPype is also a framework built using JNI and Python/C API, but JPype only supports calling Java from Python.</p>
<p><a href="https://github.com/ninia/jep">Jep</a>: Similar to <a href="https://github.com/alibaba/pemja">PEMJA</a>, Jep is also a framework built using JNI and Python/C API, and it supports calling Python from Java.
However, it doesn’t publish a jar to the Maven repository, and how its native libraries are loaded needs to be specified in advance through JVM parameters or environment variables when the JVM starts,
which makes it difficult to integrate. Furthermore, our benchmark shows that its performance is not very good.</p>
<p><a href="https://github.com/alibaba/pemja">PEMJA</a>: Similar to Jep and JPype, PEMJA is built on CPython, so it cannot support other Python interpreters, such as PyPy.
Since CPython is the most widely used Python implementation and the official reference runtime, most libraries in the Python ecosystem are built on the CPython runtime and so work with PEMJA naturally.</p>
<h2 id="thread-mode">
Thread Mode
<a class="anchor" href="#thread-mode">#</a>
</h2>
<center>
<img src="/img/blog/2022-05-06-pyflink-1.15-thread-mode/pyflink-thread-mode.png" alt="Diagram of the PyFlink runtime in thread mode"/>
<br/>
<i><small>Fig. 4 - PyFlink Runtime in Thread Mode</small></i>
</center>
<br/>
<p>From the picture above, we can see that in thread mode, the Python user-defined function runs in the same process as the Python operator (which runs in the JVM).
<a href="https://github.com/alibaba/pemja">PEMJA</a> is used as a bridge between the Java code and the Python code.</p>
<p>Since the Python user-defined function runs in the JVM, each input record received from the upstream operators is passed to
the Python user-defined function directly instead of being buffered and passed to it in batches.
Therefore, thread mode can have lower latency than process mode. Currently, if users want to achieve lower latency in process mode, they usually need to configure
<code>python.fn-execution.bundle.size</code> or <code>python.fn-execution.bundle.time</code> to a lower value. However, since process mode involves inter-process communication,
the latency is still somewhat high in some scenarios, which is no longer a problem in thread mode. Besides, configuring <code>python.fn-execution.bundle.size</code> or <code>python.fn-execution.bundle.time</code> to
a lower value usually affects the overall performance of the job, which is also not a problem in thread mode.</p>
<h2 id="comparisons-between-process-mode-and-thread-mode">
Comparisons between process mode and thread mode
<a class="anchor" href="#comparisons-between-process-mode-and-thread-mode">#</a>
</h2>
<table width="95%" border="1">
<thead>
<tr>
<th style="text-align: center">Execution Mode</th>
<th style="text-align: center">Benefits</th>
<th style="text-align: center">Limitations</th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align: center">Process Mode</td>
<td style="text-align: justify">
<ul>
<li>Better resource isolation</li>
</ul>
</td>
<td style="text-align: justify">
<ul>
<li>IPC overhead</li>
<li>High implementation complexity</li>
</ul>
</td>
</tr>
<tr>
<td style="text-align: center">Thread Mode</td>
<td style="text-align: justify">
<ul>
<li>Higher throughput</li>
<li>Lower latency</li>
<li>Shorter checkpoint time</li>
<li>Fewer usage restrictions</li>
</ul>
</td>
<td style="text-align: justify">
<ul>
<li>Only supports CPython</li>
<li>Multiple jobs cannot use different Python interpreters in session mode</li>
<li>Performance is affected by the GIL</li>
</ul>
</td>
</tr>
</tbody>
</table>
<h3 id="benefits-of-thread-mode">
Benefits of thread mode
<a class="anchor" href="#benefits-of-thread-mode">#</a>
</h3>
<p>Since process mode processes data in batches, Python user-defined functions currently cannot be used in some scenarios,
e.g. in a Join (Table API &amp; SQL) condition that takes columns from both the left table and the right table as inputs.
However, this is no longer a big problem in thread mode, because it handles the data one record at a time instead of in batches.</p>
<p>Unlike process mode, which sends and receives data asynchronously in batches, thread mode processes data synchronously one record at a time.
As a result, it usually has lower latency and shorter checkpoint times. In terms of performance, since there is no inter-process communication,
it avoids data serialization/deserialization and communication overhead, as well as data copying and context switching between kernel space and user space,
so thread mode usually performs better.</p>
<h3 id="limitations">
Limitations
<a class="anchor" href="#limitations">#</a>
</h3>
<p>However, there are also some limitations for thread mode:</p>
<ul>
<li>It only supports CPython, which is also one of the most widely used Python interpreters.</li>
<li>It doesn’t support session mode well and so it’s recommended that users only use thread mode in per-job or application deployments.
The reason is it doesn’t support using different Python interpreters for the jobs running in the same TaskManager.
This limitation comes from the fact that many Python libraries assume that they will only be initialized once in the process, so they use a lot of static variables.</li>
</ul>
<h2 id="usage">
Usage
<a class="anchor" href="#usage">#</a>
</h2>
<p>The execution mode could be configured via the configuration <code>python.execution-mode</code>. It has two possible values:</p>
<ul>
<li><code>process</code>: The Python user-defined functions will be executed in a separate Python process. (default)</li>
<li><code>thread</code>: The Python user-defined functions will be executed in the same process as Java operators.</li>
</ul>
<p>For example, you could configure it as follows in the Python Table API:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># Specify `process` mode</span>
</span></span><span class="line"><span class="cl"><span class="n">table_env</span><span class="o">.</span><span class="n">get_config</span><span class="p">()</span><span class="o">.</span><span class="n">set</span><span class="p">(</span><span class="s2">&#34;python.execution-mode&#34;</span><span class="p">,</span> <span class="s2">&#34;process&#34;</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># Specify `thread` mode</span>
</span></span><span class="line"><span class="cl"><span class="n">table_env</span><span class="o">.</span><span class="n">get_config</span><span class="p">()</span><span class="o">.</span><span class="n">set</span><span class="p">(</span><span class="s2">&#34;python.execution-mode&#34;</span><span class="p">,</span> <span class="s2">&#34;thread&#34;</span><span class="p">)</span>
</span></span></code></pre></div><p>It should be noted that since this is the first release of &lsquo;thread&rsquo; mode, there are currently still many limitations,
e.g. it only supports Python ScalarFunction of Python Table API &amp; SQL. It will fall back to &lsquo;process&rsquo; mode where &lsquo;thread&rsquo; mode is not supported.
So it may happen that you configure a job to execute in thread mode, but it is actually executed in &lsquo;process&rsquo; execution mode.</p>
<h2 id="benchmarkhttpsgithubcomhuangxingbopyflink-benchmark">
<a href="https://github.com/HuangXingBo/pyflink-benchmark">Benchmark</a>
<a class="anchor" href="#benchmarkhttpsgithubcomhuangxingbopyflink-benchmark">#</a>
</h2>
<h3 id="test-environment">
Test environment
<a class="anchor" href="#test-environment">#</a>
</h3>
<p>OS: Alibaba Cloud Linux (Aliyun Linux) release 2.1903 LTS (Hunting Beagle)</p>
<p>CPU: Intel(R) Xeon(R) Platinum 8269CY CPU @ 2.50GHz</p>
<p>Memory: 16G</p>
<p>CPython: Python 3.7.3</p>
<p>JDK: OpenJDK Runtime Environment (build 1.8.0_292-b10)</p>
<p>PyFlink: 1.15.0</p>
<h3 id="test-results">
Test results
<a class="anchor" href="#test-results">#</a>
</h3>
<p>Here, we benchmark JSON processing, which is a very common scenario for PyFlink users.</p>
<p>The UDF implementation is as follows:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># python udf</span>
</span></span><span class="line"><span class="cl"><span class="nd">@udf</span><span class="p">(</span><span class="n">result_type</span><span class="o">=</span><span class="n">DataTypes</span><span class="o">.</span><span class="n">STRING</span><span class="p">(),</span> <span class="n">func_type</span><span class="o">=</span><span class="s2">&#34;general&#34;</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">json_value_lower</span><span class="p">(</span><span class="n">s</span><span class="p">:</span> <span class="nb">str</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="kn">import</span> <span class="nn">json</span>
</span></span><span class="line"><span class="cl"> <span class="n">a</span> <span class="o">=</span> <span class="n">json</span><span class="o">.</span><span class="n">loads</span><span class="p">(</span><span class="n">s</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"> <span class="n">a</span><span class="p">[</span><span class="s1">&#39;a&#39;</span><span class="p">]</span> <span class="o">=</span> <span class="n">a</span><span class="p">[</span><span class="s1">&#39;a&#39;</span><span class="p">]</span><span class="o">.</span><span class="n">lower</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="n">json</span><span class="o">.</span><span class="n">dumps</span><span class="p">(</span><span class="n">a</span><span class="p">)</span>
</span></span></code></pre></div><div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="c1">// Java UDF</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">JsonValueLower</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">ScalarFunction</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kd">transient</span><span class="w"> </span><span class="n">ObjectMapper</span><span class="w"> </span><span class="n">mapper</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kd">transient</span><span class="w"> </span><span class="n">ObjectWriter</span><span class="w"> </span><span class="n">writer</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">open</span><span class="p">(</span><span class="n">FunctionContext</span><span class="w"> </span><span class="n">context</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">this</span><span class="p">.</span><span class="na">mapper</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">ObjectMapper</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">this</span><span class="p">.</span><span class="na">writer</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">mapper</span><span class="p">.</span><span class="na">writerWithDefaultPrettyPrinter</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="nf">eval</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">s</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">try</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">StringObject</span><span class="w"> </span><span class="n">object</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">mapper</span><span class="p">.</span><span class="na">readValue</span><span class="p">(</span><span class="n">s</span><span class="p">,</span><span class="w"> </span><span class="n">StringObject</span><span class="p">.</span><span class="na">class</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">object</span><span class="p">.</span><span class="na">setA</span><span class="p">(</span><span class="n">object</span><span class="p">.</span><span class="na">a</span><span class="p">.</span><span class="na">toLowerCase</span><span class="p">());</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">writer</span><span class="p">.</span><span class="na">writeValueAsString</span><span class="p">(</span><span class="n">object</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> </span><span class="k">catch</span><span class="w"> </span><span class="p">(</span><span class="n">JsonProcessingException</span><span class="w"> </span><span class="n">e</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">throw</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">RuntimeException</span><span class="p">(</span><span class="s">&#34;Failed to read json value&#34;</span><span class="p">,</span><span class="w"> </span><span class="n">e</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">class</span> <span class="nc">StringObject</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="n">a</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="nf">getA</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">a</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">setA</span><span class="p">(</span><span class="n">String</span><span class="w"> </span><span class="n">a</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">this</span><span class="p">.</span><span class="na">a</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">a</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="nf">toString</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="s">&#34;StringObject{&#34;</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="s">&#34;a=&#39;&#34;</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">a</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="sc">&#39;\&#39;&#39;</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="sc">&#39;}&#39;</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>The test results are as follows:</p>
<table width="95%" border="1">
<thead>
<tr>
<th style="text-align: center">Type (input data size)</th>
<th style="text-align: center">QPS</th>
<th style="text-align: center">Latency</th>
<th style="text-align: center">Checkpoint Time</th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align: center">Java UDF (100k)</td>
<td style="text-align: center">900</td>
<td style="text-align: center">2ms</td>
<td style="text-align: center">100ms</td>
</tr>
<tr>
<td style="text-align: center">Java UDF (10k)</td>
<td style="text-align: center">10000</td>
<td style="text-align: center">20us</td>
<td style="text-align: center">10ms</td>
</tr>
<tr>
<td style="text-align: center">Java UDF (1k)</td>
<td style="text-align: center">50000</td>
<td style="text-align: center">1us</td>
<td style="text-align: center">10ms</td>
</tr>
<tr>
<td style="text-align: center">Java UDF (100)</td>
<td style="text-align: center">280000</td>
<td style="text-align: center">200ns</td>
<td style="text-align: center">10ms</td>
</tr>
<tr>
<td style="text-align: center">Process Mode (100k)</td>
<td style="text-align: center">900</td>
<td style="text-align: center">5s-10s</td>
<td style="text-align: center">5s</td>
</tr>
<tr>
<td style="text-align: center">Process Mode (10k)</td>
<td style="text-align: center">7000</td>
<td style="text-align: center">5s-10s</td>
<td style="text-align: center">3s</td>
</tr>
<tr>
<td style="text-align: center">Process Mode (1k)</td>
<td style="text-align: center">36000</td>
<td style="text-align: center">3s</td>
<td style="text-align: center">3s</td>
</tr>
<tr>
<td style="text-align: center">Process Mode (100)</td>
<td style="text-align: center">120000</td>
<td style="text-align: center">2s</td>
<td style="text-align: center">2s</td>
</tr>
<tr>
<td style="text-align: center">Thread Mode (100k)</td>
<td style="text-align: center">1200</td>
<td style="text-align: center">1ms</td>
<td style="text-align: center">100ms</td>
</tr>
<tr>
<td style="text-align: center">Thread Mode (10k)</td>
<td style="text-align: center">12000</td>
<td style="text-align: center">20us</td>
<td style="text-align: center">10ms</td>
</tr>
<tr>
<td style="text-align: center">Thread Mode (1k)</td>
<td style="text-align: center">50000</td>
<td style="text-align: center">3us</td>
<td style="text-align: center">10ms</td>
</tr>
<tr>
<td style="text-align: center">Thread Mode (100)</td>
<td style="text-align: center">120000</td>
<td style="text-align: center">1us</td>
<td style="text-align: center">10ms</td>
</tr>
</tbody>
</table>
<center>
<img src="/img/blog/2022-05-06-pyflink-1.15-thread-mode/pyflink-performance.jpg" alt="PyFlink benchmark: throughput comparison">
</center>
<center>
<img src="/img/blog/2022-05-06-pyflink-1.15-thread-mode/pyflink-latency.jpg" alt="PyFlink benchmark: latency comparison">
</center>
<center>
<img src="/img/blog/2022-05-06-pyflink-1.15-thread-mode/pyflink-checkpoint-time.jpg" alt="PyFlink benchmark: checkpoint time comparison">
</center>
<p>As we can see from the test results:</p>
<ul>
<li>
<p>If you care about latency and checkpoint time, thread mode is the better choice: processing latency drops from several seconds in process mode to at most a few milliseconds (and as little as a few microseconds) in thread mode.</p>
</li>
<li>
<p>Thread mode can perform better than process mode when the cost of data serialization/deserialization is not negligible compared to the UDF computation itself.
Benchmarks show that, compared with process mode, thread mode can double the throughput in common scenarios such as JSON processing.
However, if the UDF computation itself is slow and dominates the processing time, process mode is recommended, because it is more mature and provides better resource isolation.</p>
</li>
<li>
<p>When the performance of a Python UDF is close to that of the equivalent Java UDF, the end-to-end performance of thread mode will also be close to that of the Java UDF.</p>
</li>
</ul>
<h2 id="summary--future-work">
Summary &amp; Future work
<a class="anchor" href="#summary--future-work">#</a>
</h2>
<p>In this article, we have introduced the &lsquo;thread&rsquo; execution mode in PyFlink, a new feature introduced in Flink 1.15.
Compared with the &lsquo;process&rsquo; execution mode, &lsquo;thread&rsquo; mode offers better performance, lower latency and shorter checkpoint times.
However, &lsquo;thread&rsquo; mode also has some limitations, e.g. poor support for the session deployment mode.</p>
<p>It should be noted that since this is the first release of &lsquo;thread&rsquo; mode, it still has many limitations,
e.g. it only supports Python ScalarFunctions in the Python Table API &amp; SQL. We&rsquo;re planning to extend it to other places where Python user-defined functions can be used in future releases.</p>
</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="https://github.com/apache/flink-web/edit/asf-site/docs/content/posts/2022-05-06-pyflink-1.15-thread-mode.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li>
<ul>
<li><a href="#process-mode">Process Mode</a></li>
<li><a href="#pemja">PEMJA</a>
<ul>
<li><a href="#pemja-architecture">PEMJA Architecture</a></li>
<li><a href="#comparison-with-other-solutions">Comparison with other solutions</a></li>
</ul>
</li>
<li><a href="#thread-mode">Thread Mode</a></li>
<li><a href="#comparisons-between-process-mode-and-thread-mode">Comparisons between process mode and thread mode</a>
<ul>
<li><a href="#benefits-of-thread-mode">Benefits of thread mode</a></li>
<li><a href="#limitations">Limitations</a></li>
</ul>
</li>
<li><a href="#usage">Usage</a></li>
<li><a href="#benchmarkhttpsgithubcomhuangxingbopyflink-benchmark">Benchmark</a>
<ul>
<li><a href="#test-environment">Test environment</a></li>
<li><a href="#test-results">Test results</a></li>
</ul>
</li>
<li><a href="#summary--future-work">Summary &amp; Future work</a></li>
</ul>
</li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a>
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>Github</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>