
<!DOCTYPE html>
<html lang="en" dir="ltr">

<head>
  


<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2022/01/20/pravega-flink-connector-101/">

  <meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="Pravega, which is now a CNCF sandbox project, is a cloud-native storage system based on abstractions for both batch and streaming data consumption. Pravega streams (a new storage abstraction) are durable, consistent, and elastic, while natively supporting long-term data retention. In comparison, Apache Flink is a popular real-time computing engine that provides unified batch and stream processing. Flink provides high-throughput, low-latency computation, as well as support for complex event processing and state management.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Pravega Flink Connector 101" />
<meta property="og:description" content="Pravega, which is now a CNCF sandbox project, is a cloud-native storage system based on abstractions for both batch and streaming data consumption. Pravega streams (a new storage abstraction) are durable, consistent, and elastic, while natively supporting long-term data retention. In comparison, Apache Flink is a popular real-time computing engine that provides unified batch and stream processing. Flink provides high-throughput, low-latency computation, as well as support for complex event processing and state management." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2022/01/20/pravega-flink-connector-101/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2022-01-20T00:00:00+00:00" />
<meta property="article:modified_time" content="2022-01-20T00:00:00+00:00" />
<title>Pravega Flink Connector 101 | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->

  <meta name="generator" content="Hugo 0.124.1">

    
    <script>
      var _paq = window._paq = window._paq || [];
       
       
      _paq.push(['disableCookies']);
       
      _paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
      _paq.push(['trackPageView']);
      _paq.push(['enableLinkTracking']);
      (function() {
        var u="//analytics.apache.org/";
        _paq.push(['setTrackerUrl', u+'matomo.php']);
        _paq.push(['setSiteId', '1']);
        var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
        g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
      })();
    </script>
    
</head>

<body dir="ltr">
  


<header>
  <nav class="navbar navbar-expand-xl">
    <div class="container-fluid">
      <a class="navbar-brand" href="/">
        <img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
        <span>Apache Flink</span>
      </a>
      <button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
          <i class="fa fa-bars navbar-toggler-icon"></i>
      </button>
      <div class="collapse navbar-collapse" id="navbarSupportedContent">
        <ul class="navbar-nav">
          





    
      
  
    <li class="nav-item dropdown">
      <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
      <ul class="dropdown-menu">
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/security/">Security</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
  

          </li>
        
      </ul>
    </li>
  

    
      
  
    <li class="nav-item dropdown">
      <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
      <ul class="dropdown-menu">
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
      </ul>
    </li>
  

    
      
  
    <li class="nav-item dropdown">
      <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
      <ul class="dropdown-menu">
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
    </a>
  

          </li>
        
      </ul>
    </li>
  

    
      
  
    <li class="nav-item dropdown">
      <a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
      <ul class="dropdown-menu">
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
  

          </li>
        
          <li>
            
  
    <a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
  

          </li>
        
      </ul>
    </li>
  

    


    
      
  
    <li class="nav-item">
      
  
    <a class="nav-link" href="/posts/">Flink Blog</a>
  

    </li>
  

    
      
  
    <li class="nav-item">
      
  
    <a class="nav-link" href="/downloads/">Downloads</a>
  

    </li>
  

    


    









        </ul>
        <div class="book-search">
          <div class="book-search-spinner hidden">
            <i class="fa fa-refresh fa-spin"></i>
          </div>
          <form class="search-bar d-flex" onsubmit="return false;">
            <input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
            <i class="fa fa-search search"></i>
            <i class="fa fa-circle-o-notch fa-spin spinner"></i>
          </form>
          <div class="book-search-spinner hidden"></div>
          <ul id="book-search-results"></ul>
        </div>
      </div>
    </div>
  </nav>
  <div class="navbar-clearfix"></div>
</header>
 
  
      <main class="flex">
        <section class="container book-page">
          
<article class="markdown">
    <h1>
        <a href="/2022/01/20/pravega-flink-connector-101/">Pravega Flink Connector 101</a>
    </h1>
    


  January 20, 2022 -



  Yumin Zhou (Brian)

  <a href="https://twitter.com/crazy__zhou">(@crazy__zhou)</a>
  



    <p><a href="https://cncf.pravega.io/">Pravega</a>, which is now a CNCF sandbox project, is a cloud-native storage system based on abstractions for both batch and streaming data consumption. Pravega streams (a new storage abstraction) are durable, consistent, and elastic, while natively supporting long-term data retention. In comparison, <a href="https://flink.apache.org/">Apache Flink</a> is a popular real-time computing engine that provides unified batch and stream processing. Flink provides high-throughput, low-latency computation, as well as support for complex event processing and state management. Pravega and Flink share the same design philosophy and treat data streams as primitives. This makes them a great match for building storage and computing data pipelines that unify batch and streaming use cases.</p>
<p>That&rsquo;s also the main reason why Pravega has chosen to use Flink as the first integrated execution engine among the various distributed computing engines on the market. With the help of Flink, users can use flexible APIs for windowing, complex event processing (CEP), or table abstractions to process streaming data easily and enrich the data being stored. Since its inception in 2016, Pravega has established communication with Flink PMC members and developed the connector together.</p>
<p>In 2017, the Pravega Flink connector module started to move out of the Pravega main repository and has been maintained in a new separate <a href="https://github.com/pravega/flink-connectors">repository</a> since then. During years of development, many features have been implemented, including:</p>
<ul>
<li>exactly-once processing guarantees for both Reader and Writer, supporting end-to-end exactly-once processing pipelines</li>
<li>seamless integration with Flink&rsquo;s checkpoints and savepoints</li>
<li>parallel Readers and Writers supporting high-throughput, low-latency processing</li>
<li>support for Batch, Streaming, and Table API to access Pravega Streams</li>
</ul>
<p>These key features make streaming pipeline applications easier to develop without worrying about performance and correctness, which are common pain points for many streaming use cases.</p>
<p>In this blog post, we will discuss how to use this connector to read and write Pravega streams with the Flink DataStream API.</p>
<h1 id="basic-usages">
  Basic usages
  <a class="anchor" href="#basic-usages">#</a>
</h1>
<h2 id="dependency">
  Dependency
  <a class="anchor" href="#dependency">#</a>
</h2>
<p>To use this connector in your application, add the dependency to your project:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-xml" data-lang="xml"><span class="line"><span class="cl"><span class="nt">&lt;dependency&gt;</span>
</span></span><span class="line"><span class="cl">  <span class="nt">&lt;groupId&gt;</span>io.pravega<span class="nt">&lt;/groupId&gt;</span>
</span></span><span class="line"><span class="cl">  <span class="nt">&lt;artifactId&gt;</span>pravega-connectors-flink-1.13_2.12<span class="nt">&lt;/artifactId&gt;</span>
</span></span><span class="line"><span class="cl">  <span class="nt">&lt;version&gt;</span>0.10.1<span class="nt">&lt;/version&gt;</span>
</span></span><span class="line"><span class="cl"><span class="nt">&lt;/dependency&gt;</span>
</span></span></code></pre></div><p>In the above example,</p>
<p><code>1.13</code> is the Flink major version, which appears in the middle of the artifact name, and <code>2.12</code> is the Scala version. The Pravega Flink connector maintains compatibility with the <em>three</em> most recent major versions of Flink.</p>
<p><code>0.10.1</code> is the connector version, which aligns with the Pravega version.</p>
<p>You can find the latest release with a support matrix on the <a href="https://github.com/pravega/flink-connectors/releases">GitHub Releases page</a>.</p>
<h2 id="api-introduction">
  API introduction
  <a class="anchor" href="#api-introduction">#</a>
</h2>
<h3 id="configurations">
  Configurations
  <a class="anchor" href="#configurations">#</a>
</h3>
<p>The connector provides a common top-level object <code>PravegaConfig</code> for Pravega connection configurations. The config object automatically configures itself from <em>environment variables</em>, <em>system properties</em> and <em>program arguments</em>.</p>
<p>The basic controller URI and the default scope can be set like this:</p>
<table>
<thead>
<tr>
<th>Setting</th>
<th>Environment Variable /<br/>System Property /<br/>Program Argument</th>
<th>Default Value</th>
</tr>
</thead>
<tbody>
<tr>
<td>Controller URI</td>
<td><code>PRAVEGA_CONTROLLER_URI</code><br/><code>pravega.controller.uri</code><br/><code>--controller</code></td>
<td><code>tcp://localhost:9090</code></td>
</tr>
<tr>
<td>Default Scope</td>
<td><code>PRAVEGA_SCOPE</code><br/><code>pravega.scope</code><br/><code>--scope</code></td>
<td>-</td>
</tr>
</tbody>
</table>
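<p>To illustrate how one setting can come from three places, here is a minimal sketch of a lookup for the controller URI written with plain JDK calls. It is not the connector&rsquo;s implementation: the class <code>ControllerUriLookup</code>, the method <code>resolve</code>, and the lookup order (program argument first, then system property, then environment variable) are assumptions made for this example. Only the property name, the variable name, and the default value come from the table above.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java">public class ControllerUriLookup {
    // Default value taken from the table above.
    static final String DEFAULT_URI = "tcp://localhost:9090";

    // Hypothetical helper, not part of the connector: returns the first
    // candidate that is set. The order (program argument, then system
    // property, then environment variable) is an assumption of this sketch.
    static String resolve(String argument, String property, String variable) {
        if (argument != null) {
            return argument;
        }
        if (property != null) {
            return property;
        }
        if (variable != null) {
            return variable;
        }
        return DEFAULT_URI;
    }

    public static void main(String[] args) {
        String property = System.getProperty("pravega.controller.uri");
        String variable = System.getenv("PRAVEGA_CONTROLLER_URI");
        // No --controller argument is modelled here, so null is passed for it.
        System.out.println(resolve(null, property, variable));
    }
}
</code></pre></div>
<p>Running the sketch with <code>-Dpravega.controller.uri=...</code> on the JVM command line exercises the system property branch. With nothing set, it falls back to the default.</p>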
<p>The recommended way to create an instance of <code>PravegaConfig</code> is the following:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="c1">// From default environment</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">PravegaConfig</span><span class="w"> </span><span class="n">config</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">PravegaConfig</span><span class="p">.</span><span class="na">fromDefaults</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// From program arguments</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">ParameterTool</span><span class="w"> </span><span class="n">params</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">ParameterTool</span><span class="p">.</span><span class="na">fromArgs</span><span class="p">(</span><span class="n">args</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">PravegaConfig</span><span class="w"> </span><span class="n">config</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">PravegaConfig</span><span class="p">.</span><span class="na">fromParams</span><span class="p">(</span><span class="n">params</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// From user specification</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">PravegaConfig</span><span class="w"> </span><span class="n">config</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">PravegaConfig</span><span class="p">.</span><span class="na">fromDefaults</span><span class="p">()</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="p">.</span><span class="na">withControllerURI</span><span class="p">(</span><span class="s">&#34;tcp://...&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="p">.</span><span class="na">withDefaultScope</span><span class="p">(</span><span class="s">&#34;SCOPE-NAME&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="p">.</span><span class="na">withCredentials</span><span class="p">(</span><span class="n">credentials</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="p">.</span><span class="na">withHostnameValidation</span><span class="p">(</span><span class="kc">false</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><h3 id="serializationdeserialization">
  Serialization/Deserialization
  <a class="anchor" href="#serializationdeserialization">#</a>
</h3>
<p>Pravega has defined <a href="http://pravega.io/docs/latest/javadoc/clients/io/pravega/client/stream/Serializer.html"><code>io.pravega.client.stream.Serializer</code></a> for serialization and deserialization, while Flink has defined its own standard interfaces for the same purpose:</p>
<ul>
<li><a href="https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/SerializationSchema.html"><code>org.apache.flink.api.common.serialization.SerializationSchema</code></a></li>
<li><a href="https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/api/common/serialization/DeserializationSchema.html"><code>org.apache.flink.api.common.serialization.DeserializationSchema</code></a></li>
</ul>
<p>For interoperability with other Pravega client applications, the connector provides the built-in adapters <code>PravegaSerializationSchema</code> and <code>PravegaDeserializationSchema</code>, which make it possible to process Pravega stream data produced by a non-Flink application.</p>
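<p>Whichever interface is used, a serializer ultimately turns an event into bytes and back. As a rough illustration, the following sketch round-trips an object through standard Java object serialization using only JDK classes. It assumes that a Java-serialization-based serializer works along these lines. The names <code>JavaSerializationRoundTrip</code> and <code>MyEvent</code> and the event fields are hypothetical, and no Pravega or Flink API is involved.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java">import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class JavaSerializationRoundTrip {
    // Hypothetical event type; any class implementing Serializable would do.
    static class MyEvent implements Serializable {
        private static final long serialVersionUID = 1L;
        final String id;
        final long value;

        MyEvent(String id, long value) {
            this.id = id;
            this.value = value;
        }
    }

    // Writes the event with Java object serialization and returns the bytes.
    static byte[] serialize(MyEvent event) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(event);
        }
        return bytes.toByteArray();
    }

    // Reads an event back from bytes produced by serialize().
    static MyEvent deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (MyEvent) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        MyEvent copy = deserialize(serialize(new MyEvent("sensor-1", 42L)));
        System.out.println(copy.id + " " + copy.value);
    }
}
</code></pre></div>
<p>An event class handled this way has to implement <code>java.io.Serializable</code>, and the writing and reading applications need compatible versions of that class.</p>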
<p>Here is the adapter for the Pravega Java serializer:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span><span class="w"> </span><span class="nn">io.pravega.client.stream.impl.JavaSerializer</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DeserializationSchema</span><span class="o">&lt;</span><span class="n">MyEvent</span><span class="o">&gt;</span><span class="w"> </span><span class="n">adapter</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">PravegaDeserializationSchema</span><span class="o">&lt;&gt;</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="n">MyEvent</span><span class="p">.</span><span class="na">class</span><span class="p">,</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">JavaSerializer</span><span class="o">&lt;</span><span class="n">MyEvent</span><span class="o">&gt;</span><span class="p">());</span><span class="w">
</span></span></span></code></pre></div><h3 id="flinkpravegareader">
  <code>FlinkPravegaReader</code>
  <a class="anchor" href="#flinkpravegareader">#</a>
</h3>
<p><code>FlinkPravegaReader</code> is a Flink <code>SourceFunction</code> implementation that supports parallel reads from one or more Pravega streams. Internally, it initiates a Pravega reader group and creates Pravega <code>EventStreamReader</code> instances to read the data from the stream(s). It is constructed through a builder-style API and accepts stream cuts that mark the start and end of the read.</p>
<p>You can use it like this:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">final</span><span class="w"> </span><span class="n">StreamExecutionEnvironment</span><span class="w"> </span><span class="n">env</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">StreamExecutionEnvironment</span><span class="p">.</span><span class="na">getExecutionEnvironment</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// Enable Flink checkpoint to make state fault tolerant</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">env</span><span class="p">.</span><span class="na">enableCheckpointing</span><span class="p">(</span><span class="n">60000</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// Define the Pravega configuration</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">ParameterTool</span><span class="w"> </span><span class="n">params</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">ParameterTool</span><span class="p">.</span><span class="na">fromArgs</span><span class="p">(</span><span class="n">args</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">PravegaConfig</span><span class="w"> </span><span class="n">config</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">PravegaConfig</span><span class="p">.</span><span class="na">fromParams</span><span class="p">(</span><span class="n">params</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// Define the event deserializer</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DeserializationSchema</span><span class="o">&lt;</span><span class="n">MyClass</span><span class="o">&gt;</span><span class="w"> </span><span class="n">deserializer</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="p">...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// Define the data stream</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">FlinkPravegaReader</span><span class="o">&lt;</span><span class="n">MyClass</span><span class="o">&gt;</span><span class="w"> </span><span class="n">pravegaSource</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">FlinkPravegaReader</span><span class="p">.</span><span class="o">&lt;</span><span class="n">MyClass</span><span class="o">&gt;</span><span class="n">builder</span><span class="p">()</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="p">.</span><span class="na">forStream</span><span class="p">(...)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="p">.</span><span class="na">withPravegaConfig</span><span class="p">(</span><span class="n">config</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="p">.</span><span class="na">withDeserializationSchema</span><span class="p">(</span><span class="n">deserializer</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="p">.</span><span class="na">build</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">MyClass</span><span class="o">&gt;</span><span class="w"> </span><span class="n">stream</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">env</span><span class="p">.</span><span class="na">addSource</span><span class="p">(</span><span class="n">pravegaSource</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="p">.</span><span class="na">setParallelism</span><span class="p">(</span><span class="n">4</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="p">.</span><span class="na">uid</span><span class="p">(</span><span class="s">&#34;pravega-source&#34;</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><h3 id="flinkpravegawriter">
  <code>FlinkPravegaWriter</code>
  <a class="anchor" href="#flinkpravegawriter">#</a>
</h3>
<p><code>FlinkPravegaWriter</code> is a Flink <code>SinkFunction</code> implementation which supports parallel writes to Pravega streams.</p>
<p>It supports three writer modes that relate to guarantees about the persistence of events emitted by the sink to a Pravega Stream:</p>
<ol>
<li><strong>Best-effort</strong> - Any write failures will be ignored and there could be data loss.</li>
<li><strong>At-least-once</strong> (default) - All events are persisted in Pravega. Duplicate events are possible, due to retries or in case of failure and subsequent recovery.</li>
<li><strong>Exactly-once</strong> - All events are persisted in Pravega using a transactional approach integrated with the Flink checkpointing feature.</li>
</ol>
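<p>The difference between the last two modes shows up when a job fails between two checkpoints. The following toy model, written with plain JDK classes, replays that situation. It is only a sketch under simplifying assumptions: a single writer, a checkpoint that completes after event 2, a failure after event 4, and a source that rewinds to the checkpoint on recovery. The class <code>WriterModeReplay</code> and its method are hypothetical, no Pravega API is used, and the best-effort mode is not modelled.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java">public class WriterModeReplay {
    // Toy model: returns the events visible in the stream, in write order.
    static String run(boolean transactional) {
        StringBuilder stream = new StringBuilder(); // events that readers can see
        StringBuilder open = new StringBuilder();   // events in the open transaction

        // First attempt: events 1..4 are written; a checkpoint completes after event 2.
        for (int event : new int[] {1, 2, 3, 4}) {
            (transactional ? open : stream).append(event).append(' ');
            if (event == 2) {
                stream.append(open); // checkpoint: commit the open transaction
                open.setLength(0);
            }
        }

        // Failure: writes that were never committed are discarded.
        open.setLength(0);

        // Recovery: the source rewinds to the checkpoint and replays events 3..5.
        for (int event : new int[] {3, 4, 5}) {
            (transactional ? open : stream).append(event).append(' ');
        }
        stream.append(open); // the next checkpoint commits the replayed events
        return stream.toString().trim();
    }

    public static void main(String[] args) {
        System.out.println("at-least-once: " + run(false));
        System.out.println("exactly-once:  " + run(true));
    }
}
</code></pre></div>
<p>In this model the at-least-once run leaves <code>1 2 3 4 3 4 5</code> in the stream, because events 3 and 4 were already visible when the job failed and are written again after recovery. The transactional run leaves <code>1 2 3 4 5</code>, because events only become visible when a checkpoint commits them.</p>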
<p>Internally, it initiates several Pravega <code>EventStreamWriter</code> or <code>TransactionalEventStreamWriter</code> instances (depending on the writer mode) to write data to the stream. Like the reader, it is constructed through a builder-style API.</p>
<p>Basic usage looks like this:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">StreamExecutionEnvironment</span><span class="w"> </span><span class="n">env</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">StreamExecutionEnvironment</span><span class="p">.</span><span class="na">getExecutionEnvironment</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// Define the Pravega configuration</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">PravegaConfig</span><span class="w"> </span><span class="n">config</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">PravegaConfig</span><span class="p">.</span><span class="na">fromParams</span><span class="p">(</span><span class="n">params</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// Define the event serializer</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">SerializationSchema</span><span class="o">&lt;</span><span class="n">MyClass</span><span class="o">&gt;</span><span class="w"> </span><span class="n">serializer</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="p">...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// Define the event router for selecting the Routing Key</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">PravegaEventRouter</span><span class="o">&lt;</span><span class="n">MyClass</span><span class="o">&gt;</span><span class="w"> </span><span class="n">router</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="p">...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// Define the sink function</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">FlinkPravegaWriter</span><span class="o">&lt;</span><span class="n">MyClass</span><span class="o">&gt;</span><span class="w"> </span><span class="n">pravegaSink</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">FlinkPravegaWriter</span><span class="p">.</span><span class="o">&lt;</span><span class="n">MyClass</span><span class="o">&gt;</span><span class="n">builder</span><span class="p">()</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">   </span><span class="p">.</span><span class="na">forStream</span><span class="p">(...)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">   </span><span class="p">.</span><span class="na">withPravegaConfig</span><span class="p">(</span><span class="n">config</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">   </span><span class="p">.</span><span class="na">withSerializationSchema</span><span class="p">(</span><span class="n">serializer</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">   </span><span class="p">.</span><span class="na">withEventRouter</span><span class="p">(</span><span class="n">router</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">   </span><span class="p">.</span><span class="na">withWriterMode</span><span class="p">(</span><span class="n">PravegaWriterMode</span><span class="p">.</span><span class="na">EXACTLY_ONCE</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">   </span><span class="p">.</span><span class="na">build</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">MyClass</span><span class="o">&gt;</span><span class="w"> </span><span class="n">stream</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="p">...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">stream</span><span class="p">.</span><span class="na">addSink</span><span class="p">(</span><span class="n">pravegaSink</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="p">.</span><span class="na">setParallelism</span><span class="p">(</span><span class="n">4</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">    </span><span class="p">.</span><span class="na">uid</span><span class="p">(</span><span class="s">&#34;pravega-sink&#34;</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>More examples are available in the <a href="https://github.com/pravega/pravega-samples">pravega-samples repository</a>.</p>
<h1 id="internals-of-reader-and-writer">
  Internals of reader and writer
  <a class="anchor" href="#internals-of-reader-and-writer">#</a>
</h1>
<h2 id="checkpoint-integration">
  Checkpoint integration
  <a class="anchor" href="#checkpoint-integration">#</a>
</h2>
<p>Flink takes periodic checkpoints, based on the Chandy-Lamport algorithm, to make its state fault-tolerant. Because state and the corresponding stream positions can be recovered together, the application gets the same semantics as a failure-free execution.</p>
<p>Pravega has its own Checkpoint concept: a consistent &ldquo;point in time&rdquo; snapshot of the state of each Reader in a Reader Group, created by using a specialized Event (<em>Checkpoint Event</em>) to signal each Reader to preserve its state. Once a Checkpoint has been completed, the application can use it to reset all the Readers in the Reader Group to the known consistent state that the Checkpoint represents.</p>
<p>This means that our end-to-end recovery story differs from that of other messaging systems such as Kafka, which use a more tightly coupled approach: offsets are persisted in the Flink task state and Flink does the coordination. With Pravega, Flink delegates source recovery completely to the Pravega server and uses only a lightweight hook to connect. We collaborated with the Flink community and added a new interface, <code>ExternallyInducedSource</code> (<a href="https://issues.apache.org/jira/browse/FLINK-6390">FLINK-6390</a>), to allow such external calls for checkpointing. The connector implements this interface to guarantee exactly-once semantics during failure recovery.</p>
<p>The checkpoint mechanism works as a two-step process:</p>
<ul>
<li>
<p>The <a href="https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.html">master hook</a> handler from the JobManager initiates the <a href="https://ci.apache.org/projects/flink/flink-docs-stable/api/java/org/apache/flink/runtime/checkpoint/MasterTriggerRestoreHook.html#triggerCheckpoint-long-long-java.util.concurrent.Executor-"><code>triggerCheckpoint</code></a> request to the <code>ReaderCheckpointHook</code> that was registered with the JobManager during <code>FlinkPravegaReader</code> source initialization. The <code>ReaderCheckpointHook</code> handler notifies Pravega to checkpoint the current reader state. This is a non-blocking call that returns a <code>future</code>, which completes once the Pravega readers are done with the checkpointing. Once the <code>future</code> completes, the Pravega checkpoint is persisted in the &ldquo;master state&rdquo; of the Flink checkpoint.</p>
</li>
<li>
<p>A <code>Checkpoint</code> event will be sent by Pravega as part of the data stream flow and, upon receiving the event, the <code>FlinkPravegaReader</code> will initiate a <a href="https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/ExternallyInducedSource.java#L73"><code>triggerCheckpoint</code></a> request to effectively let Flink continue and complete the checkpoint process.</p>
</li>
</ul>
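<p>The two steps above can be pictured with a small single-threaded simulation. This is only a sketch under simplifying assumptions: <code>CheckpointFlowSketch</code>, its methods, the <code>CHECKPOINT:</code> marker string, and the <code>reader-positions@</code> value are hypothetical stand-ins, there is a single reader, and the future completes as soon as that reader sees the checkpoint event. It does not use the Flink or Pravega APIs.</p>

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the hook/reader handshake; not the connector's code.
final class CheckpointFlowSketch {
    private static final String MARKER = "CHECKPOINT:";

    private final Queue<String> readerInput = new ArrayDeque<>(); // what the reader will see, in order
    private final List<String> log = new ArrayList<>();
    private CompletableFuture<String> pending;

    void append(String event) {
        readerInput.add(event);
    }

    // Step 1: the master hook asks "Pravega" for a checkpoint and returns immediately.
    CompletableFuture<String> triggerCheckpoint(long checkpointId) {
        pending = new CompletableFuture<>();
        readerInput.add(MARKER + checkpointId); // checkpoint event travels with the data
        log.add("hook: requested Pravega checkpoint " + checkpointId);
        return pending;
    }

    // Step 2: the reader emits data events and reacts to the checkpoint event.
    void runReader() {
        String event;
        while ((event = readerInput.poll()) != null) {
            if (event.startsWith(MARKER)) {
                String id = event.substring(MARKER.length());
                log.add("reader: checkpoint event seen, triggering Flink checkpoint " + id);
                pending.complete("reader-positions@" + id); // the reader is done checkpointing
            } else {
                log.add("reader: emitted " + event);
            }
        }
    }

    static List<String> demo() {
        CheckpointFlowSketch flow = new CheckpointFlowSketch();
        flow.append("a");
        flow.append("b");
        flow.triggerCheckpoint(1L)
            .thenAccept(state -> flow.log.add("master state: stored " + state));
        flow.append("c");
        flow.runReader();
        return flow.log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

<p>The log shows that events <code>a</code> and <code>b</code> belong to the checkpoint while <code>c</code> comes after it, which is the property the checkpoint event is meant to establish.</p>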
<h2 id="end-to-end-exactly-once-semantics">
  End-to-end exactly-once semantics
  <a class="anchor" href="#end-to-end-exactly-once-semantics">#</a>
</h2>
<p>In the early years of big data processing, results from real-time stream processing were always considered inaccurate, approximate, or speculative. However, correctness is extremely important for some use cases and in some industries, such as finance.</p>
<p>This limitation stemmed mainly from two issues:</p>
<ul>
<li>data sources that are unordered in event time</li>
<li>the difficulty of guaranteeing end-to-end exactly-once semantics</li>
</ul>
<p>Over recent years, watermarking has been introduced as a tradeoff between correctness and latency, and it is now considered a good solution for data sources that are unordered in event time.</p>
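<p>To make the tradeoff concrete, here is a minimal sketch of a bounded-out-of-orderness watermark in plain Java. The <code>WatermarkSketch</code> class is hypothetical and deliberately simplified; it is not Flink's watermark API. The watermark trails the highest timestamp seen so far by a fixed bound, and an event whose timestamp is at or below the current watermark counts as late.</p>

```java
// Hypothetical, simplified watermark tracker; not Flink's WatermarkStrategy API.
final class WatermarkSketch {
    private final long maxOutOfOrderness;
    private long maxTimestamp = Long.MIN_VALUE;

    WatermarkSketch(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // The watermark lags the largest timestamp seen so far by the configured bound.
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - maxOutOfOrderness;
    }

    // Returns true if the event is still on time, false if it arrives behind the watermark.
    boolean onEvent(long timestamp) {
        boolean late = timestamp <= currentWatermark();
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        return !late;
    }

    public static void main(String[] args) {
        WatermarkSketch wm = new WatermarkSketch(5);
        long[] timestamps = {10, 12, 8, 20, 9};
        for (long ts : timestamps) {
            boolean onTime = wm.onEvent(ts);
            System.out.println("ts=" + ts + " onTime=" + onTime + " watermark=" + wm.currentWatermark());
        }
    }
}
```

<p>A larger bound accepts more out-of-order events (better correctness) but delays results that wait for the watermark (higher latency); a smaller bound does the opposite.</p>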
<p>Guaranteeing end-to-end exactly-once semantics is trickier. When we say “exactly-once semantics”, we mean that each incoming event affects the final results exactly once. Even in the event of a machine or software failure, there is no duplicate data and no data that goes unprocessed. This is quite difficult because of the demands of message acknowledgment and recovery during such fast processing, which is also why some early distributed streaming engines like Storm (without Trident) chose to support only &ldquo;at-least-once&rdquo; guarantees.</p>
<p>Flink was one of the first streaming systems able to provide exactly-once semantics, thanks to its carefully designed <a href="https://www.ververica.com/blog/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink">checkpoint mechanism</a>. But to make this work end-to-end, the final stage needs to extend these semantics to external message system sinks, which therefore have to support commits and rollbacks.</p>
<p>To address this problem, Pravega introduced <a href="https://cncf.pravega.io/docs/latest/transactions/">transactional writes</a>. A Pravega transaction allows an application to prepare a set of events that can be written &ldquo;all at once&rdquo; to a Stream, so that the application can &ldquo;commit&rdquo; a batch of events atomically. When writes are idempotent, it is possible to implement end-to-end exactly-once pipelines together with Flink.</p>
<p>Building such an end-to-end solution requires coordination between Flink and the Pravega sink, which is still challenging. A common approach for coordinating commits and rollbacks in a distributed system is the two-phase commit protocol. We used this protocol and, together with the Flink community, implemented the sink function as a two-phase commit coordinated with Flink checkpoints.</p>
<p>The Flink community then extracted the common logic from the two-phase commit protocol and provided a general interface <code>TwoPhaseCommitSinkFunction</code> (<a href="https://issues.apache.org/jira/browse/FLINK-7210">FLINK-7210</a>) to make it possible to build end-to-end exactly-once applications with other message systems that have transaction support. This includes Apache Kafka versions 0.11 and above. There is an official Flink <a href="https://flink.apache.org/features/2018/03/01/end-to-end-exactly-once-apache-flink.html">blog post</a> that describes this feature in detail.</p>
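<p>The checkpoint-coordinated two-phase commit described above can be sketched as follows. This is an illustrative model only: <code>TwoPhaseCommitSketch</code> and its method names are hypothetical and merely echo the phases of the protocol, in-memory lists stand in for Pravega transactions and the stream, and recovery is simplified so that every uncommitted transaction is discarded.</p>

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a two-phase commit sink; not TwoPhaseCommitSinkFunction itself.
final class TwoPhaseCommitSketch {
    private final List<String> stream = new ArrayList<>();   // committed data, visible to readers
    private List<String> openTxn = new ArrayList<>();         // transaction receiving new events
    private final Map<Long, List<String>> preCommitted = new LinkedHashMap<>();

    void write(String event) {
        openTxn.add(event); // buffered in the transaction, not yet visible
    }

    // Phase 1: when a checkpoint is taken, close the current transaction and open a new one.
    void preCommit(long checkpointId) {
        preCommitted.put(checkpointId, openTxn);
        openTxn = new ArrayList<>();
    }

    // Phase 2: when the checkpoint is confirmed complete, make the transaction visible.
    void commit(long checkpointId) {
        List<String> txn = preCommitted.remove(checkpointId);
        if (txn != null) {
            stream.addAll(txn);
        }
    }

    // Failure: discard everything uncommitted. Simplification: a real sink would still
    // commit pre-committed transactions that belong to an already completed checkpoint.
    void abortAll() {
        openTxn = new ArrayList<>();
        preCommitted.clear();
    }

    List<String> visible() {
        return new ArrayList<>(stream);
    }

    static List<String> demo() {
        TwoPhaseCommitSketch sink = new TwoPhaseCommitSketch();
        sink.write("a");
        sink.write("b");
        sink.preCommit(1L);
        sink.write("c");     // belongs to the next checkpoint
        sink.commit(1L);     // checkpoint 1 completed: a and b become visible
        sink.abortAll();     // crash: the transaction holding c is dropped
        sink.write("c");     // replayed from checkpoint 1
        sink.write("d");
        sink.preCommit(2L);
        sink.commit(2L);
        return sink.visible();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

<p>Because events only become visible in phase 2, the event <code>c</code> that was written before the crash never reaches readers twice, even though it is processed twice.</p>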
<h1 id="summary">
  Summary
  <a class="anchor" href="#summary">#</a>
</h1>
<p>The Pravega Flink connector enables Pravega to connect to Flink and allows Pravega to act as a key data store in a streaming pipeline. Both projects share a common design philosophy and can integrate well with each other. Pravega has its own concept of checkpointing and has implemented transactional writes to support end-to-end exactly-once guarantees.</p>
<h1 id="future-plans">
  Future plans
  <a class="anchor" href="#future-plans">#</a>
</h1>
<p><code>FlinkPravegaInputFormat</code> and <code>FlinkPravegaOutputFormat</code> are now provided to support batch reads and writes in Flink, but these are under the legacy DataSet API. Since Flink is now making efforts to unify batch and streaming, it is improving its APIs and providing new interfaces for the <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A&#43;Refactor&#43;Source&#43;Interface">source</a> and <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-143%3A&#43;Unified&#43;Sink&#43;API">sink</a> APIs in the Flink 1.11 and 1.12 releases. We will continue to work with the Flink community and integrate with the new APIs.</p>
<p>We will also put more effort into SQL / Table API support to provide a better user experience, since these APIs are simpler to understand and, in some cases, even more powerful to use.</p>
<p><strong>Note:</strong> the original blog post can be found <a href="https://cncf.pravega.io/blog/2021/11/01/pravega-flink-connector-101/">here</a>.</p>
</p>
</article>

          



  
    

        </section>
        
        
      </main>

      <footer>
        



<div class="container disclaimer">
  <p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>



      </footer>
    
  </body>
</html>






