<!DOCTYPE html>
<html lang="en" dir=>
<head>
<meta name="generator" content="Hugo 0.111.3">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="In part one of this tutorial, you learned how to build a custom source connector for Flink. In part two, you will learn how to integrate the connector with a test email inbox through the IMAP protocol and filter out emails using Flink SQL.
Part two of the tutorial will teach you how to: integrate a source connector which connects to a mailbox using the IMAP protocol; use Jakarta Mail, a Java library that can send and receive email via the IMAP protocol; and write Flink SQL and execute the queries in the Ververica Platform for a nicer visualization. You are encouraged to follow along with the code in this repository.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Implementing a custom source connector for Table API and SQL - Part Two " />
<meta property="og:description" content="In part one of this tutorial, you learned how to build a custom source connector for Flink. In part two, you will learn how to integrate the connector with a test email inbox through the IMAP protocol and filter out emails using Flink SQL.
Part two of the tutorial will teach you how to: integrate a source connector which connects to a mailbox using the IMAP protocol; use Jakarta Mail, a Java library that can send and receive email via the IMAP protocol; and write Flink SQL and execute the queries in the Ververica Platform for a nicer visualization. You are encouraged to follow along with the code in this repository." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2021/09/07/implementing-a-custom-source-connector-for-table-api-and-sql-part-two/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2021-09-07T00:00:00+00:00" />
<meta property="article:modified_time" content="2021-09-07T00:00:00+00:00" />
<title>Implementing a custom source connector for Table API and SQL - Part Two | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.e3b33391dbc1f4b2cc47778e2f4b86c744ded3ccc82fdfb6f08caf91d8607f9a.css" integrity="sha256-47MzkdvB9LLMR3eOL0uGx0Te08zIL9&#43;28Iyvkdhgf5o=">
<script defer src="/en.search.min.8592fd2e43835d2ef6fab8eb9b8969ee6ad1bdb888a636e37e28032f8bd9887d.js" integrity="sha256-hZL9LkODXS72&#43;rjrm4lp7mrRvbiIpjbjfigDL4vZiH0="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2021/09/07/implementing-a-custom-source-connector-for-table-api-and-sql-part-two/">
<script>
var _paq = window._paq = window._paq || [];
_paq.push(['disableCookies']);
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var u="//analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '1']);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
</head>
<body dir=>
<input type="checkbox" class="hidden toggle" id="menu-control" />
<input type="checkbox" class="hidden toggle" id="toc-control" />
<main class="container flex">
<aside class="book-menu">
<nav>
<a id="logo" href="/">
<img width="70%" src="/flink-header-logo.svg">
</a>
<div class="book-search">
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/" />
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
<input type="checkbox" id="section-4117fb24454a2c30ee86e524839e77ec" class="toggle" />
<label for="section-4117fb24454a2c30ee86e524839e77ec" class="flex justify-between flink-menu-item">What is Apache Flink?<span></span>
</label>
<ul>
<li>
<label for="section-ffd5922da551e96e0481423fab94c463" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/what-is-flink/flink-architecture/" class="">Architecture</a>
</label>
</li>
<li>
<label for="section-fc28f08b67476edb77e00e03b6c7c2e0" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/what-is-flink/flink-applications/" class="">Applications</a>
</label>
</li>
<li>
<label for="section-612df33a02d5d4ee78d718abaab5b5b4" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/what-is-flink/flink-operations/" class="">Operations</a>
</label>
</li>
</ul>
<label for="section-f1ecec07350bd6810050d40158878749" class="flex justify-between flink-menu-item">
<a href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/" style="color:black" class="">What is Stateful Functions? <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<label for="section-4113a4c3072cb35f6fd7a0d4e098ee70" class="flex justify-between flink-menu-item">
<a href="https://nightlies.apache.org/flink/flink-ml-docs-stable/" style="color:black" class="">What is Flink ML? <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<label for="section-b39c70259d0abbe2bf1d8d645425f84d" class="flex justify-between flink-menu-item">
<a href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/" style="color:black" class="">What is the Flink Kubernetes Operator? <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<label for="section-53e0b1afcb9ccaf779dc285aa272a014" class="flex justify-between flink-menu-item">
<a href="https://nightlies.apache.org/flink/flink-table-store-docs-stable/" style="color:black" class="">What is Flink Table Store? <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<label for="section-f4973f06a66f063045b4ebdacaf3127d" class="flex justify-between flink-menu-item">
<a href="/use-cases/" class="">Use Cases</a>
</label>
<label for="section-0f1863835376e859ac438ae9529daff2" class="flex justify-between flink-menu-item">
<a href="/powered-by/" class="">Powered By</a>
</label>
<br/>
<label for="section-f383f23a96a43d8d0cc66aeb0237e26a" class="flex justify-between flink-menu-item">
<a href="/downloads/" class="">Downloads</a>
</label>
<input type="checkbox" id="section-c727fab97b4d77e5b28ce8c448fb9000" class="toggle" />
<label for="section-c727fab97b4d77e5b28ce8c448fb9000" class="flex justify-between flink-menu-item">Getting Started<span></span>
</label>
<ul>
<li>
<label for="section-f45abaa99ab076108b9a5b94edbc6647" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/" style="color:black" class="">With Flink <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-efe2166e9dce6f72e126dcc2396b4402" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html" style="color:black" class="">With Flink Stateful Functions <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-7e268d0a469b1093bb33d71d093eb7b9" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/" style="color:black" class="">With Flink ML <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-cc7147cd0441503127bfaf6f219d4fbb" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/" style="color:black" class="">With Flink Kubernetes Operator <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-660ca694e416d8ca9176dda52a60d637" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-table-store-docs-stable/docs/try-table-store/quick-start/" style="color:black" class="">With Flink Table Store <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-75db0b47bf4ae9c247aadbba5fbd720d" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/" style="color:black" class="">Training Course <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
</ul>
<input type="checkbox" id="section-6318075fef29529089951a49d413d083" class="toggle" />
<label for="section-6318075fef29529089951a49d413d083" class="flex justify-between flink-menu-item">Documentation<span></span>
</label>
<ul>
<li>
<label for="section-9a8122d8912450484d1c25394ad40229" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-docs-stable/" style="color:black" class="">Flink 1.17 (stable) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-8b2fd3efb702be3783ba98d650707e3c" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-docs-master/" style="color:black" class="">Flink Master (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-5317a079cddb964c59763c27607f43d9" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/" style="color:black" class="">Stateful Functions 3.2 (stable) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-25b72f108b7156e94d91b04853d8813a" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-statefun-docs-master" style="color:black" class="">Stateful Functions Master (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-13a02f969904a2455a39ed90e287593f" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-ml-docs-stable/" style="color:black" class="">ML 2.2 (stable) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-6d895ec5ad127a29a6a9ce101328ccdf" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-ml-docs-master" style="color:black" class="">ML Master (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-c83ad0caf34e364bf3729badd233a350" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/" style="color:black" class="">Kubernetes Operator 1.4 (latest) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-a2c75d90005425982ba8f26ae0e160a3" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main" style="color:black" class="">Kubernetes Operator Main (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-07b85e4b2f61b1526bf202c64460abcd" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-table-store-docs-stable/" style="color:black" class="">Table Store 0.3 (stable) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-9b9a0032b1e858a34c125d828d1a0718" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-table-store-docs-master/" style="color:black" class="">Table Store Master (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
</ul>
<label for="section-63d6a565d79aa2895f70806a46021c07" class="flex justify-between flink-menu-item">
<a href="/getting-help/" class="">Getting Help</a>
</label>
<label for="section-1d5066022b83f4732dc80f4e9eaa069a" class="flex justify-between flink-menu-item">
<a href="https://flink-packages.org/" style="color:black" class="">flink-packages.org <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<br/>
<label for="section-7821b78a97db9e919426e86121a7be9c" class="flex justify-between flink-menu-item">
<a href="/community/" class="">Community & Project Info</a>
</label>
<label for="section-8c042831df4e371c4ef9375f1df06f35" class="flex justify-between flink-menu-item">
<a href="/roadmap/" class="">Roadmap</a>
</label>
<input type="checkbox" id="section-73117efde5302fddcb193307d582b588" class="toggle" />
<label for="section-73117efde5302fddcb193307d582b588" class="flex justify-between flink-menu-item">How to Contribute<span></span>
</label>
<ul>
<li>
<label for="section-6646b26b23a3e79b8de9c552ee76f6dd" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/overview/" class="">Overview</a>
</label>
</li>
<li>
<label for="section-e6ab9538b82cd5f94103b971adb7c1a9" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/contribute-code/" class="">Contribute Code</a>
</label>
</li>
<li>
<label for="section-1c09e1358485e82d9b3f5f689d4ced65" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/reviewing-prs/" class="">Review Pull Requests</a>
</label>
</li>
<li>
<label for="section-ed01e0defd235498fa3c9a2a0b3302fb" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/code-style-and-quality-preamble/" class="">Code Style and Quality Guide</a>
</label>
</li>
<li>
<label for="section-4e8d5e9924cf15f397711b0d82e15650" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/contribute-documentation/" class="">Contribute Documentation</a>
</label>
</li>
<li>
<label for="section-ddaa8307917e5ba7f60ba3316711e492" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/documentation-style-guide/" class="">Documentation Style Guide</a>
</label>
</li>
<li>
<label for="section-390a72c171cc82f180a308b95fc3aa72" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/improve-website/" class="">Contribute to the Website</a>
</label>
</li>
</ul>
<label for="section-9d3ddfd487223d5a199ba301f25c88c6" class="flex justify-between flink-menu-item">
<a href="/security/" class="">Security</a>
</label>
<br/>
<label for="section-a07783f405300745807d39eacf150420" class="flex justify-between flink-menu-item">
<a href="/posts/" class="">Flink Blog</a>
</label>
<br/>
<hr class="menu-break">
<label for="section-f71a7070dbb7b669824a6441408ded70" class="flex justify-between flink-menu-item">
<a href="https://github.com/apache/flink" style="color:black" class="">Flink on GitHub <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<label for="section-2ccaaab8c67f3105bbf7df75faca8027" class="flex justify-between flink-menu-item">
<a href="https://twitter.com/apacheflink" style="color:black" class="">@ApacheFlink <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<hr class="menu-break">
<table>
<tr>
<th colspan="2">
<label for="section-78c2028200542d78f8c1a8f6b4cbb36b" class="flex justify-between flink-menu-item">
<a href="https://www.apache.org/" style="color:black" class="">Apache Software Foundation <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label></th>
</tr>
<tr>
<td>
<label for="section-794df3791a8c800841516007427a2aa3" class="flex justify-between flink-menu-item">
<a href="https://www.apache.org/licenses/" style="color:black" class="">License <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label></td>
<td>
<label for="section-2fae32629d4ef4fc6341f1751b405e45" class="flex justify-between flink-menu-item">
<a href="https://www.apache.org/security/" style="color:black" class="">Security <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label></td>
</tr>
<tr>
<td>
<label for="section-0584e445d656b83b431227bb80ff0c30" class="flex justify-between flink-menu-item">
<a href="https://www.apache.org/foundation/sponsorship.html" style="color:black" class="">Donate <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label></td>
<td>
<label for="section-00d06796e489999226fb5bb27fe1b3b2" class="flex justify-between flink-menu-item">
<a href="https://www.apache.org/foundation/thanks.html" style="color:black" class="">Thanks <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label></td>
</tr>
</table>
<hr class="menu-break">
<a href="/zh/" class="flex align-center">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;&nbsp;
中文版
</a>
<script src="/js/track-search-terms.js"></script>
</nav>
<script>(function(){var e=document.querySelector("aside.book-menu nav");addEventListener("beforeunload",function(){localStorage.setItem("menu.scrollTop",e.scrollTop)}),e.scrollTop=localStorage.getItem("menu.scrollTop")})()</script>
</aside>
<div class="book-page">
<header class="book-header">
<div class="flex align-center justify-between">
<label for="menu-control">
<img src="/svg/menu.svg" class="book-icon" alt="Menu" />
</label>
<strong>Implementing a custom source connector for Table API and SQL - Part Two </strong>
<label for="toc-control">
<img src="/svg/toc.svg" class="book-icon" alt="Table of Contents" />
</label>
</div>
<aside class="hidden clearfix">
<nav id="TableOfContents"><h3>On This Page <button class="toc" onclick="collapseToc()"><i class="fa fa-compress" aria-hidden="true"></i></button></h3>
<ul>
<li><a href="#collect-incoming-emails">Collect incoming emails</a></li>
</ul>
</nav>
</aside>
</header>
<article class="markdown">
<h1>
<a href="/2021/09/07/implementing-a-custom-source-connector-for-table-api-and-sql-part-two/">Implementing a custom source connector for Table API and SQL - Part Two </a>
</h1>
September 7, 2021 -
Ingo Buerk &amp; Daisy Tsang
<p><p>In <a href="/2021/09/07/connector-table-sql-api-part1">part one</a> of this tutorial, you learned how to build a custom source connector for Flink. In part two, you will learn how to integrate the connector with a test email inbox through the IMAP protocol and filter out emails using Flink SQL.</p>
<h1 id="goals">
Goals
<a class="anchor" href="#goals">#</a>
</h1>
<p>Part two of the tutorial will teach you how to:</p>
<ul>
<li>integrate a source connector which connects to a mailbox using the IMAP protocol</li>
<li>use <a href="https://eclipse-ee4j.github.io/mail/">Jakarta Mail</a>, a Java library that can send and receive email via the IMAP protocol</li>
<li>write <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/overview/">Flink SQL</a> and execute the queries in the <a href="https://www.ververica.com/apache-flink-sql-on-ververica-platform">Ververica Platform</a> for a nicer visualization</li>
</ul>
<p>You are encouraged to follow along with the code in this <a href="https://github.com/Airblader/blog-imap">repository</a>. It provides a boilerplate project with a bundled <a href="https://docs.docker.com/compose/">docker-compose</a> setup that lets you run the connector easily and try it out with Flink’s SQL client.</p>
<h1 id="prerequisites">
Prerequisites
<a class="anchor" href="#prerequisites">#</a>
</h1>
<p>This tutorial assumes that you have:</p>
<ul>
<li>followed the steps outlined in <a href="/2021/09/07/connector-table-sql-api-part1">part one</a> of this tutorial</li>
<li>some familiarity with Java and object-oriented programming</li>
</ul>
<h1 id="understand-how-to-fetch-emails-via-the-imap-protocol">
Understand how to fetch emails via the IMAP protocol
<a class="anchor" href="#understand-how-to-fetch-emails-via-the-imap-protocol">#</a>
</h1>
<p>Now that you have a working source connector that runs on Flink, it is time to connect it to an email server via <a href="https://en.wikipedia.org/wiki/Internet_Message_Access_Protocol">IMAP</a> (an Internet protocol that email clients use to retrieve messages from a mail server), so that Flink processes real emails instead of static test data.</p>
<p>You will use <a href="https://eclipse-ee4j.github.io/mail/">Jakarta Mail</a>, a Java library that can be used to send and receive email via IMAP. For simplicity, authentication will use a plain username and password.</p>
<p>This tutorial focuses on how to implement a connector for Flink. If you want to learn more about how IMAP or Jakarta Mail work in detail, you are encouraged to explore a more extensive implementation in this <a href="https://github.com/TNG/flink-connector-email">repository</a>. It can read a wide range of information from emails and offers options to ingest existing emails alongside new ones, to connect via SSL, and more. It also supports different formats for reading email content and implements connector abilities such as <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.html">reading metadata</a>.</p>
<p>In order to fetch emails, you will need to connect to the email server, register a listener for new emails and collect them whenever they arrive, and enter a loop to keep the connector running.</p>
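<p>The three steps above can be sketched without a mail server. In the minimal, dependency-free sketch below, the hypothetical <code>FakeFolder</code> class stands in for the IMAP folder you would obtain through Jakarta Mail (where you would register a listener with <code>folder.addMessageCountListener(...)</code>), and a plain <code>Consumer&lt;String&gt;</code> stands in for Flink’s <code>SourceContext</code>. It only illustrates the control flow, not the actual connector.</p>

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for an IMAP folder (not part of Jakarta Mail):
// it notifies registered listeners whenever a new message "arrives".
class FakeFolder {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void addListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    // Simulates the mail server delivering a new email with the given subject.
    void deliver(String subject) {
        for (Consumer<String> listener : listeners) {
            listener.accept(subject);
        }
    }
}

// Sketch of the source's control flow: register a listener, then stay alive until cancelled.
class MailSourceSketch {
    // volatile: cancel() is typically called from a different thread than run().
    private volatile boolean running = true;

    void run(FakeFolder folder, Consumer<String> collector) {
        // Step 1 (connect) is omitted: the folder is handed in already "open".
        // Step 2: forward every newly arriving email to the collector.
        folder.addListener(collector);
        // Step 3: keep the source running; new emails arrive via the listener.
        while (running) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void cancel() {
        running = false;
    }
}
```

<p>Because <code>running</code> starts out as <code>true</code>, calling <code>cancel()</code> (typically from another thread) is what ends the loop, mirroring how Flink stops a <code>SourceFunction</code>. With a real Jakarta Mail folder, the loop typically also has to send a request to the server now and then (for example <code>folder.getMessageCount()</code>), because new-message events are only delivered in response to client activity.</p>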
<h1 id="add-configuration-options---server-information-and-credentials">
Add configuration options - server information and credentials
<a class="anchor" href="#add-configuration-options---server-information-and-credentials">#</a>
</h1>
<p>In order to connect to your IMAP server, you will need at least the following:</p>
<ul>
<li>hostname (of the mail server)</li>
<li>port number</li>
<li>username</li>
<li>password</li>
</ul>
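<p>In the options class created next, only the host is mandatory, while the port and the credentials may be left out. One way to reconcile this with the list above is to validate the host up front and let Jakarta Mail pick the default port when none is given: <code>Service.connect(host, port, user, password)</code> interprets a port of <code>-1</code> as the protocol’s default. A minimal sketch, where <code>ImapConnectionParams</code> is a hypothetical helper rather than part of the tutorial’s repository:</p>

```java
import java.util.Objects;

// Hypothetical helper (not part of the tutorial's code): validates connection
// settings before they are handed to Jakarta Mail.
final class ImapConnectionParams {
    // Jakarta Mail's Service.connect(host, port, user, password) treats -1 as "default port".
    static final int DEFAULT_PORT = -1;

    private ImapConnectionParams() {
    }

    // The host is mandatory; fail fast with a clear message if it is missing.
    static String requireHost(String host) {
        Objects.requireNonNull(host, "IMAP host must be set");
        if (host.trim().isEmpty()) {
            throw new IllegalArgumentException("IMAP host must not be blank");
        }
        return host;
    }

    // Maps the optional port to the int expected by Service.connect(...).
    static int resolvePort(Integer port) {
        if (port == null) {
            return DEFAULT_PORT;
        }
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Invalid port: " + port);
        }
        return port;
    }
}
```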
<p>You will start by creating a class to encapsulate the configuration options. You will use <a href="https://projectlombok.org">Lombok</a> to reduce boilerplate code: the <code>@Data</code> annotation generates getters as well as <code>equals()</code>, <code>hashCode()</code>, and <code>toString()</code> for all fields of the immutable class, and <code>@SuperBuilder(toBuilder = true)</code> generates a builder plus a <code>toBuilder()</code> method for creating modified copies.</p>
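<p>To see what the annotations save you, the sketch below writes out by hand roughly what they generate, reduced to the <code>host</code> and <code>port</code> fields. It is a simplified approximation, not Lombok’s literal output: <code>@SuperBuilder</code> actually generates a generic builder hierarchy that supports subclassing. The class name <code>ImapSourceOptionsByHand</code> is hypothetical; the Lombok-annotated class the tutorial uses follows after it.</p>

```java
import java.io.Serializable;
import java.util.Objects;

// Simplified, hand-written approximation of what @Data and
// @SuperBuilder(toBuilder = true) provide; not Lombok's literal output.
final class ImapSourceOptionsByHand implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String host;
    private final Integer port; // nullable

    private ImapSourceOptionsByHand(Builder builder) {
        this.host = builder.host;
        this.port = builder.port;
    }

    // @Data: getters (no setters, because the fields are final)
    public String getHost() { return host; }
    public Integer getPort() { return port; }

    // @SuperBuilder: static factory for a fresh builder
    static Builder builder() { return new Builder(); }

    // toBuilder = true: a builder pre-filled with this instance's values
    Builder toBuilder() { return new Builder().host(host).port(port); }

    static final class Builder {
        private String host;
        private Integer port;

        Builder host(String host) { this.host = host; return this; }
        Builder port(Integer port) { this.port = port; return this; }
        ImapSourceOptionsByHand build() { return new ImapSourceOptionsByHand(this); }
    }

    // @Data: value-based equals/hashCode/toString over all fields
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ImapSourceOptionsByHand)) return false;
        ImapSourceOptionsByHand other = (ImapSourceOptionsByHand) o;
        return Objects.equals(host, other.host) && Objects.equals(port, other.port);
    }

    @Override
    public int hashCode() { return Objects.hash(host, port); }

    @Override
    public String toString() {
        return "ImapSourceOptionsByHand(host=" + host + ", port=" + port + ")";
    }
}
```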
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">lombok.Data</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">lombok.experimental.SuperBuilder</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">javax.annotation.Nullable</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">java.io.Serializable</span><span class="o">;</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="nd">@Data</span>
</span></span><span class="line"><span class="cl"><span class="nd">@SuperBuilder</span><span class="o">(</span><span class="n">toBuilder</span> <span class="o">=</span> <span class="kc">true</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">ImapSourceOptions</span> <span class="kd">implements</span> <span class="n">Serializable</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="kd">private</span> <span class="kd">static</span> <span class="kd">final</span> <span class="kt">long</span> <span class="n">serialVersionUID</span> <span class="o">=</span> <span class="mi">1L</span><span class="o">;</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="kd">private</span> <span class="kd">final</span> <span class="n">String</span> <span class="n">host</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"> <span class="kd">private</span> <span class="kd">final</span> <span class="nd">@Nullable</span> <span class="n">Integer</span> <span class="n">port</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"> <span class="kd">private</span> <span class="kd">final</span> <span class="nd">@Nullable</span> <span class="n">String</span> <span class="n">user</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"> <span class="kd">private</span> <span class="kd">final</span> <span class="nd">@Nullable</span> <span class="n">String</span> <span class="n">password</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="o">}</span>
</span></span></code></pre></div><p>Now you can add an instance of this class to the <code>ImapSource</code> and <code>ImapTableSource</code> classes created in part one so that both can use it. Also pass along the names of the columns the table was created with, since you will need them later. Finally, switch the source to be unbounded, because the implementation will soon be changed to listen continuously for new emails.</p>
<div class="note">
<h5>Hint</h5>
<p>With the SQL executed in part one, the column names are "subject" and "content":</p>
<pre><code class="language-sql">CREATE TABLE T (subject STRING, content STRING) WITH ('connector' = 'imap');</code></pre>
</div>
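<p>The column names matter because the source has to decide which part of an email goes into which column. The sketch below shows that mapping, assuming an email is reduced to a subject and a body. <code>EmailRowMapper</code> and <code>toRow</code> are hypothetical names, and a real Flink source would wrap the values in a <code>GenericRowData</code> with <code>StringData</code> fields instead of returning an <code>Object[]</code>. Like the <code>ImapSource</code> constructor below, it upper-cases the names once, so the mapping does not depend on the case used in the <code>CREATE TABLE</code> statement.</p>

```java
import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;

// Hypothetical helper: builds one row per email, ordered like the table's columns.
final class EmailRowMapper {
    private final List<String> columnNames;

    EmailRowMapper(List<String> columnNames) {
        // Normalize once so "subject", "Subject" and "SUBJECT" are treated alike.
        this.columnNames = columnNames.stream()
                .map(name -> name.toUpperCase(Locale.ROOT))
                .collect(Collectors.toList());
    }

    // Returns one value per column, in the order the columns were declared.
    Object[] toRow(String subject, String content) {
        Object[] row = new Object[columnNames.size()];
        for (int i = 0; i < row.length; i++) {
            String column = columnNames.get(i);
            switch (column) {
                case "SUBJECT":
                    row[i] = subject;
                    break;
                case "CONTENT":
                    row[i] = content;
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported column: " + column);
            }
        }
        return row;
    }
}
```

<p>The sketch passes <code>Locale.ROOT</code> to <code>toUpperCase</code> so that the result does not depend on the JVM’s default locale; the <code>String::toUpperCase</code> reference used in <code>ImapSource</code> relies on the default locale.</p>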
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.streaming.api.functions.source.RichSourceFunction</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.table.data.RowData</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">java.util.List</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">java.util.stream.Collectors</span><span class="o">;</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">ImapSource</span> <span class="kd">extends</span> <span class="n">RichSourceFunction</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="kd">private</span> <span class="kd">final</span> <span class="n">ImapSourceOptions</span> <span class="n">options</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"> <span class="kd">private</span> <span class="kd">final</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">columnNames</span><span class="o">;</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="kd">public</span> <span class="nf">ImapSource</span><span class="o">(</span>
</span></span><span class="line"><span class="cl"> <span class="n">ImapSourceOptions</span> <span class="n">options</span><span class="o">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">columnNames</span>
</span></span><span class="line"><span class="cl"> <span class="o">)</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="k">this</span><span class="o">.</span><span class="na">options</span> <span class="o">=</span> <span class="n">options</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"> <span class="k">this</span><span class="o">.</span><span class="na">columnNames</span> <span class="o">=</span> <span class="n">columnNames</span><span class="o">.</span><span class="na">stream</span><span class="o">()</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">String</span><span class="o">::</span><span class="n">toUpperCase</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">Collectors</span><span class="o">.</span><span class="na">toList</span><span class="o">());</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1">// ...
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="o">}</span>
</span></span></code></pre></div><div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.table.connector.source.DynamicTableSource</span><span class="o">;</span>
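</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.table.connector.source.ScanTableSource</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.table.connector.source.SourceFunctionProvider</span><span class="o">;</span>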
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">java.util.List</span><span class="o">;</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">ImapTableSource</span> <span class="kd">implements</span> <span class="n">ScanTableSource</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="kd">private</span> <span class="kd">final</span> <span class="n">ImapSourceOptions</span> <span class="n">options</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"> <span class="kd">private</span> <span class="kd">final</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">columnNames</span><span class="o">;</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="kd">public</span> <span class="nf">ImapTableSource</span><span class="o">(</span>
</span></span><span class="line"><span class="cl"> <span class="n">ImapSourceOptions</span> <span class="n">options</span><span class="o">,</span>
</span></span><span class="line"><span class="cl"> <span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">columnNames</span>
</span></span><span class="line"><span class="cl"> <span class="o">)</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="k">this</span><span class="o">.</span><span class="na">options</span> <span class="o">=</span> <span class="n">options</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"> <span class="k">this</span><span class="o">.</span><span class="na">columnNames</span> <span class="o">=</span> <span class="n">columnNames</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1">// …
</span></span></span><span class="line"><span class="cl"><span class="c1"></span>
</span></span><span class="line"><span class="cl"> <span class="nd">@Override</span>
</span></span><span class="line"><span class="cl"> <span class="kd">public</span> <span class="n">ScanRuntimeProvider</span> <span class="nf">getScanRuntimeProvider</span><span class="o">(</span><span class="n">ScanContext</span> <span class="n">ctx</span><span class="o">)</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="kd">final</span> <span class="kt">boolean</span> <span class="n">bounded</span> <span class="o">=</span> <span class="kc">false</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"> <span class="kd">final</span> <span class="n">ImapSource</span> <span class="n">source</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ImapSource</span><span class="o">(</span><span class="n">options</span><span class="o">,</span> <span class="n">columnNames</span><span class="o">);</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="n">SourceFunctionProvider</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">source</span><span class="o">,</span> <span class="n">bounded</span><span class="o">);</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="nd">@Override</span>
</span></span><span class="line"><span class="cl"> <span class="kd">public</span> <span class="n">DynamicTableSource</span> <span class="nf">copy</span><span class="o">()</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="k">new</span> <span class="n">ImapTableSource</span><span class="o">(</span><span class="n">options</span><span class="o">,</span> <span class="n">columnNames</span><span class="o">);</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1">// …
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="o">}</span>
</span></span></code></pre></div><p>Finally, in the <code>ImapTableSourceFactory</code> class, you need to create a <code>ConfigOption&lt;&gt;</code> for each of the hostname, port number, username, and password, and then report them to Flink. Host, user, and password are mandatory and go into <code>requiredOptions()</code>; the port is optional and goes into <code>optionalOptions()</code> instead.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.configuration.ConfigOption</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.configuration.ConfigOptions</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.table.factories.DynamicTableSourceFactory</span><span class="o">;</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">java.util.HashSet</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">java.util.Set</span><span class="o">;</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">ImapTableSourceFactory</span> <span class="kd">implements</span> <span class="n">DynamicTableSourceFactory</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">ConfigOption</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">HOST</span> <span class="o">=</span> <span class="n">ConfigOptions</span><span class="o">.</span><span class="na">key</span><span class="o">(</span><span class="s">&#34;host&#34;</span><span class="o">).</span><span class="na">stringType</span><span class="o">().</span><span class="na">noDefaultValue</span><span class="o">();</span>
</span></span><span class="line"><span class="cl"> <span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">ConfigOption</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span> <span class="n">PORT</span> <span class="o">=</span> <span class="n">ConfigOptions</span><span class="o">.</span><span class="na">key</span><span class="o">(</span><span class="s">&#34;port&#34;</span><span class="o">).</span><span class="na">intType</span><span class="o">().</span><span class="na">noDefaultValue</span><span class="o">();</span>
</span></span><span class="line"><span class="cl"> <span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">ConfigOption</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">USER</span> <span class="o">=</span> <span class="n">ConfigOptions</span><span class="o">.</span><span class="na">key</span><span class="o">(</span><span class="s">&#34;user&#34;</span><span class="o">).</span><span class="na">stringType</span><span class="o">().</span><span class="na">noDefaultValue</span><span class="o">();</span>
</span></span><span class="line"><span class="cl"> <span class="kd">public</span> <span class="kd">static</span> <span class="kd">final</span> <span class="n">ConfigOption</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">PASSWORD</span> <span class="o">=</span> <span class="n">ConfigOptions</span><span class="o">.</span><span class="na">key</span><span class="o">(</span><span class="s">&#34;password&#34;</span><span class="o">).</span><span class="na">stringType</span><span class="o">().</span><span class="na">noDefaultValue</span><span class="o">();</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1">// …
</span></span></span><span class="line"><span class="cl"><span class="c1"></span>
</span></span><span class="line"><span class="cl"> <span class="nd">@Override</span>
</span></span><span class="line"><span class="cl"> <span class="kd">public</span> <span class="n">Set</span><span class="o">&lt;</span><span class="n">ConfigOption</span><span class="o">&lt;?&gt;&gt;</span> <span class="n">requiredOptions</span><span class="o">()</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="kd">final</span> <span class="n">Set</span><span class="o">&lt;</span><span class="n">ConfigOption</span><span class="o">&lt;?&gt;&gt;</span> <span class="n">options</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashSet</span><span class="o">&lt;&gt;();</span>
</span></span><span class="line"><span class="cl"> <span class="n">options</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">HOST</span><span class="o">);</span>
</span></span><span class="line"><span class="cl"> <span class="n">options</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">USER</span><span class="o">);</span>
</span></span><span class="line"><span class="cl"> <span class="n">options</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">PASSWORD</span><span class="o">);</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="n">options</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="nd">@Override</span>
</span></span><span class="line"><span class="cl"> <span class="kd">public</span> <span class="n">Set</span><span class="o">&lt;</span><span class="n">ConfigOption</span><span class="o">&lt;?&gt;&gt;</span> <span class="n">optionalOptions</span><span class="o">()</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="kd">final</span> <span class="n">Set</span><span class="o">&lt;</span><span class="n">ConfigOption</span><span class="o">&lt;?&gt;&gt;</span> <span class="n">options</span> <span class="o">=</span> <span class="k">new</span> <span class="n">HashSet</span><span class="o">&lt;&gt;();</span>
</span></span><span class="line"><span class="cl"> <span class="n">options</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">PORT</span><span class="o">);</span>
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="n">options</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl"> <span class="c1">// …
</span></span></span><span class="line"><span class="cl"><span class="c1"></span><span class="o">}</span>
</span></span></code></pre></div><p>Now take a look at the <code>createDynamicTableSource()</code> function in the <code>ImapTableSourceFactory</code> class. Recall that in part one you used <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/factories/FactoryUtil.TableFactoryHelper.html">TableFactoryHelper</a>, a small helper utility that Flink offers to ensure that required options are set and that no unknown options are provided. You can now use it to make sure that the required options (hostname, username, and password) are provided when creating a table with this connector; the port remains optional. The helper's <code>validate()</code> method throws a <code>ValidationException</code> if any required option is missing. You can also use the helper to access the provided options (<code>getOptions()</code>), convert them into an <code>ImapSourceOptions</code> instance, and pass that instance to the <code>ImapTableSource</code> class created earlier:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">java.util.List</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">java.util.stream.Collectors</span><span class="o">;</span>
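</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.table.connector.source.DynamicTableSource</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.table.factories.DynamicTableSourceFactory</span><span class="o">;</span>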
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.table.factories.FactoryUtil</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.table.catalog.Column</span><span class="o">;</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">ImapTableSourceFactory</span> <span class="kd">implements</span> <span class="n">DynamicTableSourceFactory</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1">// ...
</span></span></span><span class="line"><span class="cl"><span class="c1"></span>
</span></span><span class="line"><span class="cl"> <span class="nd">@Override</span>
</span></span><span class="line"><span class="cl"> <span class="kd">public</span> <span class="n">DynamicTableSource</span> <span class="nf">createDynamicTableSource</span><span class="o">(</span><span class="n">Context</span> <span class="n">ctx</span><span class="o">)</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="kd">final</span> <span class="n">FactoryUtil</span><span class="o">.</span><span class="na">TableFactoryHelper</span> <span class="n">factoryHelper</span> <span class="o">=</span> <span class="n">FactoryUtil</span><span class="o">.</span><span class="na">createTableFactoryHelper</span><span class="o">(</span><span class="k">this</span><span class="o">,</span> <span class="n">ctx</span><span class="o">);</span>
</span></span><span class="line"><span class="cl"> <span class="n">factoryHelper</span><span class="o">.</span><span class="na">validate</span><span class="o">();</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="kd">final</span> <span class="n">ImapSourceOptions</span> <span class="n">options</span> <span class="o">=</span> <span class="n">ImapSourceOptions</span><span class="o">.</span><span class="na">builder</span><span class="o">()</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="na">host</span><span class="o">(</span><span class="n">factoryHelper</span><span class="o">.</span><span class="na">getOptions</span><span class="o">().</span><span class="na">get</span><span class="o">(</span><span class="n">HOST</span><span class="o">))</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="na">port</span><span class="o">(</span><span class="n">factoryHelper</span><span class="o">.</span><span class="na">getOptions</span><span class="o">().</span><span class="na">get</span><span class="o">(</span><span class="n">PORT</span><span class="o">))</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="na">user</span><span class="o">(</span><span class="n">factoryHelper</span><span class="o">.</span><span class="na">getOptions</span><span class="o">().</span><span class="na">get</span><span class="o">(</span><span class="n">USER</span><span class="o">))</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="na">password</span><span class="o">(</span><span class="n">factoryHelper</span><span class="o">.</span><span class="na">getOptions</span><span class="o">().</span><span class="na">get</span><span class="o">(</span><span class="n">PASSWORD</span><span class="o">))</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="na">build</span><span class="o">();</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="kd">final</span> <span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">columnNames</span> <span class="o">=</span> <span class="n">ctx</span><span class="o">.</span><span class="na">getCatalogTable</span><span class="o">().</span><span class="na">getResolvedSchema</span><span class="o">().</span><span class="na">getColumns</span><span class="o">().</span><span class="na">stream</span><span class="o">()</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="na">filter</span><span class="o">(</span><span class="n">Column</span><span class="o">::</span><span class="n">isPhysical</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="n">Column</span><span class="o">::</span><span class="n">getName</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">Collectors</span><span class="o">.</span><span class="na">toList</span><span class="o">());</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="k">new</span> <span class="n">ImapTableSource</span><span class="o">(</span><span class="n">options</span><span class="o">,</span> <span class="n">columnNames</span><span class="o">);</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl"><span class="o">}</span>
</span></span></code></pre></div><div class="note">
<h5>Hint</h5>
<p>
Ideally, you would use connector <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/#metadata">metadata</a> instead of column names. The accompanying <a href="https://github.com/TNG/flink-connector-email">repository</a> implements this approach using metadata fields.
</p>
</div>
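<p>To get a feel for what <code>validate()</code> checks, the following simplified, JDK-only sketch models the required/optional split from the factory above. It is a hypothetical illustration, not Flink's implementation: <code>OptionValidationSketch</code>, <code>missingRequired()</code>, and <code>unsupported()</code> are names invented for this example, the option keys are plain strings instead of <code>ConfigOption</code> objects, and the <code>connector</code> key is simply treated as always allowed.</p>

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class OptionValidationSketch {

    // Hypothetical keys mirroring HOST, USER, PASSWORD (required) and PORT (optional)
    static final Set<String> REQUIRED = Set.of("host", "user", "password");
    static final Set<String> OPTIONAL = Set.of("port");

    // Returns the required keys absent from the WITH clause, sorted alphabetically
    static Set<String> missingRequired(Map<String, String> withClause) {
        Set<String> missing = new TreeSet<>(REQUIRED);
        missing.removeAll(withClause.keySet());
        return missing;
    }

    // Returns keys that are neither required, optional, nor the 'connector' key itself
    static Set<String> unsupported(Map<String, String> withClause) {
        Set<String> unknown = new TreeSet<>(withClause.keySet());
        unknown.removeAll(REQUIRED);
        unknown.removeAll(OPTIONAL);
        unknown.remove("connector");
        return unknown;
    }

    public static void main(String[] args) {
        // Same WITH clause as the CREATE TABLE statement used for testing
        Map<String, String> withClause = Map.of("connector", "imap");
        System.out.println(missingRequired(withClause)); // prints [host, password, user]

        // A misspelled key is reported as unsupported
        System.out.println(unsupported(Map.of("connector", "imap", "hostname", "x"))); // prints [hostname]
    }
}
```

<p>With a <code>WITH</code> clause that contains only <code>'connector' = 'imap'</code>, the sketch reports <code>host</code>, <code>password</code>, and <code>user</code> as missing, the same three options listed in the error message you will see when running the query. The port is never reported as missing, because it is only optional.</p>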
<p>To test these new configuration options, run:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sh" data-lang="sh"><span class="line"><span class="cl">$ <span class="nb">cd</span> testing/
</span></span><span class="line"><span class="cl">$ ./build_and_run.sh
</span></span></code></pre></div><p>Once you see the <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/">Flink SQL client</a> start up, execute the following statements to create a table with your connector:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">T</span><span class="w"> </span><span class="p">(</span><span class="n">subject</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w"> </span><span class="n">content</span><span class="w"> </span><span class="n">STRING</span><span class="p">)</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="s1">&#39;connector&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;imap&#39;</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">SELECT</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="k">FROM</span><span class="w"> </span><span class="n">T</span><span class="p">;</span><span class="w">
</span></span></span></code></pre></div><p>This time it will fail because the required options are not provided:</p>
<pre tabindex="0"><code>[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: One or more required options are missing.
Missing required options are:
host
password
user
</code></pre><h1 id="connect-to-the-source-email-server">
Connect to the source email server
<a class="anchor" href="#connect-to-the-source-email-server">#</a>
</h1>
<p>Now that you have configured the required options to connect to the email server, it is time to actually connect to the server.</p>
<p>Going back to the <code>ImapSource</code> class, you first need to convert the options given to the table source into a <a href="https://docs.oracle.com/javase/tutorial/essential/environment/properties.html">Properties</a> object, which is what you pass to the Jakarta Mail library. You can also set other properties here (e.g., to enable SSL).</p>
<p>The properties that Jakarta Mail understands for IMAP are documented in the <a href="https://jakarta.ee/specifications/mail/1.6/apidocs/index.html?com/sun/mail/imap/package-summary.html">IMAP provider package summary</a>.</p>
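<p>As an example of setting an additional property, the sketch below builds a session <code>Properties</code> object with SSL enabled through <code>mail.imap.ssl.enable</code>, one of the properties listed in the Jakarta Mail IMAP documentation. It uses only <code>java.util.Properties</code>; the class and method names, the host name, and the port are placeholders assumed for illustration, and storing every value as a <code>String</code> is a choice of this sketch.</p>

```java
import java.util.Properties;

public class ImapSslPropertiesSketch {

    // Hypothetical helper: same idea as getSessionProperties(), plus SSL.
    // Values are stored as Strings so that Properties.getProperty() can read them back.
    static Properties sslSessionProperties(String host, Integer port) {
        Properties props = new Properties();
        props.setProperty("mail.store.protocol", "imap");
        props.setProperty("mail.imap.host", host);
        if (port != null) {
            props.setProperty("mail.imap.port", String.valueOf(port));
        }
        // Jakarta Mail IMAP property: connect to the server over SSL
        props.setProperty("mail.imap.ssl.enable", "true");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder host; 993 is the conventional IMAPS port
        Properties props = sslSessionProperties("imap.example.com", 993);
        System.out.println(props.getProperty("mail.imap.ssl.enable")); // prints true
        System.out.println(props.getProperty("mail.imap.port"));       // prints 993
    }
}
```

<p>In the connector itself, the method reads its values from <code>ImapSourceOptions</code> instead of taking them as parameters:</p>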
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.streaming.api.functions.source.RichSourceFunction</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.table.data.RowData</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">java.util.Properties</span><span class="o">;</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">ImapSource</span> <span class="kd">extends</span> <span class="n">RichSourceFunction</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="c1">// …
</span></span></span><span class="line"><span class="cl"><span class="c1"></span>
</span></span><span class="line"><span class="cl"> <span class="kd">private</span> <span class="n">Properties</span> <span class="nf">getSessionProperties</span><span class="o">()</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
</span></span><span class="line"><span class="cl"> <span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">&#34;mail.store.protocol&#34;</span><span class="o">,</span> <span class="s">&#34;imap&#34;</span><span class="o">);</span>
</span></span><span class="line"><span class="cl"> <span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">&#34;mail.imap.auth&#34;</span><span class="o">,</span> <span class="kc">true</span><span class="o">);</span>
</span></span><span class="line"><span class="cl"> <span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">&#34;mail.imap.host&#34;</span><span class="o">,</span> <span class="n">options</span><span class="o">.</span><span class="na">getHost</span><span class="o">());</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="o">(</span><span class="n">options</span><span class="o">.</span><span class="na">getPort</span><span class="o">()</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">&#34;mail.imap.port&#34;</span><span class="o">,</span> <span class="n">options</span><span class="o">.</span><span class="na">getPort</span><span class="o">());</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">return</span> <span class="n">props</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl"><span class="o">}</span>
</span></span></code></pre></div><p>Now create a method (<code>connect()</code>) which sets up the connection:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">jakarta.mail.*</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">com.sun.mail.imap.IMAPFolder</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.streaming.api.functions.source.RichSourceFunction</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.table.data.RowData</span><span class="o">;</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">ImapSource</span> <span class="kd">extends</span> <span class="n">RichSourceFunction</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="c1">// …
</span></span></span><span class="line"><span class="cl"><span class="c1"></span>
</span></span><span class="line"><span class="cl"> <span class="kd">private</span> <span class="kd">transient</span> <span class="n">Store</span> <span class="n">store</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"> <span class="kd">private</span> <span class="kd">transient</span> <span class="n">IMAPFolder</span> <span class="n">folder</span><span class="o">;</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="kd">private</span> <span class="kt">void</span> <span class="nf">connect</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="kd">final</span> <span class="n">Session</span> <span class="n">session</span> <span class="o">=</span> <span class="n">Session</span><span class="o">.</span><span class="na">getInstance</span><span class="o">(</span><span class="n">getSessionProperties</span><span class="o">(),</span> <span class="kc">null</span><span class="o">);</span>
</span></span><span class="line"><span class="cl"> <span class="n">store</span> <span class="o">=</span> <span class="n">session</span><span class="o">.</span><span class="na">getStore</span><span class="o">();</span>
</span></span><span class="line"><span class="cl"> <span class="n">store</span><span class="o">.</span><span class="na">connect</span><span class="o">(</span><span class="n">options</span><span class="o">.</span><span class="na">getUser</span><span class="o">(),</span> <span class="n">options</span><span class="o">.</span><span class="na">getPassword</span><span class="o">());</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="kd">final</span> <span class="n">Folder</span> <span class="n">genericFolder</span> <span class="o">=</span> <span class="n">store</span><span class="o">.</span><span class="na">getFolder</span><span class="o">(</span><span class="s">&#34;INBOX&#34;</span><span class="o">);</span>
</span></span><span class="line"><span class="cl"> <span class="n">folder</span> <span class="o">=</span> <span class="o">(</span><span class="n">IMAPFolder</span><span class="o">)</span> <span class="n">genericFolder</span><span class="o">;</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="o">(!</span><span class="n">folder</span><span class="o">.</span><span class="na">isOpen</span><span class="o">())</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="n">folder</span><span class="o">.</span><span class="na">open</span><span class="o">(</span><span class="n">Folder</span><span class="o">.</span><span class="na">READ_ONLY</span><span class="o">);</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl"><span class="o">}</span>
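</span></span></code></pre></div><p>You can now use this method to connect to the mail server when the source starts running, that is, at the beginning of <code>run()</code>. Then create a loop that keeps the source running and periodically issues a lightweight IMAP request (<code>getMessageCount()</code>) so that the server has a chance to send notifications about new messages. Lastly, implement methods to cancel the source and close the connection:</p>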
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">jakarta.mail.*</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.streaming.api.functions.source.RichSourceFunction</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.streaming.api.functions.source.SourceFunction</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.table.data.RowData</span><span class="o">;</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">ImapSource</span> <span class="kd">extends</span> <span class="n">RichSourceFunction</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="kd">private</span> <span class="kd">transient</span> <span class="kd">volatile</span> <span class="kt">boolean</span> <span class="n">running</span> <span class="o">=</span> <span class="kc">false</span><span class="o">;</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1">// …
</span></span></span><span class="line"><span class="cl"><span class="c1"></span>
</span></span><span class="line"><span class="cl"> <span class="nd">@Override</span>
</span></span><span class="line"><span class="cl"> <span class="kd">public</span> <span class="kt">void</span> <span class="nf">run</span><span class="o">(</span><span class="n">SourceFunction</span><span class="o">.</span><span class="na">SourceContext</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span> <span class="n">ctx</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="n">connect</span><span class="o">();</span>
</span></span><span class="line"><span class="cl"> <span class="n">running</span> <span class="o">=</span> <span class="kc">true</span><span class="o">;</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1">// TODO: Listen for new messages
</span></span></span><span class="line"><span class="cl"><span class="c1"></span>
</span></span><span class="line"><span class="cl"> <span class="k">while</span> <span class="o">(</span><span class="n">running</span><span class="o">)</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="c1">// Trigger some IMAP request to force the server to send a notification
</span></span></span><span class="line"><span class="cl"><span class="c1"></span> <span class="n">folder</span><span class="o">.</span><span class="na">getMessageCount</span><span class="o">();</span>
</span></span><span class="line"><span class="cl"> <span class="n">Thread</span><span class="o">.</span><span class="na">sleep</span><span class="o">(</span><span class="mi">250</span><span class="o">);</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="nd">@Override</span>
</span></span><span class="line"><span class="cl"> <span class="kd">public</span> <span class="kt">void</span> <span class="nf">cancel</span><span class="o">()</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="n">running</span> <span class="o">=</span> <span class="kc">false</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="nd">@Override</span>
</span></span><span class="line"><span class="cl"> <span class="kd">public</span> <span class="kt">void</span> <span class="nf">close</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="o">(</span><span class="n">folder</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="n">folder</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">if</span> <span class="o">(</span><span class="n">store</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="n">store</span><span class="o">.</span><span class="na">close</span><span class="o">();</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl"><span class="o">}</span>
</span></span></code></pre></div><p>The loop sends a request to the server in every iteration. This is crucial because it ensures that the server keeps sending notifications. A more sophisticated approach would be to use the IMAP IDLE command (RFC 2177), which lets the client wait on an otherwise idle connection while the server pushes mailbox updates.</p>
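<p>The run/cancel contract of this loop can be exercised without a mail server. The following is a minimal, dependency-free sketch, not the tutorial&rsquo;s code: the hypothetical <code>KeepAliveLoop</code> class calls a <code>ping</code> action (standing in for <code>folder.getMessageCount()</code>) at a fixed interval until <code>cancel()</code> is called from another thread.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java">import java.util.concurrent.TimeUnit;

// Hypothetical, dependency-free model of the loop in ImapSource#run().
final class KeepAliveLoop {
    // volatile so that cancel() on another thread is seen by the loop.
    // Initialized here (not in run()) so an early cancel() is not overwritten.
    private volatile boolean running = true;

    private final Runnable ping;
    private final long intervalMillis;

    KeepAliveLoop(Runnable ping, long intervalMillis) {
        this.ping = ping;
        this.intervalMillis = intervalMillis;
    }

    // Blocks until cancel() is called or the thread is interrupted.
    void run() {
        while (running) {
            ping.run(); // stands in for folder.getMessageCount()
            try {
                TimeUnit.MILLISECONDS.sleep(intervalMillis);
            } catch (InterruptedException e) {
                // Keep the interrupt status and stop looping.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void cancel() {
        running = false;
    }
}
</code></pre></div>
<p>Setting the flag when the object is created means that a <code>cancel()</code> arriving before the loop starts is not lost. This does not carry over directly to a Flink function: the source is serialized when the job is deployed, and a <code>transient</code> field comes back with its default value after deserialization, which may be why the tutorial sets <code>running = true</code> inside <code>run()</code>.</p>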
<div class="note">
<h5>Note</h5>
<p>Since the source is not checkpointable, it does not provide any fault-tolerance guarantees for its state.</p>
</div>
<h2 id="collect-incoming-emails">
Collect incoming emails
<a class="anchor" href="#collect-incoming-emails">#</a>
</h2>
<p>Now you need to listen for new emails arriving in the inbox folder and collect them. To begin, hardcode the schema and only return the email’s subject. Fortunately, Jakarta Mail provides a simple hook (<code>addMessageCountListener()</code>) to get notified when new messages arrive on the server. You can use it in place of the “TODO” comment above:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">jakarta.mail.*</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">jakarta.mail.event.MessageCountAdapter</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">jakarta.mail.event.MessageCountEvent</span><span class="o">;</span>
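</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.streaming.api.functions.source.RichSourceFunction</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.streaming.api.functions.source.SourceFunction</span><span class="o">;</span>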
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.table.data.GenericRowData</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.table.data.StringData</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.table.data.RowData</span><span class="o">;</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">ImapSource</span> <span class="kd">extends</span> <span class="n">RichSourceFunction</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="nd">@Override</span>
</span></span><span class="line"><span class="cl"> <span class="kd">public</span> <span class="kt">void</span> <span class="nf">run</span><span class="o">(</span><span class="n">SourceFunction</span><span class="o">.</span><span class="na">SourceContext</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span> <span class="n">ctx</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="c1">// …
</span></span></span><span class="line"><span class="cl"><span class="c1"></span>
</span></span><span class="line"><span class="cl"> <span class="n">folder</span><span class="o">.</span><span class="na">addMessageCountListener</span><span class="o">(</span><span class="k">new</span> <span class="n">MessageCountAdapter</span><span class="o">()</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="nd">@Override</span>
</span></span><span class="line"><span class="cl"> <span class="kd">public</span> <span class="kt">void</span> <span class="nf">messagesAdded</span><span class="o">(</span><span class="n">MessageCountEvent</span> <span class="n">e</span><span class="o">)</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="n">collectMessages</span><span class="o">(</span><span class="n">ctx</span><span class="o">,</span> <span class="n">e</span><span class="o">.</span><span class="na">getMessages</span><span class="o">());</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl"> <span class="o">});</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="c1">// …
</span></span></span><span class="line"><span class="cl"><span class="c1"></span> <span class="o">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="kd">private</span> <span class="kt">void</span> <span class="nf">collectMessages</span><span class="o">(</span><span class="n">SourceFunction</span><span class="o">.</span><span class="na">SourceContext</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span> <span class="n">ctx</span><span class="o">,</span> <span class="n">Message</span><span class="o">[]</span> <span class="n">messages</span><span class="o">)</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="o">(</span><span class="n">Message</span> <span class="n">message</span> <span class="o">:</span> <span class="n">messages</span><span class="o">)</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="k">try</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="n">ctx</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">GenericRowData</span><span class="o">.</span><span class="na">of</span><span class="o">(</span><span class="n">StringData</span><span class="o">.</span><span class="na">fromString</span><span class="o">(</span><span class="n">message</span><span class="o">.</span><span class="na">getSubject</span><span class="o">())));</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">MessagingException</span> <span class="n">ignored</span><span class="o">)</span> <span class="o">{}</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl"><span class="o">}</span>
</span></span></code></pre></div><p>Now build the project again and start up the SQL client:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sh" data-lang="sh"><span class="line"><span class="cl">$ <span class="nb">cd</span> testing/
</span></span><span class="line"><span class="cl">$ ./build_and_run.sh
</span></span></code></pre></div><p>This time, you will connect to a <a href="https://greenmail-mail-test.github.io/greenmail/">GreenMail server</a> which is started as part of the <a href="https://github.com/Airblader/blog-imap/blob/master/testing/docker-compose.yaml">setup</a>:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">T</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">subject</span><span class="w"> </span><span class="n">STRING</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;connector&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;imap&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;host&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;greenmail&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;port&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;3143&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;user&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;alice&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;password&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;alice&#39;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">SELECT</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="k">FROM</span><span class="w"> </span><span class="n">T</span><span class="p">;</span><span class="w">
</span></span></span></code></pre></div><p>The query should now run continuously, but it will not produce any rows yet because no email has been sent to the test server. First, send an email to the server. If you have <a href="https://pubs.opengroup.org/onlinepubs/9699919799/utilities/mailx.html">mailx</a> installed, you can do so by running the following in your terminal (the <code>-S</code> settings used here assume the s-nail variant of mailx):</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sh" data-lang="sh"><span class="line"><span class="cl">$ <span class="nb">echo</span> <span class="s2">&#34;This is the email body&#34;</span> <span class="p">|</span> mailx -Sv15-compat <span class="se">\
</span></span></span><span class="line"><span class="cl"><span class="se"></span> -s<span class="s2">&#34;Email Subject&#34;</span> <span class="se">\
</span></span></span><span class="line"><span class="cl"><span class="se"></span> -Smta<span class="o">=</span><span class="s2">&#34;smtp://alice:alice@localhost:3025&#34;</span> <span class="se">\
</span></span></span><span class="line"><span class="cl"><span class="se"></span> alice@acme.org
</span></span></code></pre></div><p>A row containing “Email Subject” should now appear in your output. Your source connector is working!</p>
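<p>Note that <code>collectMessages</code> does not run on the thread executing <code>run()</code>: according to the Jakarta Mail <code>Folder</code> documentation, message-count events are placed on an internal event queue and delivered to listeners by a separate dispatcher thread. If you would rather keep all emission on the <code>run()</code> thread, one option is to hand messages over through a queue. The sketch below is a dependency-free illustration, not part of the tutorial: the hypothetical <code>HandoffEmitter</code> passes subjects as plain strings, and its <code>emit</code> callback stands in for <code>ctx.collect(...)</code>.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java">import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch: the listener only enqueues, run() is the only emitter.
final class HandoffEmitter {
    private final BlockingQueue&lt;String&gt; pending = new LinkedBlockingQueue&lt;&gt;();
    private volatile boolean running = true;

    // Called on the listener's thread, e.g. from messagesAdded().
    void onMessagesAdded(List&lt;String&gt; subjects) {
        pending.addAll(subjects);
    }

    // Runs on the source thread; emit stands in for ctx.collect(...).
    void run(Consumer&lt;String&gt; emit) throws InterruptedException {
        while (running) {
            // The timeout lets the loop re-check the running flag regularly.
            String subject = pending.poll(50, TimeUnit.MILLISECONDS);
            if (subject != null) {
                emit.accept(subject);
            }
        }
    }

    void cancel() {
        running = false;
    }
}
</code></pre></div>
<p>With this design, the same loop that polls the queue could also issue the keep-alive request (for example, by calling <code>folder.getMessageCount()</code> between polls), so the source would still need only one loop.</p>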
<p>However, since the source still hard-codes the schema it produces, defining the table with a different schema will produce errors. Instead, you want to define which fields of an email interest you and produce the data accordingly. To do this, use the list of column names from earlier and consult it when collecting each email.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.table.data.GenericRowData</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.table.data.RowData</span><span class="o">;</span>
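</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.table.data.StringData</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"><span class="kn">import</span> <span class="nn">org.apache.flink.table.data.TimestampData</span><span class="o">;</span>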
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">ImapSource</span> <span class="kd">extends</span> <span class="n">RichSourceFunction</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="kd">private</span> <span class="kt">void</span> <span class="nf">collectMessages</span><span class="o">(</span><span class="n">SourceFunction</span><span class="o">.</span><span class="na">SourceContext</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span> <span class="n">ctx</span><span class="o">,</span> <span class="n">Message</span><span class="o">[]</span> <span class="n">messages</span><span class="o">)</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="o">(</span><span class="n">Message</span> <span class="n">message</span> <span class="o">:</span> <span class="n">messages</span><span class="o">)</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="k">try</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="n">collectMessage</span><span class="o">(</span><span class="n">ctx</span><span class="o">,</span> <span class="n">message</span><span class="o">);</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span> <span class="k">catch</span> <span class="o">(</span><span class="n">MessagingException</span> <span class="n">ignored</span><span class="o">)</span> <span class="o">{}</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="kd">private</span> <span class="kt">void</span> <span class="nf">collectMessage</span><span class="o">(</span><span class="n">SourceFunction</span><span class="o">.</span><span class="na">SourceContext</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span> <span class="n">ctx</span><span class="o">,</span> <span class="n">Message</span> <span class="n">message</span><span class="o">)</span>
</span></span><span class="line"><span class="cl"> <span class="kd">throws</span> <span class="n">MessagingException</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="kd">final</span> <span class="n">GenericRowData</span> <span class="n">row</span> <span class="o">=</span> <span class="k">new</span> <span class="n">GenericRowData</span><span class="o">(</span><span class="n">columnNames</span><span class="o">.</span><span class="na">size</span><span class="o">());</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="k">for</span> <span class="o">(</span><span class="kt">int</span> <span class="n">i</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> <span class="n">i</span> <span class="o">&lt;</span> <span class="n">columnNames</span><span class="o">.</span><span class="na">size</span><span class="o">();</span> <span class="n">i</span><span class="o">++)</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="k">switch</span> <span class="o">(</span><span class="n">columnNames</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="n">i</span><span class="o">))</span> <span class="o">{</span>
</span></span><span class="line"><span class="cl"> <span class="k">case</span> <span class="s">&#34;SUBJECT&#34;</span><span class="o">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">row</span><span class="o">.</span><span class="na">setField</span><span class="o">(</span><span class="n">i</span><span class="o">,</span> <span class="n">StringData</span><span class="o">.</span><span class="na">fromString</span><span class="o">(</span><span class="n">message</span><span class="o">.</span><span class="na">getSubject</span><span class="o">()));</span>
</span></span><span class="line"><span class="cl"> <span class="k">break</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"> <span class="k">case</span> <span class="s">&#34;SENT&#34;</span><span class="o">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">row</span><span class="o">.</span><span class="na">setField</span><span class="o">(</span><span class="n">i</span><span class="o">,</span> <span class="n">TimestampData</span><span class="o">.</span><span class="na">fromInstant</span><span class="o">(</span><span class="n">message</span><span class="o">.</span><span class="na">getSentDate</span><span class="o">().</span><span class="na">toInstant</span><span class="o">()));</span>
</span></span><span class="line"><span class="cl"> <span class="k">break</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"> <span class="k">case</span> <span class="s">&#34;RECEIVED&#34;</span><span class="o">:</span>
</span></span><span class="line"><span class="cl"> <span class="n">row</span><span class="o">.</span><span class="na">setField</span><span class="o">(</span><span class="n">i</span><span class="o">,</span> <span class="n">TimestampData</span><span class="o">.</span><span class="na">fromInstant</span><span class="o">(</span><span class="n">message</span><span class="o">.</span><span class="na">getReceivedDate</span><span class="o">().</span><span class="na">toInstant</span><span class="o">()));</span>
</span></span><span class="line"><span class="cl"> <span class="k">break</span><span class="o">;</span>
</span></span><span class="line"><span class="cl"> <span class="c1">// ...
</span></span></span><span class="line"><span class="cl"><span class="c1"></span> <span class="o">}</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl">
</span></span><span class="line"><span class="cl"> <span class="n">ctx</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">row</span><span class="o">);</span>
</span></span><span class="line"><span class="cl"> <span class="o">}</span>
</span></span><span class="line"><span class="cl"><span class="o">}</span>
</span></span></code></pre></div><p>You should now have a working source from which you can select any of the supported columns. Try it out again in the SQL client, but this time specify all the columns supported above (&ldquo;subject&rdquo;, &ldquo;sent&rdquo;, &ldquo;received&rdquo;):</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">T</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">subject</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">sent</span><span class="w"> </span><span class="k">TIMESTAMP</span><span class="p">(</span><span class="mi">3</span><span class="p">),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">received</span><span class="w"> </span><span class="k">TIMESTAMP</span><span class="p">(</span><span class="mi">3</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;connector&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;imap&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;host&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;greenmail&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;port&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;3143&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;user&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;alice&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;password&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;alice&#39;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">SELECT</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="k">FROM</span><span class="w"> </span><span class="n">T</span><span class="p">;</span><span class="w">
</span></span></span></code></pre></div><p>Use the <code>mailx</code> command from earlier to send emails to the GreenMail server, and you should see them appear. You can also try selecting only some of the columns or writing more complex queries.</p>
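<p>The key point of the schema-driven <code>collectMessage</code> is that each value&rsquo;s position in the row follows the declared column order rather than the order of the <code>case</code> labels. The following plain-Java sketch illustrates this under assumptions that are not part of the tutorial: a hypothetical <code>Email</code> class stands in for <code>jakarta.mail.Message</code>, rows are <code>Object[]</code> arrays instead of <code>GenericRowData</code>, and timestamps are <code>java.time.Instant</code> values instead of <code>TimestampData</code>.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java">import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Locale;

// Hypothetical stand-in for the parts of jakarta.mail.Message used here.
final class Email {
    final String subject;
    final Date sent;     // may be null, like Message#getSentDate()
    final Date received; // may be null, like Message#getReceivedDate()

    Email(String subject, Date sent, Date received) {
        this.subject = subject;
        this.sent = sent;
        this.received = received;
    }
}

// Builds one row per email, with fields in the order of the declared columns.
final class ColumnMapper {
    private final List&lt;String&gt; columnNames;

    ColumnMapper(List&lt;String&gt; columnNames) {
        this.columnNames = columnNames;
    }

    Object[] toRow(Email email) {
        Object[] row = new Object[columnNames.size()];
        for (int i = 0; i &lt; columnNames.size(); i++) {
            // Upper-case so that "subject" and "SUBJECT" select the same field.
            switch (columnNames.get(i).toUpperCase(Locale.ROOT)) {
                case "SUBJECT":
                    row[i] = email.subject;
                    break;
                case "SENT":
                    row[i] = toInstant(email.sent);
                    break;
                case "RECEIVED":
                    row[i] = toInstant(email.received);
                    break;
                default:
                    throw new IllegalArgumentException("Unsupported column: " + columnNames.get(i));
            }
        }
        return row;
    }

    // Null-safe conversion: a missing date yields null instead of a NullPointerException.
    private static Instant toInstant(Date date) {
        return date == null ? null : date.toInstant();
    }
}
</code></pre></div>
<p>Throwing on an unrecognized column is a choice made for this sketch, not something the tutorial specifies; checking the column names once, when the table is created, would report a typo before any email is read.</p>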
<h1 id="test-the-connector-with-a-real-mail-server-on-the-ververica-platform">
Test the connector with a real mail server on the Ververica Platform
<a class="anchor" href="#test-the-connector-with-a-real-mail-server-on-the-ververica-platform">#</a>
</h1>
<p>If you want to test the connector with a real mail server, you can import it into <a href="https://www.ververica.com/getting-started">Ververica Platform Community Edition</a>. To begin, make sure that you have the Ververica Platform up and running.</p>
<p>Since the example connector in this blog post is still a bit limited, you will use the finished connector in this <a href="https://github.com/TNG/flink-connector-email">repository</a> instead. You can clone that repository and build it the same way to obtain the JAR file.</p>
<p>For this example, let&rsquo;s connect to a Gmail account. This requires SSL and comes with an additional caveat: you need to enable two-factor authentication and create an app password to use instead of your real password.</p>
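<p>In Jakarta Mail terms, &ldquo;requires SSL&rdquo; means connecting with the <code>imaps</code> protocol (IMAP over SSL/TLS), which Gmail serves at <code>imap.gmail.com</code> on port 993. The finished connector exposes its own table options for this, which are not shown here; as a rough illustration only, the hypothetical helper below builds the Jakarta Mail session properties that such a connection typically relies on.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java">import java.util.Properties;

// Hypothetical helper: Jakarta Mail session properties for IMAP over SSL/TLS.
final class ImapsSettings {
    private ImapsSettings() {}

    static Properties forHost(String host, int port) {
        Properties props = new Properties();
        // "imaps" selects Jakarta Mail's SSL-enabled IMAP store.
        props.setProperty("mail.store.protocol", "imaps");
        props.setProperty("mail.imaps.host", host);
        props.setProperty("mail.imaps.port", Integer.toString(port));
        return props;
    }
}
</code></pre></div>
<p>With Jakarta Mail on the classpath, these properties could be passed to <code>Session.getInstance(...)</code>, and <code>session.getStore()</code> would then return a store for the configured protocol; connect it with your Gmail address and the app password, not your account password.</p>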
<p>First, head to SQL → Connectors. There you can create a new connector by uploading your JAR file. The platform will detect the connector options automatically. Afterwards, go back to the SQL Editor and you should now be able to use the connector.</p>
<div class="row front-graphic">
<img src="/img/blog/2021-09-07-connector-table-sql-api/VVP-SQL-Editor.png" alt="Ververica Platform - SQL Editor"/>
<p class="align-center">Ververica Platform - SQL Editor</p>
</div>
<h1 id="summary">
Summary
<a class="anchor" href="#summary">#</a>
</h1>
<p>Apache Flink is designed for easy extensibility and allows users to access many different external systems as data sources or sinks through a versatile set of connectors. It can read data from and write data to databases as well as local and distributed file systems.</p>
<p>Flink also exposes APIs on top of which custom connectors can be built. In this two-part blog series, you explored some of these APIs and concepts and learned how to implement your own custom source connector that can read in data from an email inbox. You then used Flink to process incoming emails through the IMAP protocol and wrote some Flink SQL.</p>
</p>
</article>
<div class="book-comments">
</div>
<label for="menu-control" class="hidden book-menu-overlay"></label>
</div>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <button class="toc" onclick="collapseToc()"><i class="fa fa-compress" aria-hidden="true"></i></button></h3>
<ul>
<li><a href="#collect-incoming-emails">Collect incoming emails</a></li>
</ul>
</nav>
</aside>
<aside class="expand-toc">
<button class="toc" onclick="expandToc()">
<i class="fa fa-expand" aria-hidden="true"></i>
</button>
</aside>
</main>
</body>
</html>