<!-- blob: 7b09604f2c3ddb43d3522e3a92642e29b978d30f [file] [log] [blame] -->
<!DOCTYPE html>
<html lang="en" dir="ltr">
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2021/09/07/implementing-a-custom-source-connector-for-table-api-and-sql-part-two/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="In part one of this tutorial, you learned how to build a custom source connector for Flink. In part two, you will learn how to integrate the connector with a test email inbox through the IMAP protocol and filter out emails using Flink SQL.
Goals # Part two of the tutorial will teach you how to:
integrate a source connector which connects to a mailbox using the IMAP protocol use Jakarta Mail, a Java library that can send and receive email via the IMAP protocol write Flink SQL and execute the queries in the Ververica Platform for a nicer visualization You are encouraged to follow along with the code in this repository.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Implementing a custom source connector for Table API and SQL - Part Two " />
<meta property="og:description" content="In part one of this tutorial, you learned how to build a custom source connector for Flink. In part two, you will learn how to integrate the connector with a test email inbox through the IMAP protocol and filter out emails using Flink SQL.
Goals # Part two of the tutorial will teach you how to:
integrate a source connector which connects to a mailbox using the IMAP protocol use Jakarta Mail, a Java library that can send and receive email via the IMAP protocol write Flink SQL and execute the queries in the Ververica Platform for a nicer visualization You are encouraged to follow along with the code in this repository." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2021/09/07/implementing-a-custom-source-connector-for-table-api-and-sql-part-two/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2021-09-07T00:00:00+00:00" />
<meta property="article:modified_time" content="2021-09-07T00:00:00+00:00" />
<title>Implementing a custom source connector for Table API and SQL - Part Two | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/png">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
// Analytics bootstrap: tracker commands are queued on the global `_paq` array
// (an existing `window._paq` is reused rather than overwritten).
// NOTE(review): presumably the matomo.js script loaded below drains this queue — confirm against Matomo docs.
var _paq = window._paq = window._paq || [];
// Queue configuration commands and the initial page-view command, in this order.
_paq.push(['disableCookies']);
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
// Protocol-relative base URL shared by the tracking endpoint and the tracker script.
var u="//analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '1']);
// Build an async <script> element for matomo.js and insert it before the
// first <script> element found in the document.
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
</head>
<body dir="ltr">
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community &amp; Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;">
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2021/09/07/implementing-a-custom-source-connector-for-table-api-and-sql-part-two/">Implementing a custom source connector for Table API and SQL - Part Two </a>
</h1>
September 7, 2021 -
Ingo Buerk
Daisy Tsang
<p><p>In <a href="/2021/09/07/connector-table-sql-api-part1">part one</a> of this tutorial, you learned how to build a custom source connector for Flink. In part two, you will learn how to integrate the connector with a test email inbox through the IMAP protocol and filter out emails using Flink SQL.</p>
<h1 id="goals">
Goals
<a class="anchor" href="#goals">#</a>
</h1>
<p>Part two of the tutorial will teach you how to:</p>
<ul>
<li>integrate a source connector which connects to a mailbox using the IMAP protocol</li>
<li>use <a href="https://eclipse-ee4j.github.io/mail/">Jakarta Mail</a>, a Java library that can send and receive email via the IMAP protocol</li>
<li>write <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sql/overview/">Flink SQL</a> and execute the queries in the <a href="https://www.ververica.com/apache-flink-sql-on-ververica-platform">Ververica Platform</a> for a nicer visualization</li>
</ul>
<p>You are encouraged to follow along with the code in this <a href="https://github.com/Airblader/blog-imap">repository</a>. It provides a boilerplate project that also comes with a bundled <a href="https://docs.docker.com/compose/">docker-compose</a> setup that lets you easily run the connector. You can then try it out with Flink’s SQL client.</p>
<h1 id="prerequisites">
Prerequisites
<a class="anchor" href="#prerequisites">#</a>
</h1>
<p>This tutorial assumes that you have:</p>
<ul>
<li>followed the steps outlined in <a href="/2021/09/07/connector-table-sql-api-part1">part one</a> of this tutorial</li>
<li>some familiarity with Java and object-oriented programming</li>
</ul>
<h1 id="understand-how-to-fetch-emails-via-the-imap-protocol">
Understand how to fetch emails via the IMAP protocol
<a class="anchor" href="#understand-how-to-fetch-emails-via-the-imap-protocol">#</a>
</h1>
<p>Now that you have a working source connector that can run on Flink, it is time to connect to an email server via <a href="https://en.wikipedia.org/wiki/Internet_Message_Access_Protocol">IMAP</a> (an Internet protocol that allows email clients to retrieve messages from a mail server) so that Flink can process emails instead of test static data.</p>
<p>You will use <a href="https://eclipse-ee4j.github.io/mail/">Jakarta Mail</a>, a Java library that can be used to send and receive email via IMAP. For simplicity, authentication will use a plain username and password.</p>
<p>This tutorial will focus more on how to implement a connector for Flink. If you want to learn more about the details of how IMAP or Jakarta Mail work, you are encouraged to explore a more extensive implementation at this <a href="https://github.com/TNG/flink-connector-email">repository</a>. It offers a wide range of information to be read from emails, as well as options to ingest existing emails alongside new ones, connecting with SSL, and more. It also supports different formats for reading email content and implements some connector abilities such as <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/connector/source/abilities/SupportsReadingMetadata.html">reading metadata</a>.</p>
<p>In order to fetch emails, you will need to connect to the email server, register a listener for new emails and collect them whenever they arrive, and enter a loop to keep the connector running.</p>
<h1 id="add-configuration-options---server-information-and-credentials">
Add configuration options - server information and credentials
<a class="anchor" href="#add-configuration-options---server-information-and-credentials">#</a>
</h1>
<p>In order to connect to your IMAP server, you will need at least the following:</p>
<ul>
<li>hostname (of the mail server)</li>
<li>port number</li>
<li>username</li>
<li>password</li>
</ul>
<p>You will start by creating a class to encapsulate the configuration options. You will make use of <a href="https://projectlombok.org">Lombok</a> to help with some boilerplate code. By adding the <code>@Data</code> and <code>@SuperBuilder</code> annotations, Lombok will generate that boilerplate for all the fields of the immutable class.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span><span class="w"> </span><span class="nn">lombok.Data</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">lombok.experimental.SuperBuilder</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">javax.annotation.Nullable</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">java.io.Serializable</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="nd">@Data</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="nd">@SuperBuilder</span><span class="p">(</span><span class="n">toBuilder</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">true</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">ImapSourceOptions</span><span class="w"> </span><span class="kd">implements</span><span class="w"> </span><span class="n">Serializable</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="kt">long</span><span class="w"> </span><span class="n">serialVersionUID</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">1L</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="n">host</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="nd">@Nullable</span><span class="w"> </span><span class="n">Integer</span><span class="w"> </span><span class="n">port</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="nd">@Nullable</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="n">user</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="nd">@Nullable</span><span class="w"> </span><span class="n">String</span><span class="w"> </span><span class="n">password</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>Now you can add an instance of this class to the <code>ImapSource</code> and <code>ImapTableSource</code> classes previously created (in part one) so it can be used there. Take note of the column names with which the table has been created. This will help later. You will also switch the source to be unbounded now as you will change the implementation in a bit to continuously listen for new emails.</p>
<div class="note">
<h5>Hint</h5>
<p>The column names would be "subject" and "content" with the SQL executed in part one:</p>
<pre><code class="language-sql">CREATE TABLE T (subject STRING, content STRING) WITH ('connector' = 'imap');</code></pre>
</div>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.streaming.api.functions.source.RichSourceFunction</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.data.RowData</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">java.util.List</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">java.util.stream.Collectors</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">ImapSource</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">RichSourceFunction</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">ImapSourceOptions</span><span class="w"> </span><span class="n">options</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">columnNames</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="nf">ImapSource</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">ImapSourceOptions</span><span class="w"> </span><span class="n">options</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">columnNames</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">this</span><span class="p">.</span><span class="na">options</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">options</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">this</span><span class="p">.</span><span class="na">columnNames</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">columnNames</span><span class="p">.</span><span class="na">stream</span><span class="p">()</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">map</span><span class="p">(</span><span class="n">String</span><span class="p">::</span><span class="n">toUpperCase</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">collect</span><span class="p">(</span><span class="n">Collectors</span><span class="p">.</span><span class="na">toList</span><span class="p">());</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// ...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.connector.source.DynamicTableSource</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.connector.source.ScanTableSource</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">java.util.List</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">ImapTableSource</span><span class="w"> </span><span class="kd">implements</span><span class="w"> </span><span class="n">ScanTableSource</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">ImapSourceOptions</span><span class="w"> </span><span class="n">options</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">columnNames</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="nf">ImapTableSource</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">ImapSourceOptions</span><span class="w"> </span><span class="n">options</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">columnNames</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">this</span><span class="p">.</span><span class="na">options</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">options</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">this</span><span class="p">.</span><span class="na">columnNames</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">columnNames</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// …</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">ScanRuntimeProvider</span><span class="w"> </span><span class="nf">getScanRuntimeProvider</span><span class="p">(</span><span class="n">ScanContext</span><span class="w"> </span><span class="n">ctx</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="kt">boolean</span><span class="w"> </span><span class="n">bounded</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">false</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">ImapSource</span><span class="w"> </span><span class="n">source</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">ImapSource</span><span class="p">(</span><span class="n">options</span><span class="p">,</span><span class="w"> </span><span class="n">columnNames</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">SourceFunctionProvider</span><span class="p">.</span><span class="na">of</span><span class="p">(</span><span class="n">source</span><span class="p">,</span><span class="w"> </span><span class="n">bounded</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">DynamicTableSource</span><span class="w"> </span><span class="nf">copy</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">ImapTableSource</span><span class="p">(</span><span class="n">options</span><span class="p">,</span><span class="w"> </span><span class="n">columnNames</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// …</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>Finally, in the <code>ImapTableSourceFactory</code> class, you need to create a <code>ConfigOption&lt;&gt;</code> for the hostname, port number, username, and password. Then you need to report them to Flink. Host, user, and password are mandatory and can be added to <code>requiredOptions()</code>; the port is optional and can be added to <code>optionalOptions()</code> instead.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.configuration.ConfigOption</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.configuration.ConfigOptions</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.factories.DynamicTableSourceFactory</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">java.util.HashSet</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">java.util.Set</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">ImapTableSourceFactory</span><span class="w"> </span><span class="kd">implements</span><span class="w"> </span><span class="n">DynamicTableSourceFactory</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">ConfigOption</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">HOST</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">ConfigOptions</span><span class="p">.</span><span class="na">key</span><span class="p">(</span><span class="s">&#34;host&#34;</span><span class="p">).</span><span class="na">stringType</span><span class="p">().</span><span class="na">noDefaultValue</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">ConfigOption</span><span class="o">&lt;</span><span class="n">Integer</span><span class="o">&gt;</span><span class="w"> </span><span class="n">PORT</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">ConfigOptions</span><span class="p">.</span><span class="na">key</span><span class="p">(</span><span class="s">&#34;port&#34;</span><span class="p">).</span><span class="na">intType</span><span class="p">().</span><span class="na">noDefaultValue</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">ConfigOption</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">USER</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">ConfigOptions</span><span class="p">.</span><span class="na">key</span><span class="p">(</span><span class="s">&#34;user&#34;</span><span class="p">).</span><span class="na">stringType</span><span class="p">().</span><span class="na">noDefaultValue</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kd">static</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">ConfigOption</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">PASSWORD</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">ConfigOptions</span><span class="p">.</span><span class="na">key</span><span class="p">(</span><span class="s">&#34;password&#34;</span><span class="p">).</span><span class="na">stringType</span><span class="p">().</span><span class="na">noDefaultValue</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// …</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">Set</span><span class="o">&lt;</span><span class="n">ConfigOption</span><span class="o">&lt;?&gt;&gt;</span><span class="w"> </span><span class="n">requiredOptions</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">Set</span><span class="o">&lt;</span><span class="n">ConfigOption</span><span class="o">&lt;?&gt;&gt;</span><span class="w"> </span><span class="n">options</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">HashSet</span><span class="o">&lt;&gt;</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">options</span><span class="p">.</span><span class="na">add</span><span class="p">(</span><span class="n">HOST</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">options</span><span class="p">.</span><span class="na">add</span><span class="p">(</span><span class="n">USER</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">options</span><span class="p">.</span><span class="na">add</span><span class="p">(</span><span class="n">PASSWORD</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">options</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">Set</span><span class="o">&lt;</span><span class="n">ConfigOption</span><span class="o">&lt;?&gt;&gt;</span><span class="w"> </span><span class="n">optionalOptions</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">Set</span><span class="o">&lt;</span><span class="n">ConfigOption</span><span class="o">&lt;?&gt;&gt;</span><span class="w"> </span><span class="n">options</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">HashSet</span><span class="o">&lt;&gt;</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">options</span><span class="p">.</span><span class="na">add</span><span class="p">(</span><span class="n">PORT</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">options</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// …</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>Now take a look at the <code>createDynamicTableSource()</code> function in the <code>ImapTableSourceFactory</code> class. Recall that previously (in part one) you used <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/api/java/org/apache/flink/table/factories/FactoryUtil.TableFactoryHelper.html">TableFactoryHelper</a>, a small helper utility offered by Flink which ensures that required options are set and that no unknown options are provided. You can now use it to automatically make sure that the required options (hostname, username, and password) are all provided when creating a table using this connector; the port number is optional. The helper will throw a <code>ValidationException</code> if a required option is missing. You can also use it to access the provided options (<code>getOptions()</code>), convert them into an instance of the <code>ImapSourceOptions</code> class created earlier, and provide the instance to the table source:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span><span class="w"> </span><span class="nn">java.util.List</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">java.util.stream.Collectors</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.factories.DynamicTableSourceFactory</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.factories.FactoryUtil</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.catalog.Column</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">ImapTableSourceFactory</span><span class="w"> </span><span class="kd">implements</span><span class="w"> </span><span class="n">DynamicTableSourceFactory</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// ...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">DynamicTableSource</span><span class="w"> </span><span class="nf">createDynamicTableSource</span><span class="p">(</span><span class="n">Context</span><span class="w"> </span><span class="n">ctx</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">FactoryUtil</span><span class="p">.</span><span class="na">TableFactoryHelper</span><span class="w"> </span><span class="n">factoryHelper</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">FactoryUtil</span><span class="p">.</span><span class="na">createTableFactoryHelper</span><span class="p">(</span><span class="k">this</span><span class="p">,</span><span class="w"> </span><span class="n">ctx</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">factoryHelper</span><span class="p">.</span><span class="na">validate</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">ImapSourceOptions</span><span class="w"> </span><span class="n">options</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">ImapSourceOptions</span><span class="p">.</span><span class="na">builder</span><span class="p">()</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">host</span><span class="p">(</span><span class="n">factoryHelper</span><span class="p">.</span><span class="na">getOptions</span><span class="p">().</span><span class="na">get</span><span class="p">(</span><span class="n">HOST</span><span class="p">))</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">port</span><span class="p">(</span><span class="n">factoryHelper</span><span class="p">.</span><span class="na">getOptions</span><span class="p">().</span><span class="na">get</span><span class="p">(</span><span class="n">PORT</span><span class="p">))</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">user</span><span class="p">(</span><span class="n">factoryHelper</span><span class="p">.</span><span class="na">getOptions</span><span class="p">().</span><span class="na">get</span><span class="p">(</span><span class="n">USER</span><span class="p">))</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">password</span><span class="p">(</span><span class="n">factoryHelper</span><span class="p">.</span><span class="na">getOptions</span><span class="p">().</span><span class="na">get</span><span class="p">(</span><span class="n">PASSWORD</span><span class="p">))</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">build</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="w"> </span><span class="n">columnNames</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">ctx</span><span class="p">.</span><span class="na">getCatalogTable</span><span class="p">().</span><span class="na">getResolvedSchema</span><span class="p">().</span><span class="na">getColumns</span><span class="p">().</span><span class="na">stream</span><span class="p">()</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">filter</span><span class="p">(</span><span class="n">Column</span><span class="p">::</span><span class="n">isPhysical</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">map</span><span class="p">(</span><span class="n">Column</span><span class="p">::</span><span class="n">getName</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">collect</span><span class="p">(</span><span class="n">Collectors</span><span class="p">.</span><span class="na">toList</span><span class="p">());</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">ImapTableSource</span><span class="p">(</span><span class="n">options</span><span class="p">,</span><span class="w"> </span><span class="n">columnNames</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><div class="note">
<h5>Hint</h5>
<p>
Ideally, you would use connector <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/#metadata">metadata</a> instead of column names. You can refer again to the accompanying <a href="https://github.com/TNG/flink-connector-email">repository</a> which does implement this using metadata fields.
</p>
</div>
<p>To test these new configuration options, run:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sh" data-lang="sh"><span class="line"><span class="cl">$ <span class="nb">cd</span> testing/
</span></span><span class="line"><span class="cl">$ ./build_and_run.sh
</span></span></code></pre></div><p>Once you see the <a href="https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/">Flink SQL client</a> start up, execute the following statements to create a table with your connector:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">T</span><span class="w"> </span><span class="p">(</span><span class="n">subject</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w"> </span><span class="n">content</span><span class="w"> </span><span class="n">STRING</span><span class="p">)</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="s1">&#39;connector&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;imap&#39;</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">SELECT</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="k">FROM</span><span class="w"> </span><span class="n">T</span><span class="p">;</span><span class="w">
</span></span></span></code></pre></div><p>This time it will fail because the required options are not provided:</p>
<pre tabindex="0"><code>[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: One or more required options are missing.
Missing required options are:
host
password
user
</code></pre><h1 id="connect-to-the-source-email-server">
Connect to the source email server
<a class="anchor" href="#connect-to-the-source-email-server">#</a>
</h1>
<p>Now that you have configured the required options to connect to the email server, it is time to actually connect to the server.</p>
<p>Going back to the <code>ImapSource</code> class, you first need to convert the options given to the table source into a <a href="https://docs.oracle.com/javase/tutorial/essential/environment/properties.html">Properties</a> object, which is what you can pass to the Jakarta library. You can also set various other properties here (e.g. enabling SSL).</p>
<p>The specific properties that the Jakarta library understands are documented <a href="https://jakarta.ee/specifications/mail/1.6/apidocs/index.html?com/sun/mail/imap/package-summary.html">here</a>.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.streaming.api.functions.source.RichSourceFunction</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.data.RowData</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">java.util.Properties</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">ImapSource</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">RichSourceFunction</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// …</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">Properties</span><span class="w"> </span><span class="nf">getSessionProperties</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Properties</span><span class="w"> </span><span class="n">props</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Properties</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">props</span><span class="p">.</span><span class="na">put</span><span class="p">(</span><span class="s">&#34;mail.store.protocol&#34;</span><span class="p">,</span><span class="w"> </span><span class="s">&#34;imap&#34;</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">props</span><span class="p">.</span><span class="na">put</span><span class="p">(</span><span class="s">&#34;mail.imap.auth&#34;</span><span class="p">,</span><span class="w"> </span><span class="kc">true</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">props</span><span class="p">.</span><span class="na">put</span><span class="p">(</span><span class="s">&#34;mail.imap.host&#34;</span><span class="p">,</span><span class="w"> </span><span class="n">options</span><span class="p">.</span><span class="na">getHost</span><span class="p">());</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">options</span><span class="p">.</span><span class="na">getPort</span><span class="p">()</span><span class="w"> </span><span class="o">!=</span><span class="w"> </span><span class="kc">null</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">props</span><span class="p">.</span><span class="na">put</span><span class="p">(</span><span class="s">&#34;mail.imap.port&#34;</span><span class="p">,</span><span class="w"> </span><span class="n">options</span><span class="p">.</span><span class="na">getPort</span><span class="p">());</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">props</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>Now create a method (<code>connect()</code>) which sets up the connection:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span><span class="w"> </span><span class="nn">jakarta.mail.*</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">com.sun.mail.imap.IMAPFolder</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.streaming.api.functions.source.RichSourceFunction</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.data.RowData</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">ImapSource</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">RichSourceFunction</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// …</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kd">transient</span><span class="w"> </span><span class="n">Store</span><span class="w"> </span><span class="n">store</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kd">transient</span><span class="w"> </span><span class="n">IMAPFolder</span><span class="w"> </span><span class="n">folder</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">connect</span><span class="p">()</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">Session</span><span class="w"> </span><span class="n">session</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">Session</span><span class="p">.</span><span class="na">getInstance</span><span class="p">(</span><span class="n">getSessionProperties</span><span class="p">(),</span><span class="w"> </span><span class="kc">null</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">store</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">session</span><span class="p">.</span><span class="na">getStore</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">store</span><span class="p">.</span><span class="na">connect</span><span class="p">(</span><span class="n">options</span><span class="p">.</span><span class="na">getUser</span><span class="p">(),</span><span class="w"> </span><span class="n">options</span><span class="p">.</span><span class="na">getPassword</span><span class="p">());</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">Folder</span><span class="w"> </span><span class="n">genericFolder</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">store</span><span class="p">.</span><span class="na">getFolder</span><span class="p">(</span><span class="s">&#34;INBOX&#34;</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">folder</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="p">(</span><span class="n">IMAPFolder</span><span class="p">)</span><span class="w"> </span><span class="n">genericFolder</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="o">!</span><span class="n">folder</span><span class="p">.</span><span class="na">isOpen</span><span class="p">())</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">folder</span><span class="p">.</span><span class="na">open</span><span class="p">(</span><span class="n">Folder</span><span class="p">.</span><span class="na">READ_ONLY</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>You can now use this method to connect to the mail server when the source starts running. Create a loop to keep the source running while it repeatedly requests the message count. Lastly, implement methods to cancel the source and close the connection:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span><span class="w"> </span><span class="nn">jakarta.mail.*</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.streaming.api.functions.source.RichSourceFunction</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.streaming.api.functions.source.SourceFunction</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.data.RowData</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">ImapSource</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">RichSourceFunction</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kd">transient</span><span class="w"> </span><span class="kd">volatile</span><span class="w"> </span><span class="kt">boolean</span><span class="w"> </span><span class="n">running</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">false</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// …</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">run</span><span class="p">(</span><span class="n">SourceFunction</span><span class="p">.</span><span class="na">SourceContext</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span><span class="w"> </span><span class="n">ctx</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">connect</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">running</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">true</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// TODO: Listen for new messages</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">while</span><span class="w"> </span><span class="p">(</span><span class="n">running</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// Trigger some IMAP request to force the server to send a notification</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">folder</span><span class="p">.</span><span class="na">getMessageCount</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Thread</span><span class="p">.</span><span class="na">sleep</span><span class="p">(</span><span class="n">250</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">cancel</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">running</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="kc">false</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">close</span><span class="p">()</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">folder</span><span class="w"> </span><span class="o">!=</span><span class="w"> </span><span class="kc">null</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">folder</span><span class="p">.</span><span class="na">close</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">store</span><span class="w"> </span><span class="o">!=</span><span class="w"> </span><span class="kc">null</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">store</span><span class="p">.</span><span class="na">close</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>Every loop iteration triggers a request to the server. This is crucial as it ensures that the server will keep sending notifications. A more sophisticated approach would be to make use of the IMAP IDLE command.</p>
<div class="note">
<h5>Note</h5>
<p>Since the source is not checkpointable, no state fault tolerance will be possible.</p>
</div>
<h2 id="collect-incoming-emails">
Collect incoming emails
<a class="anchor" href="#collect-incoming-emails">#</a>
</h2>
<p>Now you need to listen for new emails arriving in the inbox folder and collect them. To begin, hard-code the schema and only return the email’s subject. Fortunately, Jakarta Mail provides a simple hook (<code>addMessageCountListener()</code>) to get notified when new messages arrive on the server. You can use this in place of the “TODO” comment above:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span><span class="w"> </span><span class="nn">jakarta.mail.*</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">jakarta.mail.event.MessageCountAdapter</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">jakarta.mail.event.MessageCountEvent</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.streaming.api.functions.source.RichSourceFunction</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.data.GenericRowData</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.data.StringData</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.data.RowData</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">ImapSource</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">RichSourceFunction</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">run</span><span class="p">(</span><span class="n">SourceFunction</span><span class="p">.</span><span class="na">SourceContext</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span><span class="w"> </span><span class="n">ctx</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// …</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">folder</span><span class="p">.</span><span class="na">addMessageCountListener</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">MessageCountAdapter</span><span class="p">()</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">messagesAdded</span><span class="p">(</span><span class="n">MessageCountEvent</span><span class="w"> </span><span class="n">e</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">collectMessages</span><span class="p">(</span><span class="n">ctx</span><span class="p">,</span><span class="w"> </span><span class="n">e</span><span class="p">.</span><span class="na">getMessages</span><span class="p">());</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">});</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// …</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">collectMessages</span><span class="p">(</span><span class="n">SourceFunction</span><span class="p">.</span><span class="na">SourceContext</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span><span class="w"> </span><span class="n">ctx</span><span class="p">,</span><span class="w"> </span><span class="n">Message</span><span class="o">[]</span><span class="w"> </span><span class="n">messages</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">Message</span><span class="w"> </span><span class="n">message</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="n">messages</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">try</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">ctx</span><span class="p">.</span><span class="na">collect</span><span class="p">(</span><span class="n">GenericRowData</span><span class="p">.</span><span class="na">of</span><span class="p">(</span><span class="n">StringData</span><span class="p">.</span><span class="na">fromString</span><span class="p">(</span><span class="n">message</span><span class="p">.</span><span class="na">getSubject</span><span class="p">())));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> </span><span class="k">catch</span><span class="w"> </span><span class="p">(</span><span class="n">MessagingException</span><span class="w"> </span><span class="n">ignored</span><span class="p">)</span><span class="w"> </span><span class="p">{}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>Now build the project again and start up the SQL client:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sh" data-lang="sh"><span class="line"><span class="cl">$ <span class="nb">cd</span> testing/
</span></span><span class="line"><span class="cl">$ ./build_and_run.sh
</span></span></code></pre></div><p>This time, you will connect to a <a href="https://greenmail-mail-test.github.io/greenmail/">GreenMail server</a> which is started as part of the <a href="https://github.com/Airblader/blog-imap/blob/master/testing/docker-compose.yaml">setup</a>:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">T</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">subject</span><span class="w"> </span><span class="n">STRING</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;connector&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;imap&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;host&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;greenmail&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;port&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;3143&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;user&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;alice&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;password&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;alice&#39;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">SELECT</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="k">FROM</span><span class="w"> </span><span class="n">T</span><span class="p">;</span><span class="w">
</span></span></span></code></pre></div><p>The query above should now run continuously, but no rows will be produced yet because no emails have arrived at the test server. You first need to send an email to the server. If you have <a href="https://pubs.opengroup.org/onlinepubs/9699919799/utilities/mailx.html">mailx</a> installed, you can do so by executing the following in your terminal:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sh" data-lang="sh"><span class="line"><span class="cl">$ <span class="nb">echo</span> <span class="s2">&#34;This is the email body&#34;</span> <span class="p">|</span> mailx -Sv15-compat <span class="se">\
</span></span></span><span class="line"><span class="cl"><span class="se"></span> -s<span class="s2">&#34;Email Subject&#34;</span> <span class="se">\
</span></span></span><span class="line"><span class="cl"><span class="se"></span> -Smta<span class="o">=</span><span class="s2">&#34;smtp://alice:alice@localhost:3025&#34;</span> <span class="se">\
</span></span></span><span class="line"><span class="cl"><span class="se"></span> alice@acme.org
</span></span></code></pre></div><p>The subject “Email Subject” should now have appeared as a row in your output. Your source connector is working!</p>
<p>However, since you are still hard-coding the schema produced by the source, defining the table with a different schema will produce errors. You want to be able to define which fields of an email interest you and then produce the data accordingly. To do this, you will use the list of column names from earlier and consult it when you collect the emails.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.data.GenericRowData</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.data.RowData</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kn">import</span><span class="w"> </span><span class="nn">org.apache.flink.table.data.TimestampData</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">ImapSource</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">RichSourceFunction</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">collectMessages</span><span class="p">(</span><span class="n">SourceFunction</span><span class="p">.</span><span class="na">SourceContext</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span><span class="w"> </span><span class="n">ctx</span><span class="p">,</span><span class="w"> </span><span class="n">Message</span><span class="o">[]</span><span class="w"> </span><span class="n">messages</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">Message</span><span class="w"> </span><span class="n">message</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="n">messages</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">try</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">collectMessage</span><span class="p">(</span><span class="n">ctx</span><span class="p">,</span><span class="w"> </span><span class="n">message</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> </span><span class="k">catch</span><span class="w"> </span><span class="p">(</span><span class="n">MessagingException</span><span class="w"> </span><span class="n">ignored</span><span class="p">)</span><span class="w"> </span><span class="p">{}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">collectMessage</span><span class="p">(</span><span class="n">SourceFunction</span><span class="p">.</span><span class="na">SourceContext</span><span class="o">&lt;</span><span class="n">RowData</span><span class="o">&gt;</span><span class="w"> </span><span class="n">ctx</span><span class="p">,</span><span class="w"> </span><span class="n">Message</span><span class="w"> </span><span class="n">message</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">MessagingException</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">GenericRowData</span><span class="w"> </span><span class="n">row</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">GenericRowData</span><span class="p">(</span><span class="n">columnNames</span><span class="p">.</span><span class="na">size</span><span class="p">());</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="kt">int</span><span class="w"> </span><span class="n">i</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">0</span><span class="p">;</span><span class="w"> </span><span class="n">i</span><span class="w"> </span><span class="o">&lt;</span><span class="w"> </span><span class="n">columnNames</span><span class="p">.</span><span class="na">size</span><span class="p">();</span><span class="w"> </span><span class="n">i</span><span class="o">++</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">switch</span><span class="w"> </span><span class="p">(</span><span class="n">columnNames</span><span class="p">.</span><span class="na">get</span><span class="p">(</span><span class="n">i</span><span class="p">))</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">case</span><span class="w"> </span><span class="s">&#34;SUBJECT&#34;</span><span class="p">:</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">row</span><span class="p">.</span><span class="na">setField</span><span class="p">(</span><span class="n">i</span><span class="p">,</span><span class="w"> </span><span class="n">StringData</span><span class="p">.</span><span class="na">fromString</span><span class="p">(</span><span class="n">message</span><span class="p">.</span><span class="na">getSubject</span><span class="p">()));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">break</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">case</span><span class="w"> </span><span class="s">&#34;SENT&#34;</span><span class="p">:</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">row</span><span class="p">.</span><span class="na">setField</span><span class="p">(</span><span class="n">i</span><span class="p">,</span><span class="w"> </span><span class="n">TimestampData</span><span class="p">.</span><span class="na">fromInstant</span><span class="p">(</span><span class="n">message</span><span class="p">.</span><span class="na">getSentDate</span><span class="p">().</span><span class="na">toInstant</span><span class="p">()));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">break</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">case</span><span class="w"> </span><span class="s">&#34;RECEIVED&#34;</span><span class="p">:</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">row</span><span class="p">.</span><span class="na">setField</span><span class="p">(</span><span class="n">i</span><span class="p">,</span><span class="w"> </span><span class="n">TimestampData</span><span class="p">.</span><span class="na">fromInstant</span><span class="p">(</span><span class="n">message</span><span class="p">.</span><span class="na">getReceivedDate</span><span class="p">().</span><span class="na">toInstant</span><span class="p">()));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">break</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// ...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">ctx</span><span class="p">.</span><span class="na">collect</span><span class="p">(</span><span class="n">row</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>You should now have a working source where you can select any of the columns that are supported. Try it out again in the SQL client, but this time specifying all the columns (&ldquo;subject&rdquo;, &ldquo;sent&rdquo;, &ldquo;received&rdquo;) supported above:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-sql" data-lang="sql"><span class="line"><span class="cl"><span class="k">CREATE</span><span class="w"> </span><span class="k">TABLE</span><span class="w"> </span><span class="n">T</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">subject</span><span class="w"> </span><span class="n">STRING</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">sent</span><span class="w"> </span><span class="k">TIMESTAMP</span><span class="p">(</span><span class="mi">3</span><span class="p">),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">received</span><span class="w"> </span><span class="k">TIMESTAMP</span><span class="p">(</span><span class="mi">3</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">)</span><span class="w"> </span><span class="k">WITH</span><span class="w"> </span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;connector&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;imap&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;host&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;greenmail&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;port&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;3143&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;user&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;alice&#39;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="s1">&#39;password&#39;</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">&#39;alice&#39;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">SELECT</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="k">FROM</span><span class="w"> </span><span class="n">T</span><span class="p">;</span><span class="w">
</span></span></span></code></pre></div><p>Use the <code>mailx</code> command from earlier to send emails to the GreenMail server and you should see them appear. You can also try selecting only some of the columns, or write more complex queries.</p>
<h1 id="test-the-connector-with-a-real-mail-server-on-the-ververica-platform">
Test the connector with a real mail server on the Ververica Platform
<a class="anchor" href="#test-the-connector-with-a-real-mail-server-on-the-ververica-platform">#</a>
</h1>
<p>If you want to test the connector with a real mail server, you can import it into <a href="https://www.ververica.com/getting-started">Ververica Platform Community Edition</a>. To begin, make sure that you have the Ververica Platform up and running.</p>
<p>Since the example connector in this blog post is still a bit limited, you will use the finished connector in this <a href="https://github.com/TNG/flink-connector-email">repository</a> instead. You can clone that repository and build it the same way to obtain the JAR file.</p>
<p>For this example, let&rsquo;s connect to a Gmail account. This requires SSL and comes with an additional caveat that you need to enable two-factor authentication and create an application password to use instead of your real password.</p>
<p>First, head to SQL → Connectors. There you can create a new connector by uploading your JAR file. The platform will detect the connector options automatically. Afterwards, go back to the SQL Editor and you should now be able to use the connector.</p>
<div>
<img src="/img/blog/2021-09-07-connector-table-sql-api/VVP-SQL-Editor.png" alt="Ververica Platform - SQL Editor"/>
<p class="align-center">Ververica Platform - SQL Editor</p>
</div>
<h1 id="summary">
Summary
<a class="anchor" href="#summary">#</a>
</h1>
<p>Apache Flink is designed for easy extensibility and allows users to access many different external systems as data sources or sinks through a versatile set of connectors. It can read data from and write data to databases as well as local and distributed file systems.</p>
<p>Flink also exposes APIs on top of which custom connectors can be built. In this two-part blog series, you explored some of these APIs and concepts and learned how to implement your own custom source connector that can read in data from an email inbox. You then used Flink to process incoming emails through the IMAP protocol and wrote some Flink SQL.</p>
</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="https://github.com/apache/flink-web/edit/asf-site/docs/content/posts/2021-09-07-connector-table-sql-api-part2.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li><a href="#goals">Goals</a></li>
<li><a href="#prerequisites">Prerequisites</a></li>
<li><a href="#understand-how-to-fetch-emails-via-the-imap-protocol">Understand how to fetch emails via the IMAP protocol</a></li>
<li><a href="#add-configuration-options---server-information-and-credentials">Add configuration options - server information and credentials</a></li>
<li><a href="#connect-to-the-source-email-server">Connect to the source email server</a>
<ul>
<li><a href="#collect-incoming-emails">Collect incoming emails</a></li>
</ul>
</li>
<li><a href="#test-the-connector-with-a-real-mail-server-on-the-ververica-platform">Test the connector with a real mail server on the Ververica Platform</a></li>
<li><a href="#summary">Summary</a></li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a>
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>GitHub</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>