<!DOCTYPE html>
<html lang="en" dir="ltr">
<head>
<meta name="generator" content="Hugo 0.111.3">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="Flink 1.9 introduced the Python Table API, allowing developers and data engineers to write Python Table API jobs for Table transformations and analysis, such as Python ETL or aggregate jobs. However, Python users faced some limitations when it came to support for Python UDFs in Flink 1.9, preventing them from extending the system’s built-in functionality.
In Flink 1.10, the community further extended the support for Python by adding Python UDFs in PyFlink.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="PyFlink: Introducing Python Support for UDFs in Flink&#39;s Table API" />
<meta property="og:description" content="Flink 1.9 introduced the Python Table API, allowing developers and data engineers to write Python Table API jobs for Table transformations and analysis, such as Python ETL or aggregate jobs. However, Python users faced some limitations when it came to support for Python UDFs in Flink 1.9, preventing them from extending the system’s built-in functionality.
In Flink 1.10, the community further extended the support for Python by adding Python UDFs in PyFlink." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2020/04/09/pyflink-introducing-python-support-for-udfs-in-flinks-table-api/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2020-04-09T12:00:00+00:00" />
<meta property="article:modified_time" content="2020-04-09T12:00:00+00:00" />
<title>PyFlink: Introducing Python Support for UDFs in Flink&#39;s Table API | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.e3b33391dbc1f4b2cc47778e2f4b86c744ded3ccc82fdfb6f08caf91d8607f9a.css" integrity="sha256-47MzkdvB9LLMR3eOL0uGx0Te08zIL9&#43;28Iyvkdhgf5o=">
<script defer src="/en.search.min.8592fd2e43835d2ef6fab8eb9b8969ee6ad1bdb888a636e37e28032f8bd9887d.js" integrity="sha256-hZL9LkODXS72&#43;rjrm4lp7mrRvbiIpjbjfigDL4vZiH0="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2020/04/09/pyflink-introducing-python-support-for-udfs-in-flinks-table-api/">
<script>
var _paq = window._paq = window._paq || [];
_paq.push(['disableCookies']);
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var u="//analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '1']);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
</head>
<body dir="ltr">
<input type="checkbox" class="hidden toggle" id="menu-control" />
<input type="checkbox" class="hidden toggle" id="toc-control" />
<main class="container flex">
<aside class="book-menu">
<nav>
<a id="logo" href="/">
<img width="70%" src="/flink-header-logo.svg">
</a>
<div class="book-search">
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/" />
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
<input type="checkbox" id="section-4117fb24454a2c30ee86e524839e77ec" class="toggle" />
<label for="section-4117fb24454a2c30ee86e524839e77ec" class="flex justify-between flink-menu-item">What is Apache Flink?<span></span>
</label>
<ul>
<li>
<label for="section-ffd5922da551e96e0481423fab94c463" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/what-is-flink/flink-architecture/" class="">Architecture</a>
</label>
</li>
<li>
<label for="section-fc28f08b67476edb77e00e03b6c7c2e0" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/what-is-flink/flink-applications/" class="">Applications</a>
</label>
</li>
<li>
<label for="section-612df33a02d5d4ee78d718abaab5b5b4" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/what-is-flink/flink-operations/" class="">Operations</a>
</label>
</li>
</ul>
<label for="section-f1ecec07350bd6810050d40158878749" class="flex justify-between flink-menu-item">
<a href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/" style="color:black" class="">What is Stateful Functions? <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<label for="section-4113a4c3072cb35f6fd7a0d4e098ee70" class="flex justify-between flink-menu-item">
<a href="https://nightlies.apache.org/flink/flink-ml-docs-stable/" style="color:black" class="">What is Flink ML? <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<label for="section-b39c70259d0abbe2bf1d8d645425f84d" class="flex justify-between flink-menu-item">
<a href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/" style="color:black" class="">What is the Flink Kubernetes Operator? <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<label for="section-53e0b1afcb9ccaf779dc285aa272a014" class="flex justify-between flink-menu-item">
<a href="https://nightlies.apache.org/flink/flink-table-store-docs-stable/" style="color:black" class="">What is Flink Table Store? <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<label for="section-f4973f06a66f063045b4ebdacaf3127d" class="flex justify-between flink-menu-item">
<a href="/use-cases/" class="">Use Cases</a>
</label>
<label for="section-0f1863835376e859ac438ae9529daff2" class="flex justify-between flink-menu-item">
<a href="/powered-by/" class="">Powered By</a>
</label>
<br/>
<label for="section-f383f23a96a43d8d0cc66aeb0237e26a" class="flex justify-between flink-menu-item">
<a href="/downloads/" class="">Downloads</a>
</label>
<input type="checkbox" id="section-c727fab97b4d77e5b28ce8c448fb9000" class="toggle" />
<label for="section-c727fab97b4d77e5b28ce8c448fb9000" class="flex justify-between flink-menu-item">Getting Started<span></span>
</label>
<ul>
<li>
<label for="section-f45abaa99ab076108b9a5b94edbc6647" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/" style="color:black" class="">With Flink <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-efe2166e9dce6f72e126dcc2396b4402" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html" style="color:black" class="">With Flink Stateful Functions <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-7e268d0a469b1093bb33d71d093eb7b9" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/" style="color:black" class="">With Flink ML <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-cc7147cd0441503127bfaf6f219d4fbb" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/" style="color:black" class="">With Flink Kubernetes Operator <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-660ca694e416d8ca9176dda52a60d637" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-table-store-docs-stable/docs/try-table-store/quick-start/" style="color:black" class="">With Flink Table Store <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-75db0b47bf4ae9c247aadbba5fbd720d" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/" style="color:black" class="">Training Course <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
</ul>
<input type="checkbox" id="section-6318075fef29529089951a49d413d083" class="toggle" />
<label for="section-6318075fef29529089951a49d413d083" class="flex justify-between flink-menu-item">Documentation<span></span>
</label>
<ul>
<li>
<label for="section-9a8122d8912450484d1c25394ad40229" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-docs-stable/" style="color:black" class="">Flink 1.17 (stable) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-8b2fd3efb702be3783ba98d650707e3c" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-docs-master/" style="color:black" class="">Flink Master (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-5317a079cddb964c59763c27607f43d9" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/" style="color:black" class="">Stateful Functions 3.2 (stable) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-25b72f108b7156e94d91b04853d8813a" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-statefun-docs-master" style="color:black" class="">Stateful Functions Master (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-13a02f969904a2455a39ed90e287593f" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-ml-docs-stable/" style="color:black" class="">ML 2.2 (stable) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-6d895ec5ad127a29a6a9ce101328ccdf" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-ml-docs-master" style="color:black" class="">ML Master (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-c83ad0caf34e364bf3729badd233a350" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/" style="color:black" class="">Kubernetes Operator 1.4 (latest) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-a2c75d90005425982ba8f26ae0e160a3" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main" style="color:black" class="">Kubernetes Operator Main (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-07b85e4b2f61b1526bf202c64460abcd" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-table-store-docs-stable/" style="color:black" class="">Table Store 0.3 (stable) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-9b9a0032b1e858a34c125d828d1a0718" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-table-store-docs-master/" style="color:black" class="">Table Store Master (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
</ul>
<label for="section-63d6a565d79aa2895f70806a46021c07" class="flex justify-between flink-menu-item">
<a href="/getting-help/" class="">Getting Help</a>
</label>
<label for="section-1d5066022b83f4732dc80f4e9eaa069a" class="flex justify-between flink-menu-item">
<a href="https://flink-packages.org/" style="color:black" class="">flink-packages.org <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<br/>
<label for="section-7821b78a97db9e919426e86121a7be9c" class="flex justify-between flink-menu-item">
<a href="/community/" class="">Community & Project Info</a>
</label>
<label for="section-8c042831df4e371c4ef9375f1df06f35" class="flex justify-between flink-menu-item">
<a href="/roadmap/" class="">Roadmap</a>
</label>
<input type="checkbox" id="section-73117efde5302fddcb193307d582b588" class="toggle" />
<label for="section-73117efde5302fddcb193307d582b588" class="flex justify-between flink-menu-item">How to Contribute<span></span>
</label>
<ul>
<li>
<label for="section-6646b26b23a3e79b8de9c552ee76f6dd" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/overview/" class="">Overview</a>
</label>
</li>
<li>
<label for="section-e6ab9538b82cd5f94103b971adb7c1a9" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/contribute-code/" class="">Contribute Code</a>
</label>
</li>
<li>
<label for="section-1c09e1358485e82d9b3f5f689d4ced65" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/reviewing-prs/" class="">Review Pull Requests</a>
</label>
</li>
<li>
<label for="section-ed01e0defd235498fa3c9a2a0b3302fb" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/code-style-and-quality-preamble/" class="">Code Style and Quality Guide</a>
</label>
</li>
<li>
<label for="section-4e8d5e9924cf15f397711b0d82e15650" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/contribute-documentation/" class="">Contribute Documentation</a>
</label>
</li>
<li>
<label for="section-ddaa8307917e5ba7f60ba3316711e492" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/documentation-style-guide/" class="">Documentation Style Guide</a>
</label>
</li>
<li>
<label for="section-390a72c171cc82f180a308b95fc3aa72" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/improve-website/" class="">Contribute to the Website</a>
</label>
</li>
</ul>
<label for="section-9d3ddfd487223d5a199ba301f25c88c6" class="flex justify-between flink-menu-item">
<a href="/security/" class="">Security</a>
</label>
<br/>
<label for="section-a07783f405300745807d39eacf150420" class="flex justify-between flink-menu-item">
<a href="/posts/" class="">Flink Blog</a>
</label>
<br/>
<hr class="menu-break">
<label for="section-f71a7070dbb7b669824a6441408ded70" class="flex justify-between flink-menu-item">
<a href="https://github.com/apache/flink" style="color:black" class="">Flink on GitHub <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<label for="section-2ccaaab8c67f3105bbf7df75faca8027" class="flex justify-between flink-menu-item">
<a href="https://twitter.com/apacheflink" style="color:black" class="">@ApacheFlink <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<hr class="menu-break">
<table>
<tr>
<th colspan="2">
<label for="section-78c2028200542d78f8c1a8f6b4cbb36b" class="flex justify-between flink-menu-item">
<a href="https://www.apache.org/" style="color:black" class="">Apache Software Foundation <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label></th>
</tr>
<tr>
<td>
<label for="section-794df3791a8c800841516007427a2aa3" class="flex justify-between flink-menu-item">
<a href="https://www.apache.org/licenses/" style="color:black" class="">License <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label></td>
<td>
<label for="section-2fae32629d4ef4fc6341f1751b405e45" class="flex justify-between flink-menu-item">
<a href="https://www.apache.org/security/" style="color:black" class="">Security <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label></td>
</tr>
<tr>
<td>
<label for="section-0584e445d656b83b431227bb80ff0c30" class="flex justify-between flink-menu-item">
<a href="https://www.apache.org/foundation/sponsorship.html" style="color:black" class="">Donate <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label></td>
<td>
<label for="section-00d06796e489999226fb5bb27fe1b3b2" class="flex justify-between flink-menu-item">
<a href="https://www.apache.org/foundation/thanks.html" style="color:black" class="">Thanks <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label></td>
</tr>
</table>
<hr class="menu-break">
<a href="/zh/" class="flex align-center">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;&nbsp;
中文版
</a>
<script src="/js/track-search-terms.js"></script>
</nav>
<script>(function(){var e=document.querySelector("aside.book-menu nav");addEventListener("beforeunload",function(){localStorage.setItem("menu.scrollTop",e.scrollTop)}),e.scrollTop=localStorage.getItem("menu.scrollTop")})()</script>
</aside>
<div class="book-page">
<header class="book-header">
<div class="flex align-center justify-between">
<label for="menu-control">
<img src="/svg/menu.svg" class="book-icon" alt="Menu" />
</label>
<strong>PyFlink: Introducing Python Support for UDFs in Flink's Table API</strong>
<label for="toc-control">
<img src="/svg/toc.svg" class="book-icon" alt="Table of Contents" />
</label>
</div>
<aside class="hidden clearfix">
<nav id="TableOfContents"><h3>On This Page <button class="toc" onclick="collapseToc()"><i class="fa fa-compress" aria-hidden="true"></i></button></h3>
<ul>
<li><a href="#install-pyflink">Install PyFlink</a></li>
<li><a href="#define-a-python-udf">Define a Python UDF</a></li>
<li><a href="#register-a-python-udf">Register a Python UDF</a></li>
<li><a href="#invoke-a-python-udf">Invoke a Python UDF</a></li>
<li><a href="#submit-the-job">Submit the job</a></li>
<li><a href="#python-udf-dependency-management">Python UDF dependency management</a></li>
</ul>
</nav>
</aside>
</header>
<article class="markdown">
<h1>
<a href="/2020/04/09/pyflink-introducing-python-support-for-udfs-in-flinks-table-api/">PyFlink: Introducing Python Support for UDFs in Flink&#39;s Table API</a>
</h1>
April 9, 2020 -
Jincheng Sun
<a href="https://twitter.com/sunjincheng121">(@sunjincheng121)</a>
Markos Sfikas
<a href="https://twitter.com/MarkSfik">(@MarkSfik)</a>
<p><p>Flink 1.9 introduced the Python Table API, allowing developers and data engineers to write Python Table API jobs for Table transformations and analysis, such as Python ETL or aggregate jobs. However, Python users faced some limitations when it came to support for Python UDFs in Flink 1.9, preventing them from extending the system’s built-in functionality.</p>
<p>In Flink 1.10, the community further extended the support for Python by adding Python UDFs in PyFlink. Additionally, both the Python UDF environment and dependency management are now supported, so users can import third-party libraries in their UDFs and draw on Python&rsquo;s rich library ecosystem.</p>
<h1 id="python-support-for-udfs-in-flink-110">
Python Support for UDFs in Flink 1.10
<a class="anchor" href="#python-support-for-udfs-in-flink-110">#</a>
</h1>
<p>Before diving into how to define and use Python UDFs, we explain the motivation and background behind how UDFs work in PyFlink and give some context on the implementation. The overview below follows the PyFlink architecture from job submission all the way to the execution of the Python UDF.</p>
<p>The PyFlink architecture has two main parts, local and cluster, as shown in the figure below. The local part compiles the job, and the cluster part executes it.</p>
<center>
<img src="/img/blog/2020-04-09-pyflink-udfs/pyflink-udf-architecture.png" width="600px" alt="PyFlink UDF Architecture"/>
</center>
<br>
<p>For the local part, the Python API is a mapping of the Java API: each time Python executes one of the methods shown in the figure above, it synchronously calls the corresponding Java method through Py4J. The end result is a Java JobGraph, which is then submitted to the cluster.</p>
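<p>To make the delegation idea concrete, here is a toy sketch in plain Python. It does not use Py4J or PyFlink: <code>FakeJavaTable</code> and <code>PyTable</code> are hypothetical names that only illustrate how a thin Python wrapper can forward every call to an object that, in PyFlink, would live in the JVM.</p>

```python
class FakeJavaTable:
    """Hypothetical stand-in for a JVM-side table object (assumption, not PyFlink API)."""

    def __init__(self, plan=()):
        self.plan = tuple(plan)

    def select(self, fields):
        # Each call returns a new object with one more step recorded in the plan.
        return FakeJavaTable(self.plan + (("select", fields),))

    def filter(self, predicate):
        return FakeJavaTable(self.plan + (("filter", predicate),))


class PyTable:
    """Hypothetical thin Python wrapper: every method forwards to the wrapped object."""

    def __init__(self, j_table):
        self._j_table = j_table

    def select(self, fields):
        # Synchronous delegation: the Python side keeps no plan of its own.
        return PyTable(self._j_table.select(fields))

    def filter(self, predicate):
        return PyTable(self._j_table.filter(predicate))

    def plan(self):
        return list(self._j_table.plan)


table = PyTable(FakeJavaTable()).filter("a > 1").select("a, b")
print(table.plan())
```

<p>The point of the sketch is that the Python objects carry no job logic themselves. The plan accumulates entirely on the wrapped side, which is why the result of the local phase can be an ordinary Java JobGraph.</p>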
<p>For the cluster part, just like ordinary Java jobs, the JobMaster schedules tasks to TaskManagers. Tasks that include a Python UDF execute both Java and Python operators in the TaskManager. In the Python UDF operator, several gRPC services handle the communication between the Java VM and the Python VM, such as DataService for data transmission, StateService for state requirements, and the Logging and Metrics services. These services are built on Beam&rsquo;s Fn API. Currently, only Process mode is supported for Python workers; support for Docker mode and External service mode is being considered for future Flink releases.</p>
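<p>The exchange between the Java operator and the Python worker can be pictured with a small simulation. This is only a sketch under loud assumptions: in-process queues stand in for the gRPC data service, a thread stands in for the separate Python process, and the names <code>python_worker</code> and <code>run_udf_operator</code> are hypothetical rather than part of Flink or Beam.</p>

```python
import queue
import threading


def python_worker(udf, requests, responses):
    """Apply `udf` to every row of each incoming batch until a None sentinel arrives."""
    while True:
        batch = requests.get()
        if batch is None:
            break
        responses.put([udf(*row) for row in batch])


def run_udf_operator(udf, batches):
    """Stand-in for the Java-side operator: send batches out, collect results back."""
    requests, responses = queue.Queue(), queue.Queue()
    worker = threading.Thread(target=python_worker, args=(udf, requests, responses))
    worker.start()
    results = []
    for batch in batches:
        requests.put(batch)              # "data service": rows go to the worker
        results.extend(responses.get())  # results come back in batch order
    requests.put(None)                   # tell the worker to shut down
    worker.join()
    return results


print(run_udf_operator(lambda i, j: i + j, [[(1, 2), (3, 4)], [(5, 6)]]))
```

<p>Sending rows in batches rather than one at a time keeps the number of round trips low in this toy model, which matters whenever each trip crosses a process boundary.</p>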
<h1 id="how-to-use-pyflink-with-udfs-in-flink-110">
How to use PyFlink with UDFs in Flink 1.10
<a class="anchor" href="#how-to-use-pyflink-with-udfs-in-flink-110">#</a>
</h1>
<p>This section walks through Python user-defined function (UDF) examples, including how to install PyFlink, how to define, register, and invoke UDFs in PyFlink, and how to execute the job.</p>
<h2 id="install-pyflink">
Install PyFlink
<a class="anchor" href="#install-pyflink">#</a>
</h2>
<p>Using Python in Apache Flink requires installing PyFlink. PyFlink is available through PyPI and can be easily installed using pip:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-bash" data-lang="bash"><span class="line"><span class="cl">$ python -m pip install apache-flink
</span></span></code></pre></div><div class="alert alert-info" markdown="1">
<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note</span>
Please note that Python 3.5 or higher is required to install and run PyFlink.
</div>
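<p>If you are unsure which interpreter <code>pip</code> will use, a quick check like the following can help. It is a minimal sketch using only the standard library; the helper name <code>python_is_supported</code> is our own, and the minimum version is taken from the note above.</p>

```python
import sys

MIN_PYTHON = (3, 5)  # minimum version stated in the note above


def python_is_supported(version_info=sys.version_info, minimum=MIN_PYTHON):
    """Return True if the (major, minor) version meets the minimum."""
    return tuple(version_info[:2]) >= minimum


if not python_is_supported():
    raise SystemExit("PyFlink needs Python %d.%d or higher" % MIN_PYTHON)
print("Python version OK:", sys.version.split()[0])
```

<p>Run it with the same interpreter you pass to <code>python -m pip</code>, so the check and the installation refer to the same environment.</p>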
<br>
<h2 id="define-a-python-udf">
Define a Python UDF
<a class="anchor" href="#define-a-python-udf">#</a>
</h2>
<p>There are several ways to define a Python scalar function besides extending the base class <code>ScalarFunction</code>. The following example shows five ways of defining a Python scalar function that takes two <code>BIGINT</code> columns as input parameters and returns their sum.</p>
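<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">functools</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">pyflink.table</span> <span class="kn">import</span> <span class="n">DataTypes</span>
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">pyflink.table.udf</span> <span class="kn">import</span> <span class="n">ScalarFunction</span><span class="p">,</span> <span class="n">udf</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># option 1: extending the base class `ScalarFunction`</span>
</span></span><span class="line"><span class="cl"><span class="k">class</span> <span class="nc">Add</span><span class="p">(</span><span class="n">ScalarFunction</span><span class="p">):</span>
</span></span><span class="line"><span class="cl">    <span class="k">def</span> <span class="nf">eval</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">i</span><span class="p">,</span> <span class="n">j</span><span class="p">):</span>
</span></span><span class="line"><span class="cl">        <span class="k">return</span> <span class="n">i</span> <span class="o">+</span> <span class="n">j</span>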
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="n">add</span> <span class="o">=</span> <span class="n">udf</span><span class="p">(</span><span class="n">Add</span><span class="p">(),</span> <span class="p">[</span><span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">(),</span> <span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">()],</span> <span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">())</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># option 2: Python function</span>
</span></span><span class="line"><span class="cl"><span class="nd">@udf</span><span class="p">(</span><span class="n">input_types</span><span class="o">=</span><span class="p">[</span><span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">(),</span> <span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">()],</span> <span class="n">result_type</span><span class="o">=</span><span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">())</span>
</span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">add</span><span class="p">(</span><span class="n">i</span><span class="p">,</span> <span class="n">j</span><span class="p">):</span>
</span></span><span class="line"><span class="cl">    <span class="k">return</span> <span class="n">i</span> <span class="o">+</span> <span class="n">j</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># option 3: lambda function</span>
</span></span><span class="line"><span class="cl"><span class="n">add</span> <span class="o">=</span> <span class="n">udf</span><span class="p">(</span><span class="k">lambda</span> <span class="n">i</span><span class="p">,</span> <span class="n">j</span><span class="p">:</span> <span class="n">i</span> <span class="o">+</span> <span class="n">j</span><span class="p">,</span> <span class="p">[</span><span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">(),</span> <span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">()],</span> <span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">())</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># option 4: callable function</span>
</span></span><span class="line"><span class="cl"><span class="k">class</span> <span class="nc">CallableAdd</span><span class="p">(</span><span class="nb">object</span><span class="p">):</span>
</span></span><span class="line"><span class="cl">    <span class="k">def</span> <span class="fm">__call__</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">i</span><span class="p">,</span> <span class="n">j</span><span class="p">):</span>
</span></span><span class="line"><span class="cl">        <span class="k">return</span> <span class="n">i</span> <span class="o">+</span> <span class="n">j</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="n">add</span> <span class="o">=</span> <span class="n">udf</span><span class="p">(</span><span class="n">CallableAdd</span><span class="p">(),</span> <span class="p">[</span><span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">(),</span> <span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">()],</span> <span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">())</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="c1"># option 5: partial function</span>
</span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">partial_add</span><span class="p">(</span><span class="n">i</span><span class="p">,</span> <span class="n">j</span><span class="p">,</span> <span class="n">k</span><span class="p">):</span>
</span></span><span class="line"><span class="cl">    <span class="k">return</span> <span class="n">i</span> <span class="o">+</span> <span class="n">j</span> <span class="o">+</span> <span class="n">k</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="n">add</span> <span class="o">=</span> <span class="n">udf</span><span class="p">(</span><span class="n">functools</span><span class="o">.</span><span class="n">partial</span><span class="p">(</span><span class="n">partial_add</span><span class="p">,</span> <span class="n">k</span><span class="o">=</span><span class="mi">1</span><span class="p">),</span> <span class="p">[</span><span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">(),</span> <span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">()],</span>
</span></span><span class="line"><span class="cl">          <span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">())</span>
</span></span></code></pre></div><h2 id="register-a-python-udf">
Register a Python UDF
<a class="anchor" href="#register-a-python-udf">#</a>
</h2>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># register the Python function</span>
</span></span><span class="line"><span class="cl"><span class="n">table_env</span><span class="o">.</span><span class="n">register_function</span><span class="p">(</span><span class="s2">&#34;add&#34;</span><span class="p">,</span> <span class="n">add</span><span class="p">)</span>
</span></span></code></pre></div><h2 id="invoke-a-python-udf">
Invoke a Python UDF
<a class="anchor" href="#invoke-a-python-udf">#</a>
</h2>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="c1"># use the function in Python Table API</span>
</span></span><span class="line"><span class="cl"><span class="n">my_table</span><span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="s2">&#34;add(a, b)&#34;</span><span class="p">)</span>
</span></span></code></pre></div><p>Below is a complete example that uses a Python UDF.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">pyflink.datastream</span> <span class="kn">import</span> <span class="n">StreamExecutionEnvironment</span>
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">pyflink.table</span> <span class="kn">import</span> <span class="n">StreamTableEnvironment</span><span class="p">,</span> <span class="n">DataTypes</span>
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">pyflink.table.descriptors</span> <span class="kn">import</span> <span class="n">Schema</span><span class="p">,</span> <span class="n">OldCsv</span><span class="p">,</span> <span class="n">FileSystem</span>
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">pyflink.table.udf</span> <span class="kn">import</span> <span class="n">udf</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="n">env</span> <span class="o">=</span> <span class="n">StreamExecutionEnvironment</span><span class="o">.</span><span class="n">get_execution_environment</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"><span class="n">env</span><span class="o">.</span><span class="n">set_parallelism</span><span class="p">(</span><span class="mi">1</span><span class="p">)</span>
</span></span><span class="line"><span class="cl"><span class="n">t_env</span> <span class="o">=</span> <span class="n">StreamTableEnvironment</span><span class="o">.</span><span class="n">create</span><span class="p">(</span><span class="n">env</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="n">add</span> <span class="o">=</span> <span class="n">udf</span><span class="p">(</span><span class="k">lambda</span> <span class="n">i</span><span class="p">,</span> <span class="n">j</span><span class="p">:</span> <span class="n">i</span> <span class="o">+</span> <span class="n">j</span><span class="p">,</span> <span class="p">[</span><span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">(),</span> <span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">()],</span> <span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">())</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="n">t_env</span><span class="o">.</span><span class="n">register_function</span><span class="p">(</span><span class="s2">&#34;add&#34;</span><span class="p">,</span> <span class="n">add</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="n">t_env</span><span class="o">.</span><span class="n">connect</span><span class="p">(</span><span class="n">FileSystem</span><span class="p">()</span><span class="o">.</span><span class="n">path</span><span class="p">(</span><span class="s1">&#39;/tmp/input&#39;</span><span class="p">))</span> \
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">with_format</span><span class="p">(</span><span class="n">OldCsv</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">field</span><span class="p">(</span><span class="s1">&#39;a&#39;</span><span class="p">,</span> <span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">())</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">field</span><span class="p">(</span><span class="s1">&#39;b&#39;</span><span class="p">,</span> <span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">()))</span> \
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">with_schema</span><span class="p">(</span><span class="n">Schema</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">field</span><span class="p">(</span><span class="s1">&#39;a&#39;</span><span class="p">,</span> <span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">())</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">field</span><span class="p">(</span><span class="s1">&#39;b&#39;</span><span class="p">,</span> <span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">()))</span> \
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">create_temporary_table</span><span class="p">(</span><span class="s1">&#39;mySource&#39;</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="n">t_env</span><span class="o">.</span><span class="n">connect</span><span class="p">(</span><span class="n">FileSystem</span><span class="p">()</span><span class="o">.</span><span class="n">path</span><span class="p">(</span><span class="s1">&#39;/tmp/output&#39;</span><span class="p">))</span> \
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">with_format</span><span class="p">(</span><span class="n">OldCsv</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">field</span><span class="p">(</span><span class="s1">&#39;sum&#39;</span><span class="p">,</span> <span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">()))</span> \
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">with_schema</span><span class="p">(</span><span class="n">Schema</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">field</span><span class="p">(</span><span class="s1">&#39;sum&#39;</span><span class="p">,</span> <span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">()))</span> \
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">create_temporary_table</span><span class="p">(</span><span class="s1">&#39;mySink&#39;</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="n">t_env</span><span class="o">.</span><span class="n">from_path</span><span class="p">(</span><span class="s1">&#39;mySource&#39;</span><span class="p">)</span> \
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">select</span><span class="p">(</span><span class="s2">&#34;add(a, b)&#34;</span><span class="p">)</span> \
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="n">insert_into</span><span class="p">(</span><span class="s1">&#39;mySink&#39;</span><span class="p">)</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="n">t_env</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="s2">&#34;tutorial_job&#34;</span><span class="p">)</span>
</span></span></code></pre></div><h2 id="submit-the-job">
Submit the job
<a class="anchor" href="#submit-the-job">#</a>
</h2>
<p>First, prepare the input data in the <code>/tmp/input</code> file, for example:</p>
<p><code>$ echo &quot;1,2&quot; &gt; /tmp/input</code></p>
<p>Next, assuming the complete example above is saved as <code>python_udf_sum.py</code>, run it from the command line:</p>
<p><code>$ python python_udf_sum.py</code></p>
<p>The command builds and runs the Python Table API program in a local mini-cluster. You can also submit the program to a remote cluster with other command-line options (see the <a href="//nightlies.apache.org/flink/flink-docs-release-1.10/ops/cli.html#job-submission-examples">job submission examples</a> for details).</p>
<p>Finally, you can inspect the execution result on the command line:</p>
<p><code>$ cat /tmp/output</code></p>
<p><code>3</code></p>
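<p>To sanity-check the expected result without a Flink installation, the same computation can be sketched in plain Python. This is only an illustration of what the job computes, not how PyFlink executes it: the helper <code>run_locally</code> and the temporary file paths are hypothetical names introduced here, and the standard <code>csv</code> module stands in for the <code>OldCsv</code> source and sink.</p>

```python
import csv
import os
import tempfile


def add(i, j):
    # Same logic as the lambda wrapped by udf() in the complete example.
    return i + j


def run_locally(input_path, output_path):
    """Apply add(a, b) to every two-column CSV row (hypothetical helper, no Flink involved)."""
    with open(input_path, newline="") as src, open(output_path, "w", newline="") as dst:
        writer = csv.writer(dst)
        for a, b in csv.reader(src):
            # Both columns are BIGINT in the example, so parse them as int.
            writer.writerow([add(int(a), int(b))])


# Use a temporary directory instead of /tmp/input and /tmp/output.
workdir = tempfile.mkdtemp()
input_path = os.path.join(workdir, "input")
output_path = os.path.join(workdir, "output")

# Equivalent of: echo "1,2" > /tmp/input
with open(input_path, "w") as f:
    f.write("1,2\n")

run_locally(input_path, output_path)

with open(output_path) as f:
    result = f.read().strip()
print(result)
```

<p>For the single input row <code>1,2</code>, the sketch writes <code>3</code>, which matches the output of the Flink job above.</p>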
<h2 id="python-udf-dependency-management">
Python UDF dependency management
<a class="anchor" href="#python-udf-dependency-management">#</a>
</h2>
<p>In many cases, you would like to import third-party dependencies in the Python UDF. The example below provides detailed guidance on how to manage such dependencies.</p>
<p>Suppose you want to use the <code>mpmath</code> library to compute the sum from the example above. The Python UDF may look like:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="nd">@udf</span><span class="p">(</span><span class="n">input_types</span><span class="o">=</span><span class="p">[</span><span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">(),</span> <span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">()],</span> <span class="n">result_type</span><span class="o">=</span><span class="n">DataTypes</span><span class="o">.</span><span class="n">BIGINT</span><span class="p">())</span>
</span></span><span class="line"><span class="cl"><span class="k">def</span> <span class="nf">add</span><span class="p">(</span><span class="n">i</span><span class="p">,</span> <span class="n">j</span><span class="p">):</span>
</span></span><span class="line"><span class="cl"> <span class="kn">from</span> <span class="nn">mpmath</span> <span class="kn">import</span> <span class="n">fadd</span> <span class="c1"># add third-party dependency</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="nb">int</span><span class="p">(</span><span class="n">fadd</span><span class="p">(</span><span class="n">i</span><span class="p">,</span> <span class="n">j</span><span class="p">))</span>
</span></span></code></pre></div><p>To make the dependency available on worker nodes that do not have it installed, you can specify it with the following commands and API:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-bash" data-lang="bash"><span class="line"><span class="cl">$ <span class="nb">cd</span> /tmp
</span></span><span class="line"><span class="cl">$ <span class="nb">echo</span> <span class="nv">mpmath</span><span class="o">==</span>1.1.0 &gt; requirements.txt
</span></span><span class="line"><span class="cl">$ pip download -d cached_dir -r requirements.txt --no-binary :all:
</span></span></code></pre></div><div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="n">t_env</span><span class="o">.</span><span class="n">set_python_requirements</span><span class="p">(</span><span class="s2">&#34;/tmp/requirements.txt&#34;</span><span class="p">,</span> <span class="s2">&#34;/tmp/cached_dir&#34;</span><span class="p">)</span>
</span></span></code></pre></div><p>The <code>requirements.txt</code> file defines the third-party dependencies. If the cluster cannot access these dependencies itself, you can pass a directory containing their installation packages through the <code>requirements_cached_dir</code> parameter, as illustrated in the example above. The dependencies are then uploaded to the cluster and installed offline.</p>
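<p>Because the packages are installed offline, it can help to confirm before submitting the job that every pinned requirement has a downloaded archive in the cache directory. The following is a minimal sketch of such a check. The function <code>missing_from_cache</code> is a hypothetical helper, not part of PyFlink, and it assumes that archives are named <code>name-version.*</code> (as with <code>mpmath-1.1.0.tar.gz</code>) and that requirements are pinned with <code>==</code>.</p>

```python
import os
import tempfile


def missing_from_cache(requirements_path, cached_dir):
    """Return pinned requirements that have no matching archive in cached_dir.

    Hypothetical helper: assumes archives are named '<name>-<version>.<ext>'
    or '<name>-<version>-<tags>.whl'; only 'name==version' lines are checked.
    """
    archives = [name.lower() for name in os.listdir(cached_dir)]
    missing = []
    with open(requirements_path) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith("#") or "==" not in line:
                continue  # skip blanks, comments and unpinned entries
            name, version = line.split("==", 1)
            prefix = f"{name.strip().lower()}-{version.strip()}"
            if not any(a.startswith((prefix + ".", prefix + "-")) for a in archives):
                missing.append(line)
    return missing


# Demonstration in a temporary directory instead of /tmp.
workdir = tempfile.mkdtemp()
req = os.path.join(workdir, "requirements.txt")
cache = os.path.join(workdir, "cached_dir")
os.mkdir(cache)
with open(req, "w") as f:
    f.write("mpmath==1.1.0\n")

before = missing_from_cache(req, cache)  # cache is still empty here

# Simulate the archive that 'pip download' would place in the cache directory.
open(os.path.join(cache, "mpmath-1.1.0.tar.gz"), "w").close()
after = missing_from_cache(req, cache)

print(before, after)
```

<p>An empty list means that every pinned requirement has an archive in the cache directory. The check does not verify that the archives are installable on the cluster.</p>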
<h1 id="conclusion--upcoming-work">
Conclusion &amp; Upcoming work
<a class="anchor" href="#conclusion--upcoming-work">#</a>
</h1>
<p>In this blog post, we introduced the architecture of Python UDFs in PyFlink and provided examples of how to define, register and invoke UDFs. Flink 1.10 brings Python support in the framework to new levels, allowing Python users to write even more magic with their preferred language. The community is actively working on improving the functionality and performance of PyFlink. Upcoming releases will introduce support for Pandas UDFs in scalar and aggregate functions, add support for using Python UDFs through the SQL client to further expand their usage scope, provide support for a Python ML Pipeline API, and bring further performance improvements. The picture below provides more details on the roadmap for succeeding releases.</p>
<center>
<img src="/img/blog/2020-04-09-pyflink-udfs/roadmap-of-pyflink.png" width="600px" alt="Roadmap of PyFlink"/>
</center>
<br>
</p>
</article>
<footer class="book-footer">
</footer>
<div class="book-comments">
</div>
<label for="menu-control" class="hidden book-menu-overlay"></label>
</div>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <button class="toc" onclick="collapseToc()"><i class="fa fa-compress" aria-hidden="true"></i></button></h3>
<ul>
<li><a href="#install-pyflink">Install PyFlink</a></li>
<li><a href="#define-a-python-udf">Define a Python UDF</a></li>
<li><a href="#register-a-python-udf">Register a Python UDF</a></li>
<li><a href="#invoke-a-python-udf">Invoke a Python UDF</a></li>
<li><a href="#submit-the-job">Submit the job</a></li>
<li><a href="#python-udf-dependency-management">Python UDF dependency management</a></li>
</ul>
</nav>
</aside>
<aside class="expand-toc">
<button class="toc" onclick="expandToc()">
<i class="fa fa-expand" aria-hidden="true"></i>
</button>
</aside>
</main>
</body>
</html>