
<!DOCTYPE html>
<html lang="en" dir="ltr">

<head>
  


<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2020/12/10/apache-flink-1.12.0-release-announcement/">

  <meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="The Apache Flink community is excited to announce the release of Flink 1.12.0! Close to 300 contributors worked on over 1k threads to bring significant improvements to usability as well as new features that simplify (and unify) Flink handling across the API stack.
Release Highlights
The community has added support for efficient batch execution in the DataStream API. This is the next major milestone towards achieving a truly unified runtime for both batch and stream processing.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Apache Flink 1.12.0 Release Announcement" />
<meta property="og:description" content="The Apache Flink community is excited to announce the release of Flink 1.12.0! Close to 300 contributors worked on over 1k threads to bring significant improvements to usability as well as new features that simplify (and unify) Flink handling across the API stack.
Release Highlights
The community has added support for efficient batch execution in the DataStream API. This is the next major milestone towards achieving a truly unified runtime for both batch and stream processing." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2020/12/10/apache-flink-1.12.0-release-announcement/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2020-12-10T08:00:00+00:00" />
<meta property="article:modified_time" content="2020-12-10T08:00:00+00:00" />
<title>Apache Flink 1.12.0 Release Announcement | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->

  <meta name="generator" content="Hugo 0.124.1">

    
    <script>
      var _paq = window._paq = window._paq || [];
       
       
      _paq.push(['disableCookies']);
       
      _paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
      _paq.push(['trackPageView']);
      _paq.push(['enableLinkTracking']);
      (function() {
        var u="//analytics.apache.org/";
        _paq.push(['setTrackerUrl', u+'matomo.php']);
        _paq.push(['setSiteId', '1']);
        var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
        g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
      })();
    </script>
    
</head>

<body dir="ltr">
  


<header>
  <nav class="navbar navbar-expand-xl">
    <div class="container-fluid">
      <a class="navbar-brand" href="/">
        <img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
        <span>Apache Flink</span>
      </a>
      <button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
          <i class="fa fa-bars navbar-toggler-icon"></i>
      </button>
      <div class="collapse navbar-collapse" id="navbarSupportedContent">
        <ul class="navbar-nav">
          





    
      
  
    <li class="nav-item dropdown">
      <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
      <ul class="dropdown-menu">
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/security/">Security</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
  

          </li>
        
      </ul>
    </li>
  

    
      
  
    <li class="nav-item dropdown">
      <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
      <ul class="dropdown-menu">
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
      </ul>
    </li>
  

    
      
  
    <li class="nav-item dropdown">
      <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
      <ul class="dropdown-menu">
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
      </ul>
    </li>
  

    
      
  
    <li class="nav-item dropdown">
      <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
      <ul class="dropdown-menu">
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
  

          </li>
        
      </ul>
    </li>
  

    


    
      
  
    <li class="nav-item">
      
  
    <a class="nav-link" href="/posts/">Flink Blog</a>
  

    </li>
  

    
      
  
    <li class="nav-item">
      
  
    <a class="nav-link" href="/downloads/">Downloads</a>
  

    </li>
  

    


    









        </ul>
        <div class="book-search">
          <div class="book-search-spinner hidden">
            <i class="fa fa-refresh fa-spin"></i>
          </div>
          <form class="search-bar d-flex" onsubmit="return false;">
            <input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
            <i class="fa fa-search search"></i>
            <i class="fa fa-circle-o-notch fa-spin spinner"></i>
          </form>
          <div class="book-search-spinner hidden"></div>
          <ul id="book-search-results"></ul>
        </div>
      </div>
    </div>
  </nav>
  <div class="navbar-clearfix"></div>
</header>
 
  
      <main class="flex">
        <section class="container book-page">
          
<article class="markdown">
    <h1>
        <a href="/2020/12/10/apache-flink-1.12.0-release-announcement/">Apache Flink 1.12.0 Release Announcement</a>
    </h1>
    


  December 10, 2020 -



  Marta Paes

  <a href="https://twitter.com/morsapaes">(@morsapaes)</a>
  

  Aljoscha Krettek

  <a href="https://twitter.com/aljoscha">(@aljoscha)</a>
  



    <p><p>The Apache Flink community is excited to announce the release of Flink 1.12.0! Close to 300 contributors worked on over 1k threads to bring significant improvements to usability as well as new features that simplify (and unify) Flink handling across the API stack.</p>
<p><strong>Release Highlights</strong></p>
<ul>
<li>
<p>The community has added support for <strong>efficient batch execution</strong> in the DataStream API. This is the next major milestone towards achieving a truly unified runtime for both batch and stream processing.</p>
</li>
<li>
<p><strong>Kubernetes-based High Availability (HA)</strong> was implemented as an alternative to ZooKeeper for highly available production setups.</p>
</li>
<li>
<p>The Kafka SQL connector has been extended to work in <strong>upsert mode</strong>, supported by the ability to handle <strong>connector metadata</strong> in SQL DDL. <strong>Temporal table joins</strong> can now also be fully expressed in SQL, no longer depending on the Table API.</p>
</li>
<li>
<p>Support for the <strong>DataStream API in PyFlink</strong> expands its usage to more complex scenarios that require fine-grained control over state and time, and it’s now possible to deploy PyFlink jobs natively on <strong>Kubernetes</strong>.</p>
</li>
</ul>
<p>This blog post describes all major new features and improvements, important changes to be aware of and what to expect moving forward.</p>
<p>The binary distribution and source artifacts are now available on the updated <a href="/downloads.html">Downloads page</a> of the Flink website, and the most recent distribution of PyFlink is available on <a href="https://pypi.org/project/apache-flink/">PyPI</a>. Please review the <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/release-notes/flink-1.12.html">release notes</a> carefully, and check the complete <a href="https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12348263&amp;styleName=Html&amp;projectId=12315522">release changelog</a> and <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/">updated documentation</a> for more details.</p>
<p>We encourage you to download the release and share your feedback with the community through the <a href="https://flink.apache.org/community.html#mailing-lists">Flink mailing lists</a> or <a href="https://issues.apache.org/jira/projects/FLINK/summary">JIRA</a>.</p>
<h2 id="new-features-and-improvements">
  New Features and Improvements
  <a class="anchor" href="#new-features-and-improvements">#</a>
</h2>
<h3 id="batch-execution-mode-in-the-datastream-api">
  Batch Execution Mode in the DataStream API
  <a class="anchor" href="#batch-execution-mode-in-the-datastream-api">#</a>
</h3>
<p>Flink’s core APIs have developed organically over the lifetime of the project, and were initially designed with specific use cases in mind. And while the Table API/SQL already has unified operators, using lower-level abstractions still requires you to choose between two semantically different APIs for batch (DataSet API) and streaming (DataStream API). Since <em>a batch is a subset of an unbounded stream</em>, there are some clear advantages to consolidating them under a single API:</p>
<ul>
<li>
<p><strong>Reusability:</strong> with efficient batch and stream processing under the same API, you can switch between the two execution modes without rewriting any code, so the same job can be reused to process both real-time and historical data.</p>
</li>
<li>
<p><strong>Operational simplicity:</strong> providing a unified API would mean using a single set of connectors, maintaining a single codebase, and being able to easily implement mixed execution pipelines, <em>e.g.</em> for use cases like backfilling.</p>
</li>
</ul>
<p>With these advantages in mind, the community has taken the first step towards unifying batch and stream processing in the DataStream API: supporting efficient batch execution (<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A&#43;Batch&#43;execution&#43;for&#43;the&#43;DataStream&#43;API">FLIP-134</a>). In the long run, this means that the DataSet API will be deprecated and subsumed by the DataStream API and the Table API/SQL (<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741">FLIP-131</a>). For an overview of the unification effort, refer to <a href="https://youtu.be/z9ye4jzp4DQ">this recent Flink Forward talk</a>.</p>
<p><strong>Batch for Bounded Streams</strong></p>
<p>You could already use the DataStream API to process bounded streams (<em>e.g.</em> files), with the limitation that the runtime is not “aware” that the job is bounded. To optimize the runtime for bounded input, the new <code>BATCH</code> execution mode uses sort-based shuffles, performs aggregations purely in memory, and relies on an improved scheduling strategy (<em>see <a href="#pipelined-region-scheduling-flip-119">Pipelined Region Scheduling</a></em>). As a result, <code>BATCH</code> mode execution in the DataStream API already comes very close to the performance of the DataSet API in Flink 1.12. For more details on the performance benchmark, check the original proposal (<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-140%3A&#43;Introduce&#43;batch-style&#43;execution&#43;for&#43;bounded&#43;keyed&#43;streams">FLIP-140</a>).</p>
<center>
  <figure>
  <img src="/img/blog/2020-12-08-release-1.12.0/1.png" width="600px"/>
  </figure>
</center>
<div style="line-height:60%;">
    <br>
</div>
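<p>To make the idea behind sort-based execution more concrete, here is a minimal sketch in plain Java. It is not Flink code, and the <code>SortedKeyedAggregation</code> class and its <code>Event</code> type are hypothetical: it assumes a bounded input of (key, value) pairs and sums the values per key. Because the input is sorted by key first, all records with the same key become adjacent, so the aggregation only needs to hold the running state of the key it is currently processing, rather than state for every key at once.</p>

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch, not Flink code: sum values per key over a bounded input
// by sorting it first, so that only one key's running state is held at a time.
class SortedKeyedAggregation {

    // A hypothetical (key, value) record.
    static final class Event {
        final String key;
        final long value;

        Event(String key, long value) {
            this.key = key;
            this.value = value;
        }
    }

    static Map<String, Long> sumPerKey(List<Event> boundedInput) {
        // Sort the whole (bounded) input by key; equal keys become adjacent.
        List<Event> sorted = new ArrayList<>(boundedInput);
        sorted.sort(Comparator.comparing((Event e) -> e.key));

        Map<String, Long> results = new LinkedHashMap<>();
        String currentKey = null;
        long runningSum = 0; // the only aggregation state kept while processing
        for (Event e : sorted) {
            if (!e.key.equals(currentKey)) {
                if (currentKey != null) {
                    results.put(currentKey, runningSum); // key finished: emit it, drop its state
                }
                currentKey = e.key;
                runningSum = 0;
            }
            runningSum += e.value;
        }
        if (currentKey != null) {
            results.put(currentKey, runningSum); // emit the last key
        }
        return results;
    }

    public static void main(String[] args) {
        List<Event> input = List.of(
                new Event("b", 2), new Event("a", 1), new Event("b", 3), new Event("a", 4));
        System.out.println(sumPerKey(input)); // {a=5, b=5}
    }
}
```

<p>The trade-off this illustrates is that sorting requires the whole input to be available, which is only possible when the input is known to be bounded.</p>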
<p>In Flink 1.12, the default execution mode is <code>STREAMING</code>. To run a job in <code>BATCH</code> mode, you can set the <code>execution.runtime-mode</code> option when submitting it:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-bash" data-lang="bash"><span class="line"><span class="cl">bin/flink run -Dexecution.runtime-mode<span class="o">=</span>BATCH examples/streaming/WordCount.jar
</span></span></code></pre></div><p>Alternatively, you can set the runtime mode programmatically:</p>
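<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.api.common.RuntimeExecutionMode</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.streaming.api.environment.StreamExecutionEnvironment</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">StreamExecutionEnvironment</span><span class="w"> </span><span class="n">env</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">StreamExecutionEnvironment</span><span class="p">.</span><span class="na">getExecutionEnvironment</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// Run the job with batch semantics (the default is STREAMING)
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="n">env</span><span class="p">.</span><span class="na">setRuntimeMode</span><span class="p">(</span><span class="n">RuntimeExecutionMode</span><span class="p">.</span><span class="na">BATCH</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><div style="line-height:150%;">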
    <br>
</div>
<div class="alert alert-info small" markdown="1">
<b>Note:</b> Although the DataSet API has not been deprecated yet, we recommend that users give preference to the DataStream API with <code>BATCH</code> execution mode for new batch jobs, and consider migrating existing DataSet jobs.
</div>
<h3 id="new-data-sink-api-beta">
  New Data Sink API (Beta)
  <a class="anchor" href="#new-data-sink-api-beta">#</a>
</h3>
<p>Making connectors work in both execution modes was already covered for data sources in the <a href="https://flink.apache.org/news/2020/07/06/release-1.11.0.html#new-data-source-api-beta">previous release</a>, so in Flink 1.12 the community focused on implementing a unified Data Sink API (<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A&#43;Unified&#43;Sink&#43;API">FLIP-143</a>). The new abstraction introduces a write/commit protocol and a more modular interface where the individual components are transparently exposed to the framework.</p>
<p>A <em>Sink</em> implementor will have to provide the <strong>what</strong> and <strong>how</strong>: a <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/connector/sink/SinkWriter.html"><em>SinkWriter</em></a> that writes data and outputs what needs to be committed (i.e. committables); and a <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/connector/sink/Committer.html"><em>Committer</em></a> and <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/connector/sink/GlobalCommitter.html"><em>GlobalCommitter</em></a> that encapsulate how to handle the committables. The framework is responsible for the <strong>when</strong> and <strong>where</strong>: at what time and on which machine or process to commit.</p>
<center>
  <figure>
  <img src="/img/blog/2020-12-08-release-1.12.0/2.png" width="700px"/>
  </figure>
</center>
<div style="line-height:150%;">
    <br>
</div>
<p>This more modular abstraction makes it possible to support different runtime implementations for the <code>BATCH</code> and <code>STREAMING</code> execution modes, each efficient for its intended purpose, while using just one unified sink implementation. In Flink 1.12, the <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/file_sink.html">FileSink connector</a> is the unified drop-in replacement for StreamingFileSink (<a href="https://issues.apache.org/jira/browse/FLINK-19758">FLINK-19758</a>). The remaining connectors will be ported to the new interfaces in future releases.</p>
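<p>The division of responsibilities between the sink implementation (the <strong>what</strong> and <strong>how</strong>) and the framework (the <strong>when</strong> and <strong>where</strong>) can be sketched in plain Java. The interfaces below are hypothetical and deliberately simplified: they do not reproduce the actual <em>SinkWriter</em> or <em>Committer</em> signatures, and the <em>GlobalCommitter</em> is omitted. A writer stages records and hands over committables, a committer makes them visible, and a small driver loop plays the role of the framework by deciding when to commit.</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified interfaces (not Flink's actual Sink API signatures).
interface SketchWriter<IN, CommT> {
    void write(IN element);        // "what": stage incoming data
    List<CommT> prepareCommit();   // hand over everything that needs to be committed
}

interface SketchCommitter<CommT> {
    void commit(List<CommT> committables); // "how": make staged data visible
}

// Stages records; each committable stands for one pending "file" of joined records.
class StagingWriter implements SketchWriter<String, String> {
    private final List<String> pending = new ArrayList<>();

    public void write(String element) {
        pending.add(element);
    }

    public List<String> prepareCommit() {
        List<String> committables = new ArrayList<>();
        if (!pending.isEmpty()) {
            committables.add(String.join(",", pending));
            pending.clear();
        }
        return committables;
    }
}

class PublishingCommitter implements SketchCommitter<String> {
    final List<String> visible = new ArrayList<>(); // stands in for the final output

    public void commit(List<String> committables) {
        visible.addAll(committables);
    }
}

// Plays the framework: it alone decides *when* the committer runs.
class SinkSketchDemo {
    static List<String> run(List<String> input, int commitEvery) {
        StagingWriter writer = new StagingWriter();
        PublishingCommitter committer = new PublishingCommitter();
        int count = 0;
        for (String s : input) {
            writer.write(s);
            if (++count % commitEvery == 0) {
                committer.commit(writer.prepareCommit()); // periodic commit
            }
        }
        committer.commit(writer.prepareCommit()); // final commit at end of input
        return committer.visible;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b", "c", "d", "e"), 2)); // [a,b, c,d, e]
    }
}
```

<p>In this sketch, committing every <code>commitEvery</code> records stands in for committing on checkpoints in <code>STREAMING</code> mode, and the final commit stands in for committing at the end of the input in <code>BATCH</code> mode; the writer and committer stay the same in both cases.</p>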
<h3 id="kubernetes-high-availability-ha-service">
  Kubernetes High Availability (HA) Service
  <a class="anchor" href="#kubernetes-high-availability-ha-service">#</a>
</h3>
<p>Kubernetes provides built-in functionality that Flink can leverage for JobManager failover, instead of relying on <a href="https://zookeeper.apache.org/">ZooKeeper</a>. To enable a “ZooKeeperless” HA setup, the community implemented a Kubernetes HA service in Flink 1.12 (<a href="https://cwiki.apache.org/confluence/x/H0V4CQ">FLIP-144</a>). The service is built on the same <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.html">base interface</a> as the ZooKeeper implementation and uses Kubernetes’ <a href="https://kubernetes.io/docs/concepts/configuration/configmap/">ConfigMap</a> objects to handle all the metadata needed to recover from a JobManager failure. For more details and examples on how to configure a highly available Kubernetes cluster, check out the <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/deployment/ha/kubernetes_ha.html">documentation</a>.</p>
<div class="alert alert-info small" markdown="1">
<b>Note:</b> This does not mean that the ZooKeeper dependency will be dropped, just that there will be an alternative for users of Flink on Kubernetes.
</div>
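<p>A ConfigMap-based HA service has to update shared metadata safely from several JobManager processes. One common building block for this is Kubernetes’ optimistic concurrency: an update is applied only if the object has not changed since it was read, which is tracked through its resource version. The in-memory Java sketch below illustrates that primitive under simplifying assumptions: <code>VersionedConfigMap</code>, <code>HaSketchDemo</code>, the <code>leader</code>/<code>checkpoint</code> keys and the explicit leader hand-over are all hypothetical, no Kubernetes client is involved, and Flink’s actual HA service is considerably more involved.</p>

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a ConfigMap: key/value data plus a
// resource version that changes on every successful update.
class VersionedConfigMap {
    private final Map<String, String> data = new HashMap<>();
    private long resourceVersion = 0;

    synchronized long version() {
        return resourceVersion;
    }

    synchronized String get(String key) {
        return data.get(key);
    }

    // Optimistic concurrency: the write succeeds only if the caller saw the latest version.
    synchronized boolean replace(long expectedVersion, String key, String value) {
        if (expectedVersion != resourceVersion) {
            return false; // conflict: another process updated the object first
        }
        data.put(key, value);
        resourceVersion++;
        return true;
    }
}

class HaSketchDemo {
    // A JobManager claims leadership if the "leader" entry is empty or already its own.
    static boolean tryLead(VersionedConfigMap cm, String jobManagerId) {
        long seen = cm.version();
        String leader = cm.get("leader");
        if (leader != null && !leader.equals(jobManagerId)) {
            return false;
        }
        return cm.replace(seen, "leader", jobManagerId);
    }

    public static void main(String[] args) {
        VersionedConfigMap cm = new VersionedConfigMap();
        System.out.println(tryLead(cm, "jm-1")); // true: jm-1 becomes leader
        // The leader records recovery metadata (hypothetical key and value).
        cm.replace(cm.version(), "checkpoint", "chk-42");
        System.out.println(tryLead(cm, "jm-2")); // false: jm-1 still leads

        // Simplified failover: jm-1's entry is cleared (real systems rely on lease expiry).
        cm.replace(cm.version(), "leader", null);
        System.out.println(tryLead(cm, "jm-2")); // true: jm-2 takes over
        System.out.println(cm.get("checkpoint")); // chk-42: where jm-2 recovers from
    }
}
```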
<hr>
<h3 id="other-improvements">
  Other Improvements
  <a class="anchor" href="#other-improvements">#</a>
</h3>
<p><strong>Migration of existing connectors to the new Data Source API</strong></p>
<p>The previous release introduced a new Data Source API (<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A&#43;Refactor&#43;Source&#43;Interface">FLIP-27</a>), making it possible to implement connectors that work both as bounded (batch) and unbounded (streaming) sources. In Flink 1.12, the community began porting existing source connectors to the new interfaces, starting with the FileSystem connector (<a href="https://issues.apache.org/jira/browse/FLINK-19161">FLINK-19161</a>).</p>
<div class="alert alert-danger small" markdown="1">
<b>Attention:</b> The unified source implementations will be completely separate connectors that are not snapshot-compatible with their legacy counterparts.
</div>
<p id="pipelined-region-scheduling-flip-119"><strong>Pipelined Region Scheduling (<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-119&#43;Pipelined&#43;Region&#43;Scheduling#FLIP119PipelinedRegionScheduling-BulkSlotAllocation">FLIP-119</a>)</strong></p>
<p>Flink’s scheduler has been largely designed to address batch and streaming workloads separately. This release introduces a <strong>unified</strong> scheduling strategy that identifies blocking data exchanges to break down the execution graph into <em>pipelined regions</em>. Each region is scheduled only once there is data for it to process, and it is deployed only once all of its required resources are available; failed regions can also be restarted independently. In particular for batch jobs, the new strategy leads to more efficient resource utilization and eliminates deadlocks.</p>
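<p>The region-splitting step can be sketched with a union-find structure: vertices connected by a pipelined edge have to run at the same time and therefore end up in the same region, while a blocking edge marks a possible boundary between regions. The plain Java below is a hypothetical illustration (including the <code>{source, target, isBlocking}</code> edge encoding), not Flink’s scheduler code, which operates on its own graph classes and also handles resource allocation and failover.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical sketch: group job vertices into pipelined regions with union-find.
class PipelinedRegions {
    private final int[] parent;

    private PipelinedRegions(int numVertices) {
        parent = new int[numVertices];
        for (int v = 0; v < numVertices; v++) {
            parent[v] = v; // every vertex starts in its own region
        }
    }

    private int find(int v) {
        while (parent[v] != v) {
            parent[v] = parent[parent[v]]; // path halving keeps the trees shallow
            v = parent[v];
        }
        return v;
    }

    private void union(int a, int b) {
        parent[find(a)] = find(b);
    }

    // edges[i] = {source, target, isBlocking}, with isBlocking = 1 or 0
    static List<TreeSet<Integer>> compute(int numVertices, int[][] edges) {
        PipelinedRegions uf = new PipelinedRegions(numVertices);
        for (int[] e : edges) {
            if (e[2] == 0) {
                // pipelined edge: producer and consumer run together, same region
                uf.union(e[0], e[1]);
            }
            // blocking edge: the result is fully produced first, so no merge
        }
        Map<Integer, TreeSet<Integer>> byRoot = new TreeMap<>();
        for (int v = 0; v < numVertices; v++) {
            byRoot.computeIfAbsent(uf.find(v), root -> new TreeSet<>()).add(v);
        }
        List<TreeSet<Integer>> regions = new ArrayList<>(byRoot.values());
        regions.sort((x, y) -> Integer.compare(x.first(), y.first())); // order by smallest vertex
        return regions;
    }

    public static void main(String[] args) {
        // source(0) -pipelined-> map(1) -blocking-> reduce(2) -pipelined-> sink(3)
        int[][] edges = {{0, 1, 0}, {1, 2, 1}, {2, 3, 0}};
        System.out.println(compute(4, edges)); // [[0, 1], [2, 3]]
    }
}
```

<p>For a job in which all exchanges are pipelined, the whole graph collapses into a single region; blocking exchanges are what allow a batch job to be split into regions that can be scheduled and restarted independently.</p>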
<p><strong>Support for Sort-Merge Shuffles (<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-148%3A&#43;Introduce&#43;Sort-Merge&#43;Based&#43;Blocking&#43;Shuffle&#43;to&#43;Flink">FLIP-148</a>)</strong></p>
<p>To improve the stability, performance and resource utilization of large-scale batch jobs, the community introduced sort-merge shuffle as an alternative to Flink’s existing shuffle implementation. This approach can reduce shuffle time <a href="https://www.mail-archive.com/dev@flink.apache.org/msg42472.html">significantly</a>, and it uses fewer file handles and file write buffers (large numbers of which are problematic for large-scale jobs). Further optimizations will be implemented in upcoming releases (<a href="https://issues.apache.org/jira/browse/FLINK-19614">FLINK-19614</a>).</p>
<div class="alert alert-danger small" markdown="1">
<b>Attention:</b> This feature is experimental and not enabled by default. To enable sort-merge shuffles, you can configure a reasonable minimum parallelism threshold in the <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/deployment/config.html#taskmanager-network-sort-shuffle-min-parallelism">TaskManager network configuration options</a>.
</div>
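<p>The core idea of a sort-merge shuffle can be illustrated with a small in-memory Java sketch (hypothetical, not Flink code). It assumes, for comparison, a hash-style blocking shuffle that writes one file per downstream partition. Here, instead, each producer sorts its records by target partition, writes them to a single data “file”, and keeps an index of where each partition’s records start, so a consumer can read just its own slice. A real implementation also spills sorted runs to disk and merges them, which the sketch omits.</p>

```java
import java.util.Arrays;

// Hypothetical sketch of a sort-merge shuffle writer: one data "file" plus an index.
class SortMergeShuffleSketch {
    final String[] dataFile; // stands in for the single output file of one producer
    final int[] offsets;     // index: partition p occupies [offsets[p], offsets[p + 1])

    // partitions[i] is the target consumer partition of payloads[i]
    SortMergeShuffleSketch(int numPartitions, int[] partitions, String[] payloads) {
        offsets = new int[numPartitions + 1];
        for (int p : partitions) {
            offsets[p + 1]++; // count records per partition
        }
        for (int p = 0; p < numPartitions; p++) {
            offsets[p + 1] += offsets[p]; // prefix sums turn counts into start offsets
        }
        dataFile = new String[payloads.length];
        int[] next = Arrays.copyOf(offsets, numPartitions); // next free slot per partition
        for (int i = 0; i < payloads.length; i++) {
            dataFile[next[partitions[i]]++] = payloads[i]; // stable sort by partition
        }
    }

    // A consumer reads only its own contiguous slice of the single file.
    String[] read(int partition) {
        return Arrays.copyOfRange(dataFile, offsets[partition], offsets[partition + 1]);
    }

    public static void main(String[] args) {
        int[] partitions = {2, 0, 1, 0, 2};
        String[] payloads = {"a", "b", "c", "d", "e"};
        // One file for 3 consumer partitions, instead of one file per partition.
        SortMergeShuffleSketch shuffle = new SortMergeShuffleSketch(3, partitions, payloads);
        for (int p = 0; p < 3; p++) {
            System.out.println(p + " -> " + Arrays.toString(shuffle.read(p)));
        }
        // 0 -> [b, d]
        // 1 -> [c]
        // 2 -> [a, e]
    }
}
```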
<p><strong>Improvements to the Flink WebUI (<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-75%3A&#43;Flink&#43;Web&#43;UI&#43;Improvement&#43;Proposal">FLIP-75</a>)</strong></p>
<p>Continuing the series of improvements to the Flink WebUI started in the last release, the community worked on exposing the JobManager&rsquo;s memory-related metrics and configuration parameters on the WebUI (<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-104%3A&#43;Add&#43;More&#43;Metrics&#43;to&#43;Jobmanager">FLIP-104</a>). The TaskManager&rsquo;s metrics page has also been updated to reflect the <a href="https://flink.apache.org/news/2020/04/21/memory-management-improvements-flink-1.10.html">changes to the TaskManager memory model</a> introduced in Flink 1.10 (<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-102%3A&#43;Add&#43;More&#43;Metrics&#43;to&#43;TaskManager">FLIP-102</a>), adding new metrics for Managed Memory, Network Memory and Metaspace.</p>
<hr>
<h3 id="table-apisql-metadata-handling-in-sql-connectors">
  Table API/SQL: Metadata Handling in SQL Connectors
  <a class="anchor" href="#table-apisql-metadata-handling-in-sql-connectors">#</a>
</h3>
<p>Some sources (and formats) expose additional fields as metadata that can be valuable for users to process along with record data. A common example is Kafka, where you might want to <em>e.g.</em> access offset, partition or topic information, read/write the record key or use embedded metadata timestamps for time-based operations.
With the new release, Flink SQL supports <strong>metadata columns</strong> to read and write connector- and format-specific fields for every row of a table (<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-107%3A&#43;Handling&#43;of&#43;metadata&#43;in&#43;SQL&#43;connectors">FLIP-107</a>). These columns are declared in the <code>CREATE TABLE</code> statement using the reserved <code>METADATA</code> keyword.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">kafka_table</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">  </span><span class="n">id</span><span class="w"> </span><span class="nb">BIGINT</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">  </span><span class="n">name</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">  </span><span class="n">event_time</span><span class="w"> </span><span class="k">TIMESTAMP</span><span class="p">(</span><span class="mi">3</span><span class="p">)</span><span class="w"> </span><span class="n">METADATA</span><span class="w"> </span><span class="k">FROM</span><span class="w"> </span><span class="s1">&#39;timestamp&#39;</span><span class="p">,</span><span class="w"> </span><span class="c1">-- access Kafka &#39;timestamp&#39; metadata
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="w">  </span><span class="n">headers</span><span class="w"> </span><span class="k">MAP</span><span class="o">&lt;</span><span class="n">STRING</span><span class="p">,</span><span class="w"> </span><span class="n">BYTES</span><span class="o">&gt;</span><span class="w"> </span><span class="n">METADATA</span><span class="w">  </span><span class="c1">-- access Kafka &#39;headers&#39; metadata
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="p">)</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">  </span><span class="s1">&#39;connector&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;kafka&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">  </span><span class="s1">&#39;topic&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;test-topic&#39;</span><span class="p">,</span><span class="w"> 
</span></span></span><span class="line"><span class="cl"><span class="w">  </span><span class="s1">&#39;format&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;avro&#39;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>In Flink 1.12, metadata is exposed for the <strong>Kafka</strong> and <strong>Kinesis</strong> connectors, with work on the FileSystem connector already planned (<a href="https://issues.apache.org/jira/browse/FLINK-19903">FLINK-19903</a>). Due to the more complex structure of Kafka records, <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html#key-format">new properties</a> were also specifically implemented for the Kafka connector to control how to handle the key/value pairs. For a complete overview of metadata support in Flink SQL, check the <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/">documentation</a> for each connector, as well as the motivating use cases in the original proposal.</p>
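<p>To make the key/value handling more concrete, the following sketch models in plain Python how a table row could be split into a Kafka record key and value. It is an illustration under stated assumptions, not Flink code: <code>split_row</code> is a hypothetical helper, and its two modes are only named after the <code>ALL</code> and <code>EXCEPT_KEY</code> values of the Kafka connector's <code>value.fields-include</code> option. Check the connector documentation for the actual options and their defaults.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python">def split_row(row, key_fields, value_fields_include="ALL"):
    """Split a row (dict of column name to value) into a Kafka key and value.

    Hypothetical model, not Flink code: key_fields lists the columns that go
    into the record key; value_fields_include mimics the two modes of the
    Kafka connector's 'value.fields-include' option ("ALL" or "EXCEPT_KEY").
    """
    key = {name: row[name] for name in key_fields}
    if value_fields_include == "ALL":
        # Key columns are written to both the key and the value payload.
        value = dict(row)
    elif value_fields_include == "EXCEPT_KEY":
        # Key columns are written to the key payload only.
        value = {name: v for name, v in row.items() if name not in key_fields}
    else:
        raise ValueError("unsupported mode: " + value_fields_include)
    return key, value


row = {"user_id": 42, "item": "book", "price": 12}
print(split_row(row, ["user_id"], "EXCEPT_KEY"))
# ({'user_id': 42}, {'item': 'book', 'price': 12})
</code></pre></div>
<p>With <code>EXCEPT_KEY</code>, the key columns appear only in the record key; with <code>ALL</code>, they are duplicated into the value payload.</p>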
<h3 id="table-apisql-upsert-kafka-connector">
  Table API/SQL: Upsert Kafka Connector
  <a class="anchor" href="#table-apisql-upsert-kafka-connector">#</a>
</h3>
<p>For some use cases, like interpreting compacted topics or writing out (updating) aggregated results, it’s necessary to handle Kafka record keys as <em>true</em> primary keys that can determine what should be inserted, deleted or updated. To enable this, the community created a <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/upsert-kafka.html">dedicated upsert connector</a> (<code>upsert-kafka</code>) that extends the base implementation to work in <em>upsert</em> mode (<a href="https://issues.apache.org/jira/browse/FLINK-19857">FLIP-149</a>).</p>
<p>The new <code>upsert-kafka</code> connector can be used both as a source and as a sink, and provides the <strong>same base functionality</strong> and <strong>persistence guarantees</strong> as the existing Kafka connector, as it reuses most of its code under the hood. To use the <code>upsert-kafka</code> connector, you must define a <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/sql/create.html#primary-key">primary key constraint</a> on table creation, and specify the (de)serialization format for both the key (<code>key.format</code>) and the value (<code>value.format</code>).</p>
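<p>Conceptually, <code>upsert-kafka</code> reads a topic as a changelog keyed by the primary key: a record with a new key is an insert, a record with an existing key is an update, and a record with a <code>null</code> value (a tombstone) is a delete. The following plain-Python sketch, which does not use Flink, folds such records into the current table contents; <code>materialize</code> is a hypothetical name used only for illustration.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python">def materialize(records):
    """Fold (key, value) records into the latest value per key.

    A value of None plays the role of a Kafka tombstone and deletes the key,
    mirroring how upsert-kafka treats records with a null value.
    """
    table = {}
    for key, value in records:
        if value is None:
            table.pop(key, None)  # DELETE (keys never seen are ignored)
        else:
            table[key] = value    # INSERT for a new key, UPDATE otherwise
    return table


records = [
    ("EUR", {"currency_rate": "1.10"}),
    ("USD", {"currency_rate": "1.00"}),
    ("EUR", {"currency_rate": "1.12"}),  # update for key EUR
    ("USD", None),                        # tombstone for key USD
]
print(materialize(records))  # {'EUR': {'currency_rate': '1.12'}}
</code></pre></div>
<p>This is also why the primary key is mandatory: without it, there is no way to tell which earlier record a new one replaces.</p>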
<h3 id="table-apisql-support-for-temporal-table-joins-in-sql">
  Table API/SQL: Support for Temporal Table Joins in SQL
  <a class="anchor" href="#table-apisql-support-for-temporal-table-joins-in-sql">#</a>
</h3>
<p>Instead of creating a temporal table function to look up against a table at a certain point in time, you can now simply use the standard SQL clause <code>FOR SYSTEM_TIME AS OF</code> (SQL:2011) to express a <strong>temporal table join</strong>. In addition, temporal joins are now supported against <em>any</em> kind of table that has a time attribute and a primary key, and not just <em>append-only</em> tables. This unlocks a new set of use cases, like performing temporal joins directly against Kafka compacted topics or database changelogs (e.g. from Debezium).</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">-- Table backed by a Kafka topic
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">orders</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="n">order_id</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="n">currency</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="n">amount</span><span class="w"> </span><span class="nb">INT</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="n">order_time</span><span class="w"> </span><span class="k">TIMESTAMP</span><span class="p">(</span><span class="mi">3</span><span class="p">),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="n">WATERMARK</span><span class="w"> </span><span class="k">FOR</span><span class="w"> </span><span class="n">order_time</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="n">order_time</span><span class="w"> </span><span class="o">-</span><span class="w"> </span><span class="nb">INTERVAL</span><span class="w"> </span><span class="s1">&#39;30&#39;</span><span class="w"> </span><span class="k">SECOND</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="s1">&#39;connector&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;kafka&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="p">...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">-- Table backed by a Kafka compacted topic
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">latest_rates</span><span class="w"> </span><span class="p">(</span><span class="w"> 
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="n">currency</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="n">currency_rate</span><span class="w"> </span><span class="nb">DECIMAL</span><span class="p">(</span><span class="mi">38</span><span class="p">,</span><span class="w"> </span><span class="mi">10</span><span class="p">),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="n">currency_time</span><span class="w"> </span><span class="k">TIMESTAMP</span><span class="p">(</span><span class="mi">3</span><span class="p">),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="n">WATERMARK</span><span class="w"> </span><span class="k">FOR</span><span class="w"> </span><span class="n">currency_time</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="n">currency_time</span><span class="w"> </span><span class="o">-</span><span class="w"> </span><span class="nb">INTERVAL</span><span class="w"> </span><span class="s1">&#39;5&#39;</span><span class="w"> </span><span class="k">SECOND</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="k">PRIMARY</span><span class="w"> </span><span class="k">KEY</span><span class="w"> </span><span class="p">(</span><span class="n">currency</span><span class="p">)</span><span class="w"> </span><span class="k">NOT</span><span class="w"> </span><span class="n">ENFORCED</span><span class="w">      
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">  </span><span class="s1">&#39;connector&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;upsert-kafka&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">  </span><span class="p">...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">-- Event-time temporal table join
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="k">SELECT</span><span class="w"> 
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="n">o</span><span class="p">.</span><span class="n">order_id</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="n">o</span><span class="p">.</span><span class="n">order_time</span><span class="p">,</span><span class="w"> 
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="n">o</span><span class="p">.</span><span class="n">amount</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="n">r</span><span class="p">.</span><span class="n">currency_rate</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="n">amount</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="n">r</span><span class="p">.</span><span class="n">currency</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">FROM</span><span class="w"> </span><span class="n">orders</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="n">o</span><span class="w"> </span><span class="k">JOIN</span><span class="w"> </span><span class="n">latest_rates</span><span class="w"> </span><span class="k">FOR</span><span class="w"> </span><span class="n">SYSTEM_TIME</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="k">OF</span><span class="w"> </span><span class="n">o</span><span class="p">.</span><span class="n">order_time</span><span class="w"> </span><span class="n">r</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">ON</span><span class="w"> </span><span class="n">o</span><span class="p">.</span><span class="n">currency</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">r</span><span class="p">.</span><span class="n">currency</span><span class="p">;</span><span class="w">
</span></span></span></code></pre></div><p>The previous example also shows how you can take advantage of the new <code>upsert-kafka</code> connector in the context of temporal table joins.</p>
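<p>The semantics of the event-time join above can be modeled in plain Python: each order is matched with the version of its currency rate that was valid at <code>order_time</code>, i.e. the latest version whose <code>currency_time</code> is not later than the order. This is a simplified sketch, not how Flink executes the join (Flink keeps the versions in state and relies on watermarks to decide when a result is final); the function and the sample data are hypothetical.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python">import bisect
from decimal import Decimal


def temporal_join(orders, rate_versions):
    """Join each order with the rate version valid at the order's time.

    orders: list of (order_id, currency, amount, order_time)
    rate_versions: dict mapping currency to a list of (currency_time, rate),
                   sorted by currency_time
    Orders without a valid version are dropped, as in an inner join.
    """
    results = []
    for order_id, currency, amount, order_time in orders:
        versions = rate_versions.get(currency, [])
        times = [t for t, _ in versions]
        # Index of the first version that starts strictly after order_time.
        idx = bisect.bisect_right(times, order_time)
        if idx == 0:
            continue  # no rate known yet at order_time
        rate = versions[idx - 1][1]
        results.append((order_id, order_time, amount * rate, currency))
    return results


rates = {"EUR": [(0, Decimal("1.10")), (10, Decimal("1.20"))]}
orders = [("o1", "EUR", 100, 5), ("o2", "EUR", 100, 10), ("o3", "USD", 50, 7)]
# o1 uses the rate valid at t=5 (1.10), o2 the one from t=10 (1.20);
# o3 has no USD rate and is dropped.
print(temporal_join(orders, rates))
</code></pre></div>
<p>Using <code>bisect_right</code> makes a version whose timestamp equals the order time count as valid, which matches the "as of" reading of <code>FOR SYSTEM_TIME AS OF</code>.</p>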
<p><strong>Hive Tables in Temporal Table Joins</strong></p>
<p>You can also perform temporal table joins against Hive tables by either automatically reading the latest table partition as a temporal table (<a href="https://issues.apache.org/jira/browse/FLINK-19644">FLINK-19644</a>) or the whole table as a bounded stream tracking the latest version at execution time. Refer to the <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/hive/hive_read_write.html#temporal-table-join">documentation</a> for examples of using Hive tables in temporal table joins.</p>
<hr>
<h3 id="other-improvements-to-the-table-apisql">
  Other Improvements to the Table API/SQL
  <a class="anchor" href="#other-improvements-to-the-table-apisql">#</a>
</h3>
<p><strong>Kinesis Flink SQL Connector (<a href="https://issues.apache.org/jira/browse/FLINK-18858">FLINK-18858</a>)</strong></p>
<p>Starting with Flink 1.12, Amazon Kinesis Data Streams (KDS) is also natively supported as a source and sink in the Table API/SQL. The new Kinesis SQL connector ships with support for Enhanced Fan-Out (EFO) and Sink Partitioning. For a complete overview of supported features, configuration options and exposed metadata, check the <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/kinesis.html">updated documentation</a>.</p>
<p><strong>Streaming Sink Compaction in the FileSystem/Hive Connector (<a href="https://issues.apache.org/jira/browse/FLINK-19345">FLINK-19345</a>)</strong></p>
<p>Many bulk formats, such as Parquet, are most efficient when written as large files. This is a challenge when frequent checkpointing is enabled: bulk-format files are rolled on every checkpoint, so short checkpoint intervals produce many small files. In Flink 1.12, the file sink supports <strong>file compaction</strong>, allowing jobs to keep short checkpoint intervals without generating a large number of files. To enable file compaction, set <code>auto-compaction=true</code> in the properties of the FileSystem connector, as described in the <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/filesystem.html#file-compaction">documentation</a>.</p>
<p><strong>Watermark Pushdown in the Kafka Connector (<a href="https://issues.apache.org/jira/browse/FLINK-20041">FLINK-20041</a>)</strong></p>
<p>To ensure correctness when consuming from Kafka, it’s generally preferable to generate watermarks on a per-partition basis, since the out-of-orderness within a partition is usually lower than across all partitions. Flink will now push down watermark strategies to <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/kafka.html#source-per-partition-watermarks">emit <strong>per-partition watermarks</strong></a> from within the Kafka consumer. The output watermark of the source will be determined by the minimum watermark across the partitions it reads, leading to better (i.e. closer to real-time) watermarking. Watermark pushdown also lets you configure per-partition <strong>idleness detection</strong> to prevent idle partitions from holding back the event time progress of the entire application.</p>
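<p>The per-partition logic can be summarized in a few lines of Python. The sketch below is a simplified model, not the Kafka consumer's implementation: it assumes a bounded-out-of-orderness strategy like <code>ts - INTERVAL '5' SECOND</code> (each partition's watermark is its largest timestamp minus a fixed delay), and it models idleness detection as an explicit set of idle partitions rather than a timeout. The function name and inputs are hypothetical.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python">def source_watermark(max_ts_per_partition, delay, idle_partitions=()):
    """Return the watermark a source subtask would emit (simplified model).

    max_ts_per_partition: dict mapping partition id to the largest event
                          timestamp read from that partition so far
    delay: bounded out-of-orderness, in the same unit as the timestamps
    idle_partitions: partitions currently marked idle; they are ignored
    Returns None if every partition is idle (no watermark can be derived).
    """
    active = [
        ts - delay
        for partition, ts in max_ts_per_partition.items()
        if partition not in idle_partitions
    ]
    # The slowest active partition holds back the source watermark.
    return min(active) if active else None


max_ts = {0: 100, 1: 80, 2: 120}
print(source_watermark(max_ts, delay=5))                       # 75
print(source_watermark(max_ts, delay=5, idle_partitions={1}))  # 95
</code></pre></div>
<p>The second call shows why idleness detection matters: if partition 1 stops receiving data, it would otherwise keep the source watermark, and with it event-time progress, stuck at 75.</p>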
<p><strong>Newly Supported Formats</strong></p>
<table class="table table-bordered" style="font-size:95%">
  <thead>
    <tr>
      <th>Format</th>
      <th>Description</th>
      <th>Supported Connectors</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td><a href="//nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro-confluent.html">Avro Schema Registry</a></td>
      <td>Read and write data serialized with the Confluent Schema Registry <a href="https://docs.confluent.io/platform/current/schema-registry/serdes-develop/serdes-avro.html">KafkaAvroSerializer</a>.</td>
      <td>Kafka, Upsert Kafka</td>
    </tr>
    <tr>
      <td><a href="//nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/formats/debezium.html">Debezium Avro</a></td>
      <td>Read and write Debezium records serialized with the Confluent Schema Registry KafkaAvroSerializer.</td>
      <td>Kafka</td>
    </tr>
    <tr>
      <td><a href="//nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/formats/maxwell.html">Maxwell (CDC)</a></td>
      <td>Read and write Maxwell JSON records.</td>
      <td>
        <p>Kafka</p>
        <p>FileSystem</p>
      </td>
    </tr>
    <tr>
      <td><a href="//nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/connectors/formats/raw.html">Raw</a></td>
      <td>Read and write raw (byte-based) values as a single column.</td>
      <td>
        <p>Kafka, Upsert Kafka</p>
        <p>Kinesis</p>
        <p>FileSystem</p>
      </td>
    </tr>
  </tbody>
</table>
<p><strong>Multi-input Operator for Join Optimization (<a href="https://issues.apache.org/jira/browse/FLINK-19621">FLINK-19621</a>)</strong></p>
<p>To eliminate unnecessary serialization and data spilling and improve the performance of batch and streaming Table API/SQL jobs, the default planner now leverages the N-ary stream operator introduced in the last release (<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-92%3A&#43;Add&#43;N-Ary&#43;Stream&#43;Operator&#43;in&#43;Flink">FLIP-92</a>) to implement the &ldquo;chaining&rdquo; of operators connected by forward edges.</p>
<p><strong>Type Inference for Table API UDAFs (<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-65%3A&#43;New&#43;type&#43;inference&#43;for&#43;Table&#43;API&#43;UDFs">FLIP-65</a>)</strong></p>
<p>This release concluded the work started in Flink 1.9 on a <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/dev/table/types.html#data-types">new data type system</a> for the Table API, with the exposure of aggregate functions (UDAFs) to the new type system. From Flink 1.12, UDAFs behave similarly to scalar and table functions, and support all data types.</p>
<hr>
<h3 id="pyflink-python-datastream-api">
  PyFlink: Python DataStream API
  <a class="anchor" href="#pyflink-python-datastream-api">#</a>
</h3>
<p>To expand the usability of PyFlink, this release introduces a first version of the Python DataStream API (<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866298">FLIP-130</a>) with support for stateless operations (e.g. Map, FlatMap, Filter, KeyBy).</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python"><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">pyflink.common.typeinfo</span> <span class="kn">import</span> <span class="n">Types</span>
</span></span><span class="line"><span class="cl"><span class="kn">from</span> <span class="nn">pyflink.datastream</span> <span class="kn">import</span> <span class="n">MapFunction</span><span class="p">,</span> <span class="n">StreamExecutionEnvironment</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="k">class</span> <span class="nc">MyMapFunction</span><span class="p">(</span><span class="n">MapFunction</span><span class="p">):</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl">    <span class="k">def</span> <span class="nf">map</span><span class="p">(</span><span class="bp">self</span><span class="p">,</span> <span class="n">value</span><span class="p">):</span>
</span></span><span class="line"><span class="cl">        <span class="k">return</span> <span class="n">value</span> <span class="o">+</span> <span class="mi">1</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="n">env</span> <span class="o">=</span> <span class="n">StreamExecutionEnvironment</span><span class="o">.</span><span class="n">get_execution_environment</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"><span class="n">data_stream</span> <span class="o">=</span> <span class="n">env</span><span class="o">.</span><span class="n">from_collection</span><span class="p">([</span><span class="mi">1</span><span class="p">,</span> <span class="mi">2</span><span class="p">,</span> <span class="mi">3</span><span class="p">,</span> <span class="mi">4</span><span class="p">,</span> <span class="mi">5</span><span class="p">],</span> <span class="n">type_info</span><span class="o">=</span><span class="n">Types</span><span class="o">.</span><span class="n">INT</span><span class="p">())</span>
</span></span><span class="line"><span class="cl"><span class="n">mapped_stream</span> <span class="o">=</span> <span class="n">data_stream</span><span class="o">.</span><span class="n">map</span><span class="p">(</span><span class="n">MyMapFunction</span><span class="p">(),</span> <span class="n">output_type</span><span class="o">=</span><span class="n">Types</span><span class="o">.</span><span class="n">INT</span><span class="p">())</span>
</span></span><span class="line"><span class="cl"><span class="n">mapped_stream</span><span class="o">.</span><span class="n">print</span><span class="p">()</span>
</span></span><span class="line"><span class="cl"><span class="n">env</span><span class="o">.</span><span class="n">execute</span><span class="p">(</span><span class="s2">&#34;datastream job&#34;</span><span class="p">)</span>
</span></span></code></pre></div><p>To give the Python DataStream API a try, you can <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/dev/python/installation.html#installation-of-pyflink">install PyFlink</a> and check out <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/dev/python/datastream_tutorial.html">this tutorial</a> that guides you through building a simple streaming application.</p>
<hr>
<h3 id="other-improvements-to-pyflink">
  Other Improvements to PyFlink
  <a class="anchor" href="#other-improvements-to-pyflink">#</a>
</h3>
<p><strong>PyFlink Jobs on Kubernetes (<a href="https://issues.apache.org/jira/browse/FLINK-17480">FLINK-17480</a>)</strong></p>
<p>In addition to standalone and YARN deployments, PyFlink jobs can now also be deployed natively on Kubernetes. The <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/deployment/resource-providers/native_kubernetes.html">deployment documentation</a> has detailed instructions on how to start a <em>session</em> or <em>application</em> cluster on Kubernetes.</p>
<p><strong>User-defined Aggregate Functions (UDAFs)</strong></p>
<p>From Flink 1.12, you can define and register UDAFs in PyFlink (<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-139%3A&#43;General&#43;Python&#43;User-Defined&#43;Aggregate&#43;Function&#43;Support&#43;on&#43;Table&#43;API">FLIP-139</a>). In contrast to a normal UDF, which doesn’t handle state and operates on a single row at a time, a <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#aggregate-functions">UDAF</a> is stateful and can be used to compute custom aggregations over multiple input rows. To benefit from vectorization, you can also use <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/vectorized_python_udfs.html#vectorized-aggregate-functions">Pandas UDAFs</a> (<a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-137%3A&#43;Support&#43;Pandas&#43;UDAF&#43;in&#43;PyFlink?src=jira">FLIP-137</a>), which can be up to 10x faster.</p>
<div class="alert alert-info small" markdown="1">
<b>Note:</b> General UDAFs are only supported for group aggregations and in <em>streaming</em> mode. For <em>batch</em> mode or window aggregations, use Pandas UDAFs.
</div>
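<p>A UDAF keeps its intermediate result in an accumulator that Flink creates, updates and stores as state. The following plain-Python sketch mirrors the accumulator lifecycle of a weighted average (<code>create_accumulator</code>, <code>accumulate</code>, <code>retract</code>, <code>get_value</code>) and drives it by hand, so it runs without PyFlink. In a real job, the class would instead extend PyFlink's <code>AggregateFunction</code>, declare its result and accumulator types, and be registered with the table environment; see the linked UDAF documentation for the exact interface.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-python" data-lang="python">class WeightedAvg:
    """Weighted average using the accumulator methods of a PyFlink UDAF.

    Plain Python for illustration: it does not extend PyFlink's
    AggregateFunction and declares no result or accumulator types.
    """

    def create_accumulator(self):
        # [weighted sum, total weight]
        return [0, 0]

    def accumulate(self, acc, value, weight):
        acc[0] += value * weight
        acc[1] += weight

    def retract(self, acc, value, weight):
        # Undo an earlier accumulate, e.g. when an upstream row is retracted.
        acc[0] -= value * weight
        acc[1] -= weight

    def get_value(self, acc):
        if acc[1] == 0:
            return None
        return acc[0] / acc[1]


agg = WeightedAvg()
acc = agg.create_accumulator()
agg.accumulate(acc, 10, 1)
agg.accumulate(acc, 20, 3)
print(agg.get_value(acc))  # 17.5
agg.retract(acc, 20, 3)
print(agg.get_value(acc))  # 10.0
</code></pre></div>
<p>The <code>retract</code> method is only needed when the input can contain retractions, for example when aggregating the result of another aggregation.</p>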
<hr>
<h2 id="important-changes">
  Important Changes
  <a class="anchor" href="#important-changes">#</a>
</h2>
<ul>
<li>
<p>[<a href="https://issues.apache.org/jira/browse/FLINK-19319">FLINK-19319</a>] The default stream time characteristic has been changed to <code>EventTime</code>, so you no longer need to call <code>StreamExecutionEnvironment.setStreamTimeCharacteristic()</code> to enable event time support.</p>
</li>
<li>
<p>[<a href="https://issues.apache.org/jira/browse/FLINK-19278">FLINK-19278</a>] Flink now relies on Scala Macros 2.1.1, so Scala versions &lt; 2.11.11 are no longer supported.</p>
</li>
<li>
<p>[<a href="https://issues.apache.org/jira/browse/FLINK-19152">FLINK-19152</a>] The Kafka 0.10.x and 0.11.x connectors have been removed with this release. If you’re still using these versions, please refer to the <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/dev/connectors/kafka.html">documentation</a> to learn how to upgrade to the universal Kafka connector.</p>
</li>
<li>
<p>[<a href="https://issues.apache.org/jira/browse/FLINK-18795">FLINK-18795</a>] The HBase connector has been upgraded to the latest stable version (2.2.3).</p>
</li>
<li>
<p>[<a href="https://issues.apache.org/jira/browse/FLINK-17877">FLINK-17877</a>] PyFlink now supports Python 3.8.</p>
</li>
<li>
<p>[<a href="https://issues.apache.org/jira/browse/FLINK-18738">FLINK-18738</a>] To align with <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-53%3A&#43;Fine&#43;Grained&#43;Operator&#43;Resource&#43;Management">FLIP-53</a>, managed memory is now also the default for Python workers. The configuration options <code>python.fn-execution.buffer.memory.size</code> and <code>python.fn-execution.framework.memory.size</code> have been removed and no longer take effect.</p>
</li>
</ul>
<h2 id="release-notes">
  Release Notes
  <a class="anchor" href="#release-notes">#</a>
</h2>
<p>Please review the <a href="//nightlies.apache.org/flink/flink-docs-release-1.12/release-notes/flink-1.12.html">release notes</a> carefully for a detailed list of changes and new features if you plan to upgrade your setup to Flink 1.12. This version is API-compatible with previous 1.x releases for APIs annotated with <code>@Public</code>.</p>
<h2 id="list-of-contributors">
  List of Contributors
  <a class="anchor" href="#list-of-contributors">#</a>
</h2>
<p>The Apache Flink community would like to thank each and every one of the 300 contributors that have made this release possible:</p>
<p>Abhijit Shandilya, Aditya Agarwal, Alan Su, Alexander Alexandrov, Alexander Fedulov, Alexey Trenikhin, Aljoscha Krettek, Allen Madsen, Andrei Bulgakov, Andrey Zagrebin, Arvid Heise, Authuir, Bairos, Bartosz Krasinski, Benchao Li, Brandon, Brian Zhou, C08061, Canbin Zheng, Cedric Chen, Chesnay Schepler, Chris Nix, Congxian Qiu, DG-Wangtao, Da(Dash)Shen, Dan Hill, Daniel Magyar, Danish Amjad, Danny Chan, Danny Cranmer, David Anderson, Dawid Wysakowicz, Devin Thomson, Dian Fu, Dongxu Wang, Dylan Forciea, Echo Lee, Etienne Chauchot, Fabian Paul, Felipe Lolas, Fin-Chan, Fin-chan, Flavio Pompermaier, Flora Tao, Fokko Driesprong, Gao Yun, Gary Yao, Ghildiyal, GitHub, Grebennikov Roman, GuoWei Ma, Gyula Fora, Hequn Cheng, Herman, Hong Teoh, HuangXiao, HuangXingBo, Husky Zeng, Hyeonseop Lee, I. Raleigh, Ivan, Jacky Lau, Jark Wu, Jaskaran Bindra, Jeff Yang, Jeff Zhang, Jiangjie (Becket) Qin, Jiatao Tao, Jiayi Liao, Jiayi-Liao, Jiezhi.G, Jimmy.Zhou, Jindrich Vimr, Jingsong Lee, JingsongLi, Joey Echeverria, Juha Mynttinen, Jun Qin, Jörn Kottmann, Karim Mansour, Kevin Bohinski, Kezhu Wang, Konstantin Knauf, Kostas Kloudas, Kurt Young, Lee Do-Kyeong, Leonard Xu, Lijie Wang, Liu Jiangang, Lorenzo Nicora, LululuAlu, Luxios22, Marta Paes Moreira, Mateusz Sabat, Matthias Pohl, Maximilian Michels, Miklos Gergely, Milan Nikl, Nico Kruber, Niel Hu, Niels Basjes, Oleksandr Nitavskyi, Paul Lam, Peng, PengFei Li, PengchengLiu, Peter Huang, Piotr Nowojski, PoojaChandak, Qingsheng Ren, Qishang Zhong, Richard Deurwaarder, Richard Moorhead, Robert Metzger, Roc Marshal, Roey Shem Tov, Roman, Roman Khachatryan, Rong Rong, Rui Li, Seth Wiesman, Shawn Huang, ShawnHx, Shengkai, Shuiqiang Chen, Shuo Cheng, SteNicholas, Stephan Ewen, Steve Whelan, Steven Wu, Tartarus0zm, Terry Wang, Thesharing, Thomas Weise, Till Rohrmann, Timo Walther, TsReaper, Tzu-Li (Gordon) Tai, Ufuk Celebi, V1ncentzzZ, Vladimirs Kotovs, Wei Zhong, Weike DONG, XBaith, Xiaogang Zhou, Xiaoguang Sun, Xingcan Cui, Xintong Song, 
Xuannan, Yang Liu, Yangze Guo, Yichao Yang, Yikun Jiang, Yu Li, Yuan Mei, Yubin Li, Yun Gao, Yun Tang, Yun Wang, Zhenhua Yang, Zhijiang, Zhu Zhu, acesine, acqua.csq, austin ce, bigdata-ny, billyrrr, caozhen, caozhen1937, chaojianok, chenkai, chris, cpugputpu, dalong01.liu, darionyaphet, dijie, diohabara, dufeng1010, fangliang, felixzheng, gkrishna, gm7y8, godfrey he, godfreyhe, gsralex, haseeb1431, hequn.chq, hequn8128, houmaozheng, huangxiao, huangxingbo, huzekang, jPrest, jasonlee, jinfeng, jinhai, johnm, jxeditor, kecheng, kevin.cyj, kevinzwx, klion26, leiqiang, libenchao, lijiewang.wlj, liufangliang, liujiangang, liuyongvs, liuyufei9527, lsy, lzy3261944, mans2singh, molsionmo, openopen2, pengweibo, rinkako, <a href="mailto:sanshi@wwdz.onaliyun.com">sanshi@wwdz.onaliyun.com</a>, secondChoice, seunjjs, shaokan.cao, shizhengchao, shizk233, shouweikun, spurthi chaganti, sujun, sunjincheng121, sxnan, tison, totorooo, venn, vthinkxie, wangsong2, wangtong, wangxiyuan, wangxlong, wangyang0918, wangzzu, weizheng92, whlwanghailong, wineandcheeze, wooplevip, wtog, wudi28, wxp, xcomp, xiaoHoly, xiaolong.wang, yangyichao-mango, yingshin, yushengnan, yushujun, yuzhao.cyz, zhangap, zhangmang, zhangzhanchum, zhangzhanchun, zhangzhanhua, zhangzp, zheyu, zhijiang, zhushang, zhuxiaoshang, zlzhang0122, zodo, zoudan, zouzhiye</p>
</p>
</article>

          



  
    

        </section>
        
          <aside class="book-toc">
            


<nav id="TableOfContents"><h3>On This Page</h3>
  <ul>
    <li>
      <ul>
        <li><a href="#new-features-and-improvements">New Features and Improvements</a>
          <ul>
            <li><a href="#batch-execution-mode-in-the-datastream-api">Batch Execution Mode in the DataStream API</a></li>
            <li><a href="#new-data-sink-api-beta">New Data Sink API (Beta)</a></li>
            <li><a href="#kubernetes-high-availability-ha-service">Kubernetes High Availability (HA) Service</a></li>
            <li><a href="#other-improvements">Other Improvements</a></li>
            <li><a href="#table-apisql-metadata-handling-in-sql-connectors">Table API/SQL: Metadata Handling in SQL Connectors</a></li>
            <li><a href="#table-apisql-upsert-kafka-connector">Table API/SQL: Upsert Kafka Connector</a></li>
            <li><a href="#table-apisql-support-for-temporal-table-joins-in-sql">Table API/SQL: Support for Temporal Table Joins in SQL</a></li>
            <li><a href="#other-improvements-to-the-table-apisql">Other Improvements to the Table API/SQL</a></li>
            <li><a href="#pyflink-python-datastream-api">PyFlink: Python DataStream API</a></li>
            <li><a href="#other-improvements-to-pyflink">Other Improvements to PyFlink</a></li>
          </ul>
        </li>
        <li><a href="#important-changes">Important Changes</a></li>
        <li><a href="#release-notes">Release Notes</a></li>
        <li><a href="#list-of-contributors">List of Contributors</a></li>
      </ul>
    </li>
  </ul>
</nav>


          </aside>
        
      </main>

      <footer>
        


<div class="separator"></div>
<div class="panels">
  <div class="wrapper">
      <div class="panel">
        <ul>
          <li>
            <a href="https://flink-packages.org/">flink-packages.org</a>
          </li>
          <li>
            <a href="https://www.apache.org/">Apache Software Foundation</a>
          </li>
          <li>
            <a href="https://www.apache.org/licenses/">License</a>
          </li>
          
          
          
            
          
            
          
          

          
            
              
            
          
            
              
                <li>
                  <a  href="/zh/">
                    <i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
                  </a>
                </li>
              
            
          
       </ul>
      </div>
      <div class="panel">
        <ul>
          <li>
            <a href="/what-is-flink/security">Security</a>
          </li>
          <li>
            <a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
          </li>
          <li>
            <a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
          </li>
       </ul>
      </div>
      <div class="panel icons">
        <div>
          <a href="/posts">
            <div class="icon flink-blog-icon"></div>
            <span>Flink blog</span>
          </a>
        </div>
        <div>
          <a href="https://github.com/apache/flink">
            <div class="icon flink-github-icon"></div>
            <span>Github</span>
          </a>
        </div>
        <div>
          <a href="https://twitter.com/apacheflink">
            <div class="icon flink-twitter-icon"></div>
            <span>Twitter</span>
          </a>
        </div>
      </div>
  </div>
</div>

<hr/>

<div class="container disclaimer">
  <p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>



      </footer>
    
  </body>
</html>






