blob: 3f2a9448c589c77f1fcb97411947db1d59d19487 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en" dir="ltr">
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2025/07/31/apache-flink-2.1.0-ushers-in-a-new-era-of-unified-real-time-data--ai-with-comprehensive-upgrades/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="The Apache Flink PMC is proud to announce the release of Apache Flink 2.1.0. This marks a significant milestone in the evolution of the real-time data processing engine into a unified Data &#43; AI platform. This release brings together 116 global contributors, implements 16 FLIPs (Flink Improvement Proposals), and resolves over 220 issues, with a strong focus on deepening the integration of real-time AI and intelligent stream processing.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Apache Flink 2.1.0: Ushers in a New Era of Unified Real-Time Data &#43; AI with Comprehensive Upgrades" />
<meta property="og:description" content="The Apache Flink PMC is proud to announce the release of Apache Flink 2.1.0. This marks a significant milestone in the evolution of the real-time data processing engine into a unified Data &#43; AI platform. This release brings together 116 global contributors, implements 16 FLIPs (Flink Improvement Proposals), and resolves over 220 issues, with a strong focus on deepening the integration of real-time AI and intelligent stream processing." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2025/07/31/apache-flink-2.1.0-ushers-in-a-new-era-of-unified-real-time-data--ai-with-comprehensive-upgrades/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2025-07-31T00:00:00+00:00" />
<meta property="article:modified_time" content="2025-07-31T00:00:00+00:00" />
<title>Apache Flink 2.1.0: Ushers in a New Era of Unified Real-Time Data &#43; AI with Comprehensive Upgrades | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.b58d961779f91cae8414117efac138dcbed605c935bfb22393047cf18fc734bd.js" integrity="sha256-tY2WF3n5HK6EFBF&#43;&#43;sE43L7WBck1v7IjkwR88Y/HNL0="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
// Matomo analytics bootstrap (tracker hosted at analytics.apache.org).
// Tracker commands are queued on the global _paq array; matomo.js, which is
// loaded asynchronously at the end of this block, presumably consumes the queue
// once it executes.
// Reuse an existing window._paq if one was already defined, so earlier pushes are kept.
var _paq = window._paq = window._paq || [];
// Ask the tracker not to use cookies.
_paq.push(['disableCookies']);
// Domains passed to Matomo's setDomains for this site.
// NOTE(review): the second entry includes a path ("/flink") — confirm Matomo honours paths here.
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
// Record a view of the current page.
_paq.push(['trackPageView']);
// Turn on Matomo's link tracking.
_paq.push(['enableLinkTracking']);
// IIFE: configure the tracker endpoint and inject matomo.js without leaking locals.
(function() {
// Protocol-relative base URL of the Matomo server.
// NOTE(review): consider an explicit "https:" scheme instead of "//".
var u="//analytics.apache.org/";
// Endpoint that receives tracking requests.
_paq.push(['setTrackerUrl', u+'matomo.php']);
// Matomo site ID for this website.
_paq.push(['setSiteId', '1']);
// Create an async <script> for matomo.js and insert it before the first <script> on the page.
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
</head>
<body dir="ltr">
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 2.1 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-lts/">Flink 1.20 (LTS)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.12 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.5 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;">
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2025/07/31/apache-flink-2.1.0-ushers-in-a-new-era-of-unified-real-time-data--ai-with-comprehensive-upgrades/">Apache Flink 2.1.0: Ushers in a New Era of Unified Real-Time Data &#43; AI with Comprehensive Upgrades</a>
</h1>
July 31, 2025 -
Ron Liu
<a href="https://twitter.com/Ron999">(@Ron999)</a>
<p>The Apache Flink PMC is proud to announce the release of Apache Flink 2.1.0. This marks a significant milestone
in the evolution of the real-time data processing engine into a unified Data + AI platform. This release brings
together 116 global contributors, implements 16 FLIPs (Flink Improvement Proposals), and resolves over 220 issues,
with a strong focus on deepening the integration of real-time AI and intelligent stream processing:</p>
<ol>
<li>
<p><strong>Breakthroughs in Real-Time AI</strong>:</p>
<ul>
<li>
<p>Introduces AI Model DDL, enabling flexible management of AI models through Flink SQL and the Table API.</p>
</li>
<li>
<p>Extends the <code>ML_PREDICT</code> Table-Valued Function (TVF), empowering real-time invocation of AI models within Flink SQL,
laying the foundation for building end-to-end real-time AI workflows.</p>
</li>
</ul>
</li>
<li>
<p><strong>Enhanced Real-Time Data Processing</strong>:</p>
<ul>
<li>
<p>Process Table Functions (PTFs) open up the Flink SQL engine for more event-driven applications,
giving access to Flink’s managed state, event-time and timer services, and underlying table changelogs.</p>
</li>
<li>
<p>Adds the <code>VARIANT</code> data type for efficient handling of semi-structured data like JSON. Combined with the <code>PARSE_JSON</code> function
and lakehouse formats (e.g., Apache Paimon), it enables dynamic schema data analysis.</p>
</li>
<li>
<p>Significantly optimizes streaming joins with the innovative introduction of <code>DeltaJoin</code> and <code>MultiJoin</code> strategies,
eliminating state bottlenecks and improving resource utilization and job stability.</p>
</li>
</ul>
</li>
</ol>
<p>Flink 2.1.0 seamlessly integrates real-time data processing with AI models, empowering enterprises to advance from
real-time analytics to real-time intelligent decision-making, meeting the evolving demands of modern data applications.
We extend our gratitude to all contributors for their invaluable support!</p>
<p>Let&rsquo;s dive into the highlights.</p>
<h1 id="flink-sql-improvements">
Flink SQL Improvements
<a class="anchor" href="#flink-sql-improvements">#</a>
</h1>
<h2 id="model-ddls">
Model DDLs
<a class="anchor" href="#model-ddls">#</a>
</h2>
<p>Flink 2.0 introduced dedicated syntax for AI models, enabling users to define models
as easily as creating catalog objects and to invoke them like standard functions or table functions in SQL statements.
In Flink 2.1, we have added Table API support for Model DDLs, enabling users to define and manage AI models programmatically
via the Table API in both Java and Python. This provides a flexible, code-driven alternative to SQL for model management and
integration within Flink applications.</p>
<p>Example:</p>
<ul>
<li>Defining a Model via Flink SQL</li>
</ul>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">CREATE</span><span class="w"> </span><span class="n">MODEL</span><span class="w"> </span><span class="n">my_model</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">INPUT</span><span class="w"> </span><span class="p">(</span><span class="n">f0</span><span class="w"> </span><span class="n">STRING</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">OUTPUT</span><span class="w"> </span><span class="p">(</span><span class="n">label</span><span class="w"> </span><span class="n">STRING</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;task&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;classification&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;type&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;remote&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;provider&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;openai&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;openai.endpoint&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;remote&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;openai.api_key&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;abcdefg&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><ul>
<li>Defining a Model via Table API (Java)</li>
</ul>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">tEnv</span><span class="p">.</span><span class="na">createModel</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s">&#34;MyModel&#34;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">ModelDescriptor</span><span class="p">.</span><span class="na">forProvider</span><span class="p">(</span><span class="s">&#34;OPENAI&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">inputSchema</span><span class="p">(</span><span class="n">Schema</span><span class="p">.</span><span class="na">newBuilder</span><span class="p">()</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">column</span><span class="p">(</span><span class="s">&#34;f0&#34;</span><span class="p">,</span><span class="w"> </span><span class="n">DataTypes</span><span class="p">.</span><span class="na">STRING</span><span class="p">())</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">build</span><span class="p">())</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">outputSchema</span><span class="p">(</span><span class="n">Schema</span><span class="p">.</span><span class="na">newBuilder</span><span class="p">()</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">column</span><span class="p">(</span><span class="s">&#34;label&#34;</span><span class="p">,</span><span class="w"> </span><span class="n">DataTypes</span><span class="p">.</span><span class="na">STRING</span><span class="p">())</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">build</span><span class="p">())</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">option</span><span class="p">(</span><span class="s">&#34;task&#34;</span><span class="p">,</span><span class="w"> </span><span class="s">&#34;classification&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">option</span><span class="p">(</span><span class="s">&#34;type&#34;</span><span class="p">,</span><span class="w"> </span><span class="s">&#34;remote&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">option</span><span class="p">(</span><span class="s">&#34;provider&#34;</span><span class="p">,</span><span class="w"> </span><span class="s">&#34;openai&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">option</span><span class="p">(</span><span class="s">&#34;openai.endpoint&#34;</span><span class="p">,</span><span class="w"> </span><span class="s">&#34;remote&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">option</span><span class="p">(</span><span class="s">&#34;openai.api_key&#34;</span><span class="p">,</span><span class="w"> </span><span class="s">&#34;abcdefg&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">build</span><span class="p">(),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kc">true</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p><strong>More Information</strong></p>
<ul>
<li><a href="https://issues.apache.org/jira/browse/FLINK-37548">FLINK-37548</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A&#43;Support&#43;ML&#43;Models&#43;in&#43;Flink&#43;SQL">FLIP-437</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-507%3A&#43;Add&#43;Model&#43;DDL&#43;methods&#43;in&#43;TABLE&#43;API">FLIP-507</a></li>
</ul>
<h2 id="realtime-ai-function">
Realtime AI Function
<a class="anchor" href="#realtime-ai-function">#</a>
</h2>
<p>Building on the AI model DDL, in Flink 2.1 we expanded the <code>ML_PREDICT</code> table-valued function (TVF) to perform real-time model inference in SQL queries, applying machine learning models to data streams seamlessly.
The implementation supports both Flink&rsquo;s built-in model provider (OpenAI) and interfaces for users to define custom model providers, accelerating Flink&rsquo;s evolution from a real-time
data processing engine to a unified real-time AI platform. Looking ahead, we plan to introduce more AI functions, such as <code>ML_EVALUATE</code> and <code>VECTOR_SEARCH</code>, to unlock an end-to-end experience
for real-time data processing, model training, and inference.</p>
<p>Take the following SQL statements as an example:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="c1">-- Declare a AI model
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">CREATE</span><span class="w"> </span><span class="n">MODEL</span><span class="w"> </span><span class="o">`</span><span class="n">my_model</span><span class="o">`</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">INPUT</span><span class="w"> </span><span class="p">(</span><span class="nb">text</span><span class="w"> </span><span class="n">STRING</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">OUTPUT</span><span class="w"> </span><span class="p">(</span><span class="n">response</span><span class="w"> </span><span class="n">STRING</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">WITH</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;provider&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;openai&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;endpoint&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;https://api.openai.com/v1/llm/v1/chat&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;api-key&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;abcdefg&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;system-prompt&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;translate to Chinese&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;model&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;gpt-4o&#39;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">-- Basic usage
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">SELECT</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="k">FROM</span><span class="w"> </span><span class="n">ML_PREDICT</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">input_table</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">MODEL</span><span class="w"> </span><span class="n">my_model</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">DESCRIPTOR</span><span class="p">(</span><span class="nb">text</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">-- With configuration options
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">SELECT</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="k">FROM</span><span class="w"> </span><span class="n">ML_PREDICT</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">input_table</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">MODEL</span><span class="w"> </span><span class="n">my_model</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">DESCRIPTOR</span><span class="p">(</span><span class="nb">text</span><span class="p">),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">MAP</span><span class="p">[</span><span class="s1">&#39;async&#39;</span><span class="p">,</span><span class="w"> </span><span class="s1">&#39;true&#39;</span><span class="p">,</span><span class="w"> </span><span class="s1">&#39;timeout&#39;</span><span class="p">,</span><span class="w"> </span><span class="s1">&#39;100s&#39;</span><span class="p">]</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">-- Using named parameters
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">SELECT</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="k">FROM</span><span class="w"> </span><span class="n">ML_PREDICT</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">INPUT</span><span class="w"> </span><span class="o">=&gt;</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">input_table</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">MODEL</span><span class="w"> </span><span class="o">=&gt;</span><span class="w"> </span><span class="n">MODEL</span><span class="w"> </span><span class="n">my_model</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">ARGS</span><span class="w"> </span><span class="o">=&gt;</span><span class="w"> </span><span class="k">DESCRIPTOR</span><span class="p">(</span><span class="nb">text</span><span class="p">),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">CONFIG</span><span class="w"> </span><span class="o">=&gt;</span><span class="w"> </span><span class="k">MAP</span><span class="p">[</span><span class="s1">&#39;async&#39;</span><span class="p">,</span><span class="w"> </span><span class="s1">&#39;true&#39;</span><span class="p">]</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p><strong>More Information</strong></p>
<ul>
<li><a href="https://issues.apache.org/jira/browse/FLINK-34992">FLINK-34992</a></li>
<li><a href="https://issues.apache.org/jira/browse/FLINK-37777">FLINK-37777</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-437%3A&#43;Support&#43;ML&#43;Models&#43;in&#43;Flink&#43;SQL">FLIP-437</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-525%3A&#43;Model&#43;ML_PREDICT%2C&#43;ML_EVALUATE&#43;Implementation&#43;Design">FLIP-525</a></li>
<li><a href="https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/table/sql/queries/model-inference/">Model Inference</a></li>
</ul>
<h2 id="process-table-functions-ptfs">
Process Table Functions (PTFs)
<a class="anchor" href="#process-table-functions-ptfs">#</a>
</h2>
<p>Apache Flink now includes support for Process Table Functions (PTFs), the most powerful function kind for Flink SQL and Table API.</p>
<p>Conceptually, a PTF is a superset of all other user-defined functions, mapping zero, one, or multiple tables to zero, one, or multiple rows.
They enable implementing user-defined operators that can be as feature-rich as built-in operations. PTFs have access to Flink&rsquo;s managed state,
event-time, timer services, and table changelogs.</p>
<p>PTFs enable the following tasks:</p>
<ul>
<li>Apply transformations on each row of a table.</li>
<li>Logically partition the table into distinct sets and apply transformations per set.</li>
<li>Store seen events for repeated access.</li>
<li>Continue the processing at a later point in time enabling waiting, synchronization, or timeouts.</li>
<li>Buffer and aggregate events using complex state machines or rule-based conditional logic.</li>
</ul>
<p>This moves Flink SQL significantly closer to the DataStream API, leveraging the robustness and familiarity of the existing SQL ecosystem.
Detailed information on PTF syntax and semantics can be found here: <a href="https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/table/functions/ptfs/">Process Table Functions</a>.</p>
<p>Take the following code as an example:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="c1">// Declare a ProcessTableFunction for memorizing your customers</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">class</span> <span class="nc">GreetingWithMemory</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">ProcessTableFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">class</span> <span class="nc">CountState</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">long</span><span class="w"> </span><span class="n">counter</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">0L</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">eval</span><span class="p">(</span><span class="nd">@StateHint</span><span class="w"> </span><span class="n">CountState</span><span class="w"> </span><span class="n">state</span><span class="p">,</span><span class="w"> </span><span class="nd">@ArgumentHint</span><span class="p">(</span><span class="n">SET_SEMANTIC_TABLE</span><span class="p">)</span><span class="w"> </span><span class="n">Row</span><span class="w"> </span><span class="n">input</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">state</span><span class="p">.</span><span class="na">counter</span><span class="o">++</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">collect</span><span class="p">(</span><span class="s">&#34;Hello &#34;</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">input</span><span class="p">.</span><span class="na">getFieldAs</span><span class="p">(</span><span class="s">&#34;name&#34;</span><span class="p">)</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="s">&#34;, your &#34;</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">state</span><span class="p">.</span><span class="na">counter</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="s">&#34; time?&#34;</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">TableEnvironment</span><span class="w"> </span><span class="n">env</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">TableEnvironment</span><span class="p">.</span><span class="na">create</span><span class="p">(...);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// Call the PTF in Table API</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">env</span><span class="p">.</span><span class="na">fromValues</span><span class="p">(</span><span class="s">&#34;Bob&#34;</span><span class="p">,</span><span class="w"> </span><span class="s">&#34;Alice&#34;</span><span class="p">,</span><span class="w"> </span><span class="s">&#34;Bob&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">as</span><span class="p">(</span><span class="s">&#34;name&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">partitionBy</span><span class="p">(</span><span class="n">$</span><span class="p">(</span><span class="s">&#34;name&#34;</span><span class="p">))</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">process</span><span class="p">(</span><span class="n">GreetingWithMemory</span><span class="p">.</span><span class="na">class</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">execute</span><span class="p">()</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">print</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// Call the PTF in SQL</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">env</span><span class="p">.</span><span class="na">executeSql</span><span class="p">(</span><span class="s">&#34;SELECT * FROM GreetingWithMemory(TABLE Names PARTITION BY name)&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">print</span><span class="p">();</span><span class="w">
</span></span></span></code></pre></div><p><strong>More Information</strong></p>
<ul>
<li><a href="https://issues.apache.org/jira/browse/FLINK-36703">FLINK-36703</a></li>
<li><a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=298781093">FLIP-440</a></li>
</ul>
<h2 id="variant-type">
Variant Type
<a class="anchor" href="#variant-type">#</a>
</h2>
<p>Variant is a new data type for semi-structured data (e.g. JSON). It supports storing any
semi-structured data, including ARRAY, MAP (with STRING keys), and scalar types, while preserving
field type information in a JSON-like structure. Unlike ROW and STRUCTURED types, VARIANT provides
superior flexibility for handling deeply nested and evolving schemas.</p>
<p>Users can use <code>PARSE_JSON</code> or <code>TRY_PARSE_JSON</code> to convert JSON-formatted VARCHAR data to VARIANT. In
addition, table formats like Apache Paimon now support the VARIANT type, which enables
users to efficiently process semi-structured data in the lakehouse using Flink SQL.</p>
<p>Take the following SQL statements as an example:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">t1</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">id</span><span class="w"> </span><span class="nb">INTEGER</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">v</span><span class="w"> </span><span class="n">STRING</span><span class="w"> </span><span class="c1">-- a json string
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="p">)</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;connector&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;mysql-cdc&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">t2</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">id</span><span class="w"> </span><span class="nb">INTEGER</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">v</span><span class="w"> </span><span class="n">VARIANT</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;connector&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;paimon&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">-- write to t2 with VARIANT type
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">INSERT</span><span class="w"> </span><span class="k">INTO</span><span class="w"> </span><span class="n">t2</span><span class="w"> </span><span class="k">SELECT</span><span class="w"> </span><span class="n">id</span><span class="p">,</span><span class="w"> </span><span class="n">PARSE_JSON</span><span class="p">(</span><span class="n">v</span><span class="p">)</span><span class="w"> </span><span class="k">FROM</span><span class="w"> </span><span class="n">t1</span><span class="p">;</span><span class="w">
</span></span></span></code></pre></div><p><strong>More Information</strong></p>
<ul>
<li><a href="https://issues.apache.org/jira/browse/FLINK-37922">FLINK-37922</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-521%3A&#43;Integrating&#43;Variant&#43;Type&#43;into&#43;Flink%3A&#43;Enabling&#43;Efficient&#43;Semi-Structured&#43;Data&#43;Processing">FLIP-521</a></li>
<li><a href="https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/table/types/#other-data-types">Variant</a></li>
</ul>
<h2 id="structured-type-enhancements">
Structured Type Enhancements
<a class="anchor" href="#structured-type-enhancements">#</a>
</h2>
<p>In Flink 2.1, users can declare user-defined objects via the STRUCTURED type directly in <code>CREATE TABLE</code> DDL
statements, resolving critical type equivalence issues and significantly improving API usability.</p>
<p>Take the following SQL statements as an example:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">MyTable</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">uid</span><span class="w"> </span><span class="nb">BIGINT</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">user</span><span class="w"> </span><span class="n">STRUCTURED</span><span class="o">&lt;</span><span class="s1">&#39;com.example.User&#39;</span><span class="p">,</span><span class="w"> </span><span class="n">name</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w"> </span><span class="n">age</span><span class="w"> </span><span class="nb">INT</span><span class="w"> </span><span class="k">NOT</span><span class="w"> </span><span class="k">NULL</span><span class="o">&gt;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">-- Casts a row type into a structured type
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">INSERT</span><span class="w"> </span><span class="k">INTO</span><span class="w"> </span><span class="n">MyTable</span><span class="w"> </span><span class="k">SELECT</span><span class="w"> </span><span class="mi">1</span><span class="p">,</span><span class="w"> </span><span class="k">CAST</span><span class="p">((</span><span class="s1">&#39;Bob&#39;</span><span class="p">,</span><span class="w"> </span><span class="mi">42</span><span class="p">)</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="n">STRUCTURED</span><span class="o">&lt;</span><span class="s1">&#39;com.example.User&#39;</span><span class="p">,</span><span class="w"> </span><span class="n">name</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w"> </span><span class="n">age</span><span class="w"> </span><span class="nb">INT</span><span class="o">&gt;</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p><strong>More Information</strong></p>
<ul>
<li><a href="https://issues.apache.org/jira/browse/FLINK-37861">FLINK-37861</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-520%3A&#43;Simplify&#43;StructuredType&#43;handling">FLIP-520</a></li>
<li><a href="https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/table/types/#user-defined-data-types">STRUCTURED</a></li>
</ul>
<h2 id="delta-join">
Delta Join
<a class="anchor" href="#delta-join">#</a>
</h2>
<p>Flink 2.1 introduces a new DeltaJoin operator for stream processing jobs, along with optimizations for simple
streaming join pipelines. Compared to a traditional streaming join, a delta join requires significantly
less state, effectively mitigating issues related to large state, including resource bottlenecks,
slow checkpointing, and lengthy job recovery times. This feature is enabled by default.</p>
<p><strong>More Information</strong></p>
<ul>
<li><a href="https://issues.apache.org/jira/browse/FLINK-37836">FLINK-37836</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-486%3A&#43;Introduce&#43;A&#43;New&#43;DeltaJoin">Delta Join</a></li>
</ul>
<h2 id="multiple-regular-joins">
Multiple Regular Joins
<a class="anchor" href="#multiple-regular-joins">#</a>
</h2>
<p>Streaming Flink jobs with multiple cascaded streaming joins often experience operational
instability and performance degradation due to large state sizes. This release introduces a
multi-join operator (<code>StreamingMultiJoinOperator</code>) that drastically reduces state size
by eliminating intermediate results. The operator achieves this by processing joins across all input
streams simultaneously within a single operator instance, storing only raw input records instead of
propagated join output.</p>
<p>This &ldquo;zero intermediate state&rdquo; approach primarily targets state reduction, offering substantial
benefits in resource consumption and operational stability. This feature is now available for
pipelines with multiple INNER/LEFT joins that share at least one common join key; enable it with
<code>SET 'table.optimizer.multi-join.enabled' = 'true'</code>.</p>
<p><strong>Benchmark</strong>: we conducted a benchmark comparing the multi-join operator with the default binary joins; for more details, see
<a href="https://nightlies.apache.org/flink/flink-docs-release-2.1/docs/dev/table/tuning/#multiple-regular-joins">MultiJoin Benchmark</a>.</p>
<p><strong>More Information</strong></p>
<ul>
<li><a href="https://issues.apache.org/jira/browse/FLINK-37859">FLINK-37859</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-516%3A&#43;Multi-Way&#43;Join&#43;Operator">MultiJoin</a></li>
</ul>
<h2 id="async-lookup-join-enhancements">
Async Lookup Join Enhancements
<a class="anchor" href="#async-lookup-join-enhancements">#</a>
</h2>
<p>In previous versions of async lookup join, even if users set <code>table.exec.async-lookup.output-mode</code> to <code>ALLOW_UNORDERED</code>,
the engine would still forcibly fall back to ordered mode when processing update streams to ensure correctness.
Starting from Flink 2.1, the engine allows parallel processing of unrelated update records while still ensuring correctness,
thereby achieving higher throughput when handling changelog streams.</p>
<p><strong>More Information</strong></p>
<ul>
<li><a href="https://issues.apache.org/jira/browse/FLINK-37874">FLINK-37874</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-519%3A&#43;&#43;Introduce&#43;async&#43;lookup&#43;key&#43;ordered&#43;mode">Async Lookup Join</a></li>
</ul>
<h2 id="sink-reuse">
Sink Reuse
<a class="anchor" href="#sink-reuse">#</a>
</h2>
<p>Within a single Flink job, when multiple <code>INSERT INTO</code> statements update identical columns
(different columns will be supported in the next release) of a target table, the planner will optimize
the execution plan and merge the sink nodes to achieve reuse. This will be a great usability improvement
for users using partial-update features with data lake storage like Apache Paimon.</p>
<p><strong>More Information</strong></p>
<ul>
<li><a href="https://issues.apache.org/jira/browse/FLINK-37227">FLINK-37227</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-506%3A&#43;Support&#43;Reuse&#43;Multiple&#43;Table&#43;Sinks&#43;in&#43;Planner">Sink Reuse</a></li>
</ul>
<h2 id="support-smile-format-for-compiled-plan-serialization">
Support Smile Format for Compiled Plan Serialization
<a class="anchor" href="#support-smile-format-for-compiled-plan-serialization">#</a>
</h2>
<p>In Flink 2.1, we added Smile binary format support for compiled plans, providing a memory-efficient
alternative to JSON for serialization/deserialization. JSON is used by default; to use the
Smile format, call the <code>CompiledPlan#asSmileBytes</code> and <code>PlanReference#fromSmileBytes</code> methods.</p>
<p><strong>More Information</strong></p>
<ul>
<li><a href="https://issues.apache.org/jira/browse/FLINK-37341">FLINK-37341</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-508%3A&#43;Add&#43;support&#43;for&#43;Smile&#43;format&#43;for&#43;Compiled&#43;plans">Smile Format</a></li>
<li><a href="https://github.com/FasterXML/smile-format-specification/blob/master/smile-specification.md">Smile Format Specification</a></li>
</ul>
<h1 id="runtime">
Runtime
<a class="anchor" href="#runtime">#</a>
</h1>
<h2 id="add-pluggable-batching-for-async-sink">
Add Pluggable Batching for Async Sink
<a class="anchor" href="#add-pluggable-batching-for-async-sink">#</a>
</h2>
<p>In Flink 2.1, we introduced a pluggable batching mechanism for the async sink that allows users to define custom
batching write strategies tailored to specific requirements.</p>
<p><strong>More Information</strong></p>
<ul>
<li><a href="https://issues.apache.org/jira/browse/FLINK-37298">FLINK-37298</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-509&#43;Add&#43;pluggable&#43;Batching&#43;for&#43;Async&#43;Sink">Pluggable Batching for Async Sink</a></li>
</ul>
<h2 id="split-level-watermark-metrics">
Split-level Watermark Metrics
<a class="anchor" href="#split-level-watermark-metrics">#</a>
</h2>
<p>In Flink 2.1, we added split-level watermark metrics, covering watermark progress and per-split state gauges,
to enhance watermark observability:</p>
<ul>
<li><code>currentWatermark</code>: the last watermark this split has received.</li>
<li><code>activeTimeMsPerSecond</code>: the time this split is active per second.</li>
<li><code>pausedTimeMsPerSecond</code>: the time this split is paused due to watermark alignment per second.</li>
<li><code>idleTimeMsPerSecond</code>: the time this split is marked idle by idleness detection per second.</li>
<li><code>accumulatedActiveTimeMs</code>: accumulated time this split was active since registered.</li>
<li><code>accumulatedPausedTimeMs</code>: accumulated time this split was paused since registered.</li>
<li><code>accumulatedIdleTimeMs</code>: accumulated time this split was idle since registered.</li>
</ul>
<p><strong>More Information</strong></p>
<ul>
<li><a href="https://issues.apache.org/jira/browse/FLINK-37410">FLINK-37410</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-513%3A&#43;Split-level&#43;Watermark&#43;Metrics">Watermark Metrics</a></li>
</ul>
<h1 id="connectors">
Connectors
<a class="anchor" href="#connectors">#</a>
</h1>
<h2 id="introduce-sql-connector-for-keyed-state">
Introduce SQL Connector for Keyed State
<a class="anchor" href="#introduce-sql-connector-for-keyed-state">#</a>
</h2>
<p>In Flink 2.1, we introduced a new connector for keyed state. This connector allows
users to query keyed state directly from checkpoint or savepoint using Flink SQL, making it easier
to inspect, debug, and validate the state of Flink jobs without custom tooling. This feature is
especially useful for analyzing long-running jobs and validating state migrations.</p>
<p>With a simple DDL, you can expose ValueState as a table and query the snapshot with Flink SQL:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">keyed_state</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">k</span><span class="w"> </span><span class="nb">INTEGER</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">user_id</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">balance</span><span class="w"> </span><span class="n">DOUBLE</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;connector&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;savepoint&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;path&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;file:///savepoint/path&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;uid&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;my-operator-id&#39;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">-- Query the keyed state
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">SELECT</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="k">FROM</span><span class="w"> </span><span class="n">keyed_state</span><span class="p">;</span><span class="w">
</span></span></span></code></pre></div><p><strong>More Information</strong></p>
<ul>
<li><a href="https://issues.apache.org/jira/browse/FLINK-36929">FLINK-36929</a></li>
<li><a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A&#43;SQL&#43;connector&#43;for&#43;keyed&#43;savepoint&#43;data">Savepoint Connector</a></li>
</ul>
<h1 id="others-improvements">
Others Improvements
<a class="anchor" href="#others-improvements">#</a>
</h1>
<h2 id="pyflink">
PyFlink
<a class="anchor" href="#pyflink">#</a>
</h2>
<p>In PyFlink 2.1, we added support for Python 3.12 and removed support for Python 3.8.</p>
<p><strong>More Information</strong></p>
<ul>
<li><a href="https://issues.apache.org/jira/browse/FLINK-37823">FLINK-37823</a></li>
<li><a href="https://issues.apache.org/jira/browse/FLINK-37776">FLINK-37776</a></li>
</ul>
<h2 id="upgrade-flink-shaded-version-to-200">
Upgrade flink-shaded version to 20.0
<a class="anchor" href="#upgrade-flink-shaded-version-to-200">#</a>
</h2>
<p>Bumped the flink-shaded version to 20.0 to support the Smile format.</p>
<p><strong>More Information</strong></p>
<ul>
<li><a href="https://issues.apache.org/jira/browse/FLINK-37376">FLINK-37376</a></li>
</ul>
<h2 id="upgrade-parquet-version-to-1153">
Upgrade Parquet version to 1.15.3
<a class="anchor" href="#upgrade-parquet-version-to-1153">#</a>
</h2>
<p>Bumped the Parquet version to 1.15.3 to resolve the parquet-avro module
vulnerability reported in <a href="https://nvd.nist.gov/vuln/detail/CVE-2025-30065">CVE-2025-30065</a>.</p>
<p><strong>More Information</strong></p>
<ul>
<li><a href="https://issues.apache.org/jira/browse/FLINK-37760">FLINK-37760</a></li>
</ul>
<h1 id="upgrade-notes">
Upgrade Notes
<a class="anchor" href="#upgrade-notes">#</a>
</h1>
<p>The Flink community tries to ensure that upgrades are as seamless as possible.
However, some changes may require users to adjust certain parts
of their programs when upgrading to version 2.1. Please refer to the
<a href="https://nightlies.apache.org/flink/flink-docs-release-2.1/release-notes/flink-2.1/">release notes</a>
for a comprehensive list of adjustments to make and issues to check during the
upgrade process.</p>
<h1 id="list-of-contributors">
List of Contributors
<a class="anchor" href="#list-of-contributors">#</a>
</h1>
<p>The Apache Flink community would like to express gratitude to all the contributors who made this release possible:</p>
<p>Ahmed Hamdy, Alan Sheinberg, Aleksandr Iushmanov, Aleksandr Savonin, AlexYinHan, Ammu Parvathy, Anupam Aggarwal, Ao Li, Arvid Heise, Au-Miner, Benchao Li, Bonnie Varghese, Chris, David Moravek, David Radley, David Wang, Dawid Wysakowicz, Dian Fu, Efrat Levitan, Feng Jin, Ferenc Csaky, Francesco Di Chiara, Gabor Somogyi, Gunnar Morling, Gustavo de Morais, Hangxiang Yu, Hao Li, Hongjia Liang, HuangXingBo, Jiaan Geng, Jiabao Sun, Jiangjie (Becket) Qin, Joery, JunRuiLee, Junrui Lee, Juntao Zhang, Kunni, Kurt Ostfeld, Laffery, Lukas Schwerdtfeger, Luke Chen, Martijn Visser, Mate Czagany, Matthias Pohl, Mika Naylor, Mina Asham, Mingliang Liu, Muhammet Orazov, Márton Balassi, PB, Pan Yuepeng, Peter Huang, Piotr Nowojski, Roc Marshal, Rui Fan, Ryan van Huuksloot, Sasaki Toru, Sergey Nuyanzin, Shengkai, Shuyi Chen, Stepan Stepanishchev, Thomas Cooper, Tianzhu Wen, Timo Walther, Venkata krishnan Sowrirajan, Weijie Guo, Xiangyu Feng, Xu Huang, XuShuai, Xuannan, Xuyang, Yanfei Lei, Yi Zhang, Yuepeng Pan, Yun Tang, Zakelly, Zdenek Tison, Zhanghao Chen, atu-sharm, beliefer, big face cat, chenyuzhi459, fengli, fredia, gengbiao.gb, glorinli, hejufang, huangyanyanyan, jingge, lincoln lee, mayuehappy, moses, mzzx, nacisimsek, nilmadhab mondal, noorall, novakov-alexey, r-sidd, slankka, slfan1989, sunxia, sxnan, wangfeifan, wangqh, wangxinglong, xiangyu0xf, xiaoyu, xingbo, xuyang, yanand0909, yhx, yuhang2.zhang, yunfengzhou-hub, 余良, 皆非, 马越</p>
</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="//github.com/apache/flink-web/edit/asf-site/docs/content/posts/2025-07-31-release-2.1.0.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li><a href="#flink-sql-improvements">Flink SQL Improvements</a>
<ul>
<li><a href="#model-ddls">Model DDLs</a></li>
<li><a href="#realtime-ai-function">Realtime AI Function</a></li>
<li><a href="#process-table-functions-ptfs">Process Table Functions (PTFs)</a></li>
<li><a href="#variant-type">Variant Type</a></li>
<li><a href="#structured-type-enhancements">Structured Type Enhancements</a></li>
<li><a href="#delta-join">Delta Join</a></li>
<li><a href="#multiple-regular-joins">Multiple Regular Joins</a></li>
<li><a href="#async-lookup-join-enhancements">Async Lookup Join Enhancements</a></li>
<li><a href="#sink-reuse">Sink Reuse</a></li>
<li><a href="#support-smile-format-for-compiled-plan-serialization">Support Smile Format for Compiled Plan Serialization</a></li>
</ul>
</li>
<li><a href="#runtime">Runtime</a>
<ul>
<li><a href="#add-pluggable-batching-for-async-sink">Add Pluggable Batching for Async Sink</a></li>
<li><a href="#split-level-watermark-metrics">Split-level Watermark Metrics</a></li>
</ul>
</li>
<li><a href="#connectors">Connectors</a>
<ul>
<li><a href="#introduce-sql-connector-for-keyed-state">Introduce SQL Connector for Keyed State</a></li>
</ul>
</li>
<li><a href="#others-improvements">Others Improvements</a>
<ul>
<li><a href="#pyflink">PyFlink</a></li>
<li><a href="#upgrade-flink-shaded-version-to-200">Upgrade flink-shaded version to 20.0</a></li>
<li><a href="#upgrade-parquet-version-to-1153">Upgrade Parquet version to 1.15.3</a></li>
</ul>
</li>
<li><a href="#upgrade-notes">Upgrade Notes</a></li>
<li><a href="#list-of-contributors">List of Contributors</a></li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a>
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>Github</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>