blob: 50451e140cca92e1102092e059a29a578b0de022 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en" dir=ZgotmplZ>
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2021/09/07/implementing-a-custom-source-connector-for-table-api-and-sql-part-one/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="Part one of this tutorial will teach you how to build and run a custom source connector to be used with Table API and SQL, two high-level abstractions in Flink. The tutorial comes with a bundled docker-compose setup that lets you easily run the connector. You can then try it out with Flink’s SQL client.
Introduction # Apache Flink is a data processing engine that aims to keep state locally in order to do computations efficiently.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Implementing a Custom Source Connector for Table API and SQL - Part One " />
<meta property="og:description" content="Part one of this tutorial will teach you how to build and run a custom source connector to be used with Table API and SQL, two high-level abstractions in Flink. The tutorial comes with a bundled docker-compose setup that lets you easily run the connector. You can then try it out with Flink’s SQL client.
Introduction # Apache Flink is a data processing engine that aims to keep state locally in order to do computations efficiently." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2021/09/07/implementing-a-custom-source-connector-for-table-api-and-sql-part-one/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2021-09-07T00:00:00+00:00" />
<meta property="article:modified_time" content="2021-09-07T00:00:00+00:00" />
<title>Implementing a Custom Source Connector for Table API and SQL - Part One | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
var _paq = window._paq = window._paq || [];
_paq.push(['disableCookies']);
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var u="//analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '1']);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
</head>
<body dir=ZgotmplZ>
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;"su>
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2021/09/07/implementing-a-custom-source-connector-for-table-api-and-sql-part-one/">Implementing a Custom Source Connector for Table API and SQL - Part One </a>
</h1>
September 7, 2021 -
Ingo Buerk
Daisy Tsang
<p><p>Part one of this tutorial will teach you how to build and run a custom source connector to be used with Table API and SQL, two high-level abstractions in Flink. The tutorial comes with a bundled <a href="https://docs.docker.com/compose/">docker-compose</a> setup that lets you easily run the connector. You can then try it out with Flink’s SQL client.</p>
<h1 id="introduction">
Introduction
<a class="anchor" href="#introduction">#</a>
</h1>
<p>Apache Flink is a data processing engine that aims to keep <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/">state</a> locally in order to do computations efficiently. However, Flink does not &ldquo;own&rdquo; the data but relies on external systems to ingest and persist data. Connecting to external data input (<strong>sources</strong>) and external data storage (<strong>sinks</strong>) is usually summarized under the term <strong>connectors</strong> in Flink.</p>
<p>Since connectors are such important components, Flink ships with <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/">connectors for some popular systems</a>. But sometimes you may need to read in an uncommon data format and what Flink provides is not enough. This is why Flink also provides extension points for building custom connectors if you want to connect to a system that is not supported by an existing connector.</p>
<p>Once you have a source and a sink defined for Flink, you can use its declarative APIs (in the form of the <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/overview/">Table API and SQL</a>) to execute queries for data analysis.</p>
<p>The <strong>Table API</strong> provides more programmatic access while <strong>SQL</strong> is a more universal query language. It is named Table API because of its relational functions on tables: how to obtain a table, how to output a table, and how to perform query operations on the table.</p>
<p>In this two-part tutorial, you will explore some of these APIs and concepts by implementing your own custom source connector for reading in data from an email inbox. You will then use Flink to process emails through the <a href="https://en.wikipedia.org/wiki/Internet_Message_Access_Protocol">IMAP protocol</a>.</p>
<p>Part one will focus on building a custom source connector and <a href="/2021/09/07/connector-table-sql-api-part2">part two</a> will focus on integrating it.</p>
<h1 id="prerequisites">
Prerequisites
<a class="anchor" href="#prerequisites">#</a>
</h1>
<p>This tutorial assumes that you have some familiarity with Java and objected-oriented programming.</p>
<p>You are encouraged to follow along with the code in this <a href="https://github.com/Airblader/blog-imap">repository</a>.</p>
<p>It would also be useful to have <a href="https://docs.docker.com/compose/install/">docker-compose</a> installed on your system in order to use the script included in the repository that builds and runs the connector.</p>
<h1 id="understand-the-infrastructure-required-for-a-connector">
Understand the infrastructure required for a connector
<a class="anchor" href="#understand-the-infrastructure-required-for-a-connector">#</a>
</h1>
<p>In order to create a connector which works with Flink, you need:</p>
<ol>
<li>
<p>A <em>factory class</em> (a blueprint for creating other objects from string properties) that tells Flink with which identifier (in this case, “imap”) our connector can be addressed, which configuration options it exposes, and how the connector can be instantiated. Since Flink uses the Java Service Provider Interface (SPI) to discover factories located in different modules, you will also need to add some configuration details.</p>
</li>
<li>
<p>The <em>table source</em> object as a specific instance of the connector during the planning stage. It is responsible for back and forth communication with the optimizer during the planning stage and is like another factory for creating connector runtime implementation. There are also more advanced features, such as <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/connector/source/abilities/package-summary.html">abilities</a>, that can be implemented to improve connector performance.</p>
</li>
<li>
<p>A <em>runtime implementation</em> from the connector obtained during the planning stage. The runtime logic is implemented in Flink&rsquo;s core connector interfaces and does the actual work of producing rows of dynamic table data. The runtime instances are shipped to the Flink cluster.</p>
</li>
</ol>
<p>Let us look at this sequence (factory class → table source → runtime implementation) in reverse order.</p>
<h1 id="establish-the-runtime-implementation-of-the-connector">
Establish the runtime implementation of the connector
<a class="anchor" href="#establish-the-runtime-implementation-of-the-connector">#</a>
</h1>
<p>You first need to have a source connector which can be used in Flink&rsquo;s runtime system, defining how data goes in and how it can be executed in the cluster. There are a few different interfaces available for implementing the actual source of the data and have it be discoverable in Flink.</p>
<p>For complex connectors, you may want to implement the <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/api/connector/source/Source.html">Source interface</a> which gives you a lot of control. For simpler use cases, you can use the <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html">SourceFunction interface</a>. There are already a few different implementations of SourceFunction interfaces for common use cases such as the <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/api/functions/source/FromElementsFunction.html">FromElementsFunction</a> class and the <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/api/functions/source/RichSourceFunction.html">RichSourceFunction</a> class. You will use the latter.</p>
<div class="note">
<h5>Hint</h5>
<p>The Source interface is the new abstraction whereas the SourceFunction interface is slowly phasing out.
All connectors will eventually implement the Source interface.
</p>
</div>
<p><code>RichSourceFunction</code> is a base class for implementing a data source that has access to context information and some lifecycle methods. There is a <code>run()</code> method inherited from the <code>SourceFunction</code> interface that you need to implement. It is invoked once and can be used to produce the data either once for a bounded result or within a loop for an unbounded stream.</p>
<p>For example, to create a bounded data source, you could implement this method so that it reads all existing emails and then closes. To create an unbounded source, you could only look at new emails coming in while the source is active. You can also combine these behaviors and expose them through configuration options.</p>
<p>When you first create the class and implement the interface, it should look something like this:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.streaming.api.functions.source.RichSourceFunction</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.data.RowData</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">ImapSource</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">RichSourceFunction</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">run</span><span class="p">(</span><span class="n">SourceContext</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span><span class="w"> </span><span class="n">ctx</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">cancel</span><span class="p">()</span><span class="w"> </span><span class="p">{}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>Note that internal data structures (<code>RowData</code>) are used because that is required by the table runtime.</p>
<p>In the <code>run()</code> method, you get access to a <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.SourceContext.html">context</a> object inherited from the SourceFunction interface, which is a bridge to Flink and allows you to output data. Since the source does not produce any data yet, the next step is to make it produce some static data in order to test that the data flows correctly:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.streaming.api.functions.source.RichSourceFunction</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.data.GenericRowData</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.data.RowData</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.data.StringData</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">ImapSource</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">RichSourceFunction</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">run</span><span class="p">(</span><span class="n">SourceContext</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span><span class="w"> </span><span class="n">ctx</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">ctx</span><span class="p">.</span><span class="na">collect</span><span class="p">(</span><span class="n">GenericRowData</span><span class="p">.</span><span class="na">of</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">StringData</span><span class="p">.</span><span class="na">fromString</span><span class="p">(</span><span class="s">&#34;Subject 1&#34;</span><span class="p">),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">StringData</span><span class="p">.</span><span class="na">fromString</span><span class="p">(</span><span class="s">&#34;Hello, World!&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">cancel</span><span class="p">(){}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>You do not need to implement the <code>cancel()</code> method yet because the source finishes instantly.</p>
<h1 id="create-and-configure-a-dynamic-table-source-for-the-data-stream">
Create and configure a dynamic table source for the data stream
<a class="anchor" href="#create-and-configure-a-dynamic-table-source-for-the-data-stream">#</a>
</h1>
<p><a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/concepts/dynamic_tables/">Dynamic tables</a> are the core concept of Flink’s Table API and SQL support for streaming data and, like its name suggests, change over time. You can imagine a data stream being logically converted into a table that is constantly changing. For this tutorial, the emails that will be read in will be interpreted as a (source) table that is queryable. It can be viewed as a specific instance of a connector class.</p>
<p>You will now implement a <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/connector/source/DynamicTableSource.html">DynamicTableSource</a> interface. There are two types of dynamic table sources: <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/connector/source/ScanTableSource.html">ScanTableSource</a> and <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/connector/source/LookupTableSource.html">LookupTableSource</a>. Scan sources read the entire table on the external system while lookup sources look for specific rows based on keys. The former will fit the use case of this tutorial.</p>
<p>This is what a scan table source implementation would look like:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.connector.ChangelogMode</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.connector.source.DynamicTableSource</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.connector.source.ScanTableSource</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.connector.source.SourceFunctionProvider</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">ImapTableSource</span><span class="w"> </span><span class="kd">implements</span><span class="w"> </span><span class="n">ScanTableSource</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">ChangelogMode</span><span class="w"> </span><span class="nf">getChangelogMode</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">ChangelogMode</span><span class="p">.</span><span class="na">insertOnly</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">ScanRuntimeProvider</span><span class="w"> </span><span class="nf">getScanRuntimeProvider</span><span class="p">(</span><span class="n">ScanContext</span><span class="w"> </span><span class="n">ctx</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">boolean</span><span class="w"> </span><span class="n">bounded</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">true</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">ImapSource</span><span class="w"> </span><span class="n">source</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">ImapSource</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">SourceFunctionProvider</span><span class="p">.</span><span class="na">of</span><span class="p">(</span><span class="n">source</span><span class="p">,</span><span class="w"> </span><span class="n">bounded</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">DynamicTableSource</span><span class="w"> </span><span class="nf">copy</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">ImapTableSource</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="nf">asSummaryString</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="s">&#34;IMAP Table Source&#34;</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p><a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/connector/ChangelogMode.html">ChangelogMode</a> informs Flink of expected changes that the planner can expect during runtime. For example, whether the source produces only new rows, also updates to existing ones, or whether it can remove previously produced rows. Our source will only produce (<code>insertOnly()</code>) new rows.</p>
<p><a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/connector/source/ScanTableSource.ScanRuntimeProvider.html">ScanRuntimeProvider</a> allows Flink to create the actual runtime implementation you established previously (for reading the data). Flink even provides utilities like <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/connector/source/SourceFunctionProvider.html">SourceFunctionProvider</a> to wrap it into an instance of <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/streaming/api/functions/source/SourceFunction.html">SourceFunction</a>, which is one of the base runtime interfaces.</p>
<p>You will also need to indicate whether the source is bounded or not. Currently, this is the case but you will have to change this later.</p>
<h1 id="create-a-factory-class-for-the-connector-so-it-can-be-discovered-by-flink">
Create a factory class for the connector so it can be discovered by Flink
<a class="anchor" href="#create-a-factory-class-for-the-connector-so-it-can-be-discovered-by-flink">#</a>
</h1>
<p>You now have a working source connector, but in order to use it in Table API or SQL, it needs to be discoverable by Flink. You also need to define how the connector is addressable from a SQL statement when creating a source table.</p>
<p>You need to implement a <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/factories/Factory.html">Factory</a>, which is a base interface that creates object instances from a list of key-value pairs in Flink&rsquo;s Table API and SQL. A factory is uniquely identified by its class name and <code>factoryIdentifier()</code>. For this tutorial, you will implement the more specific <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/factories/DynamicTableSourceFactory.html">DynamicTableSourceFactory</a>, which allows you to configure a dynamic table connector as well as create <code>DynamicTableSource</code> instances.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span><span class="w"> </span><span class="nn">java.util.HashSet</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">java.util.Set</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.configuration.ConfigOption</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.connector.source.DynamicTableSource</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.factories.DynamicTableSourceFactory</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.factories.FactoryUtil</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">ImapTableSourceFactory</span><span class="w"> </span><span class="kd">implements</span><span class="w"> </span><span class="n">DynamicTableSourceFactory</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="nf">factoryIdentifier</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="s">&#34;imap&#34;</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">Set</span><span class="o">&lt;</span><span class="n">ConfigOption</span><span class="o">&lt;?&gt;&gt;</span><span class="w"> </span><span class="n">requiredOptions</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">HashSet</span><span class="o">&lt;&gt;</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">Set</span><span class="o">&lt;</span><span class="n">ConfigOption</span><span class="o">&lt;?&gt;&gt;</span><span class="w"> </span><span class="n">optionalOptions</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">HashSet</span><span class="o">&lt;&gt;</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">DynamicTableSource</span><span class="w"> </span><span class="nf">createDynamicTableSource</span><span class="p">(</span><span class="n">Context</span><span class="w"> </span><span class="n">ctx</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">FactoryUtil</span><span class="p">.</span><span class="na">TableFactoryHelper</span><span class="w"> </span><span class="n">factoryHelper</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">FactoryUtil</span><span class="p">.</span><span class="na">createTableFactoryHelper</span><span class="p">(</span><span class="k">this</span><span class="p">,</span><span class="w"> </span><span class="n">ctx</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">factoryHelper</span><span class="p">.</span><span class="na">validate</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">ImapTableSource</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>There are currently no configuration options but they can be added and also validated within the <code>createDynamicTableSource()</code> function. There is a small helper utility, <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/factories/FactoryUtil.TableFactoryHelper.html">TableFactoryHelper</a>, that Flink offers which ensures that required options are set and that no unknown options are provided.</p>
<p>Finally, you need to register your factory for Java&rsquo;s Service Provider Interfaces (SPI). Classes that implement this interface can be discovered and should be added to this file <code>src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory</code> with the fully classified class name of your factory:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="c1">// if you created your class in the package org.example.acme, it should be named the following:</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">org</span><span class="p">.</span><span class="na">example</span><span class="p">.</span><span class="na">acme</span><span class="p">.</span><span class="na">ImapTableSourceFactory</span><span class="w">
</span></span></span></code></pre></div><h1 id="test-the-custom-connector">
Test the custom connector
<a class="anchor" href="#test-the-custom-connector">#</a>
</h1>
<p>You should now have a working source connector. If you are following along with the provided repository, you can test it by running:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sh" data-lang="sh"><span class="line"><span class="cl">$ <span class="nb">cd</span> testing/
</span></span><span class="line"><span class="cl">$ ./build_and_run.sh
</span></span></code></pre></div><p>This builds the connector, starts a Flink cluster, a <a href="https://greenmail-mail-test.github.io/greenmail/">test email server</a> (which you will need later), and the <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/">SQL client</a> (which is bundled in the regular Flink distribution) for you. If successful, you should see the SQL CLI:</p>
<div>
<img src="/img/blog/2021-09-07-connector-table-sql-api/flink-sql-client.png" alt="Flink SQL Client"/>
<p class="align-center">Flink SQL Client</p>
</div>
<p>You can now create a table (with a &ldquo;subject&rdquo; column and a &ldquo;content&rdquo; column) with your connector by executing the following statement with the SQL client:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">T</span><span class="w"> </span><span class="p">(</span><span class="n">subject</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w"> </span><span class="n">content</span><span class="w"> </span><span class="n">STRING</span><span class="p">)</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="s1">&#39;connector&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;imap&#39;</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">SELECT</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="k">FROM</span><span class="w"> </span><span class="n">T</span><span class="p">;</span><span class="w">
</span></span></span></code></pre></div><p>Note that the schema must be exactly as written since it is currently hardcoded into the connector.</p>
<p>You should be able to see the static data you provided in your source connector earlier, which would be &ldquo;Subject 1&rdquo; and &ldquo;Hello, World!&rdquo;.</p>
<p>Now that you have a working connector, the next step is to make it do something more useful than returning static data.</p>
<h1 id="summary">
Summary
<a class="anchor" href="#summary">#</a>
</h1>
<p>In this tutorial, you looked into the infrastructure required for a connector and configured its runtime implementation to define how it should be executed in a cluster. You also defined a dynamic table source that reads the entire stream-converted table from the external source, made the connector discoverable by Flink through creating a factory class for it, and then tested it.</p>
<h1 id="next-steps">
Next Steps
<a class="anchor" href="#next-steps">#</a>
</h1>
<p>In <a href="/2021/09/07/connector-table-sql-api-part2">part two</a>, you will integrate this connector with an email inbox through the IMAP protocol.</p>
</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="//github.com/apache/flink-web/edit/asf-site/docs/content/posts/2021-09-07-connector-table-sql-api-part1.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li><a href="#introduction">Introduction</a></li>
<li><a href="#prerequisites">Prerequisites</a></li>
<li><a href="#understand-the-infrastructure-required-for-a-connector">Understand the infrastructure required for a connector</a></li>
<li><a href="#establish-the-runtime-implementation-of-the-connector">Establish the runtime implementation of the connector</a></li>
<li><a href="#create-and-configure-a-dynamic-table-source-for-the-data-stream">Create and configure a dynamic table source for the data stream</a></li>
<li><a href="#create-a-factory-class-for-the-connector-so-it-can-be-discovered-by-flink">Create a factory class for the connector so it can be discovered by Flink</a></li>
<li><a href="#test-the-custom-connector">Test the custom connector</a></li>
<li><a href="#summary">Summary</a></li>
<li><a href="#next-steps">Next Steps</a></li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a-->
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>Github</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>