blob: 495b32a2a01633c74cee1cb371886bbe36db6d53 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en" dir=ZgotmplZ>
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2023/05/03/howto-create-a-batch-source-with-the-new-source-framework/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="Introduction # The Flink community has designed a new Source framework based on FLIP-27 lately. Some connectors have migrated to this new framework. This article is a how-to for creating a batch source using this new framework. It was built while implementing the Flink batch source for Cassandra. If you are interested in contributing or migrating connectors, this blog post is for you!
Implementing the source components # The source architecture is depicted in the diagrams below:">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Howto create a batch source with the new Source framework" />
<meta property="og:description" content="Introduction # The Flink community has designed a new Source framework based on FLIP-27 lately. Some connectors have migrated to this new framework. This article is a how-to for creating a batch source using this new framework. It was built while implementing the Flink batch source for Cassandra. If you are interested in contributing or migrating connectors, this blog post is for you!
Implementing the source components # The source architecture is depicted in the diagrams below:" />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2023/05/03/howto-create-a-batch-source-with-the-new-source-framework/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2023-05-03T08:00:00+00:00" />
<meta property="article:modified_time" content="2023-05-03T08:00:00+00:00" />
<title>Howto create a batch source with the new Source framework | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
var _paq = window._paq = window._paq || [];
_paq.push(['disableCookies']);
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var u="//analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '1']);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
</head>
<body dir=ZgotmplZ>
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;"su>
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2023/05/03/howto-create-a-batch-source-with-the-new-source-framework/">Howto create a batch source with the new Source framework</a>
</h1>
May 3, 2023 -
Etienne Chauchot
<a href="https://twitter.com/echauchot">(@echauchot)</a>
<p><h2 id="introduction">
Introduction
<a class="anchor" href="#introduction">#</a>
</h2>
<p>The Flink community has
designed <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/">a new Source framework</a>
based
on <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A&#43;Refactor&#43;Source&#43;Interface">FLIP-27</a>
lately. Some connectors have migrated to this new framework. This article is a how-to for creating a
batch
source using this new framework. It was built while implementing
the <a href="https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca">Flink batch source</a>
for <a href="https://cassandra.apache.org/_/index.html">Cassandra</a>.
If you are interested in contributing or migrating connectors, this blog post is for you!</p>
<h2 id="implementing-the-source-components">
Implementing the source components
<a class="anchor" href="#implementing-the-source-components">#</a>
</h2>
<p>The source architecture is depicted in the diagrams below:</p>
<p><img src="/img/blog/2023-05-03-howto-create-batch-source/source_components.svg" alt="" /></p>
<p><img src="/img/blog/2023-05-03-howto-create-batch-source/source_reader.svg" alt="" /></p>
<h3 id="source">
Source
<a class="anchor" href="#source">#</a>
</h3>
<p><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/CassandraSource.java">Example Cassandra Source</a></p>
<p>The source interface only does the &ldquo;glue&rdquo; between all the other components. Its role is to
instantiate all of them and to define the
source <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/Boundedness.html">Boundedness</a>
. We also do the source configuration
here along with user configuration validation.</p>
<h3 id="sourcereader">
SourceReader
<a class="anchor" href="#sourcereader">#</a>
</h3>
<p><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSourceReader.java">Example Cassandra SourceReader</a></p>
<p>As shown in the graphic above, the instances of
the <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceReader.html">SourceReader</a>
(which we will call simply readers
in the continuation of this article) run in parallel in task managers to read the actual data which
is divided into <a href="#split-and-splitstate">Splits</a>. Readers request splits from
the <a href="#splitenumerator-and-splitenumeratorstate">SplitEnumerator</a> and the resulting splits are
assigned to them in return.</p>
<p>Flink provides
the <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html">SourceReaderBase</a>
implementation that takes care of all the threading. Flink also provides a useful extension to
this class for most
cases: <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SingleThreadMultiplexSourceReaderBase.html">SingleThreadMultiplexSourceReaderBase</a>
. This class has the threading model already configured:
each <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.html">SplitReader</a>
instance reads splits using one thread (but there are several SplitReader instances that live among
task
managers).</p>
<p>What we have left to do in the SourceReader class is:</p>
<ul>
<li>Provide a <a href="#splitreader">SplitReader</a> supplier</li>
<li>Create a <a href="#recordemitter">RecordEmitter</a></li>
<li>Create the shared resources for the SplitReaders (sessions, etc&hellip;). As the SplitReader supplier
is
created in the SourceReader constructor in a super() call, using a SourceReader factory to create
the shared resources and pass them to the supplier is a good idea.</li>
<li>Implement <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceReader.html#start--">start()</a>:
here we should ask the enumerator for our first split</li>
<li>Override <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#close--">close()</a>
in SourceReaderBase parent class to free up any created resources (the shared
resources for example)</li>
<li>Implement <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#initializedState-SplitT-">initializedState()</a>
to create a mutable <a href="#split-and-splitstate">SplitState</a> from a Split</li>
<li>Implement <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#toSplitType-java.lang.String-SplitStateT-">toSplitType()</a>
to create a Split from the mutable SplitState</li>
<li>Implement <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html#onSplitFinished-java.util.Map-">onSplitFinished()</a>:
here, as it is a batch source (finite data), we should ask the
Enumerator for next split</li>
</ul>
<h3 id="split-and-splitstate">
Split and SplitState
<a class="anchor" href="#split-and-splitstate">#</a>
</h3>
<p><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplit.java">Example Cassandra Split</a></p>
<p>The <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SourceSplit.html">SourceSplit</a>
represents a partition of the source data. What defines a split depends on the
backend we are reading from. It could be a <em>(partition start, partition end)</em> tuple or an <em>(offset,
split size)</em> tuple for example.</p>
<p>In any case, the Split object should be seen as an immutable object: any update to it should be done
on the
associated <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.html">SplitState</a>.
The split state is the one that will be stored inside the Flink
<a href="https://nightlies.apache.org/flink/flink-docs-master/docs/concepts/stateful-stream-processing/#checkpointing">checkpoints</a>
. A checkpoint may happen between 2 fetches for 1 split. So, if we&rsquo;re reading a split, we
must store in the split state the current state of the reading process. This current state needs to
be something serializable (because it will be part of a checkpoint) and something that the backend
source can resume from. That way, in case of failover, the reading could be resumed from where it
was left off. Thus we ensure there will be no duplicates or lost data.<br>
For example, if the records
reading order is deterministic in the backend, then the split state can store the number <em>n</em> of
already read records to restart at <em>n+1</em> after failover.</p>
<h3 id="splitenumerator-and-splitenumeratorstate">
SplitEnumerator and SplitEnumeratorState
<a class="anchor" href="#splitenumerator-and-splitenumeratorstate">#</a>
</h3>
<p><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraSplitEnumerator.java">Example Cassandra SplitEnumerator</a>
and <a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorState.java">SplitEnumeratorState</a></p>
<p>The <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html">SplitEnumerator</a>
is responsible for creating the splits and serving them to the readers. Whenever
possible, it is preferable to generate the splits lazily, meaning that each time a reader asks the
enumerator for a split, the enumerator generates one on demand and assigns it to the reader. For
that we
implement <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#handleSplitRequest-int-java.lang.String-">SplitEnumerator#handleSplitRequest()</a>
. Lazy splits generation is preferable to
splits discovery, in which we pre-generate all the splits and store them waiting to assign them to
the readers. Indeed, in some situations, the number of splits can be enormous and consume a lot a
memory which could be problematic in case of straggling readers. The framework offers the ability to
act upon reader registration by
implementing <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#addReader-int-">addReader()</a>
but, as we do lazy splits generation, we
have nothing to do there. In some cases, generating a split is too costly, so we can pre-generate a
batch (not all) of splits to amortize this cost. The number/size of batched splits need to be taken
into account to avoid consuming too much memory.</p>
<p>Long story short, the tricky part of the source implementation is splitting the source data. The
good equilibrium to find is not to have too many splits (which could lead to too much memory
consumption) nor too few (which could lead to sub-optimal parallelism). One good way to meet this
equilibrium is to evaluate the size of the source data upfront and allow the user to specify the
maximum memory a split will take. That way they can configure this parameter accordingly to the
memory
available on the task managers. This parameter is optional so the source needs to provide a default
value. Also, the source needs to control that the user provided max-split-size is not too little
which would
lead to too many splits. The general rule of thumb is to let the user some freedom but protect him
from unwanted behavior.
For these safety measures, rigid thresholds
don&rsquo;t work well as the source may start to fail when the thresholds are suddenly exceeded.<br>
For example if we enforce that the number of splits is below twice the parallelism, if
the job is regularly run on a growing table, at some point there will be
more and more splits of max-split-size and the threshold will be exceeded. Of course, the size of
the source data needs to be evaluated without
reading the actual data. For the Cassandra connector it was
done <a href="https://echauchot.blogspot.com/2023/03/cassandra-evaluate-table-size-without.html">like this</a>.</p>
<p>Another important topic is state. If the job manager fails, the split enumerator needs to recover.
For that, as for the split, we need to provide a state for the enumerator that will be part of a
checkpoint. Upon recovery, the enumerator is reconstructed and
receives <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html">an enumerator state</a>
for recovering its previous state. Upon checkpointing, the
enumerator returns its state when <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#snapshotState-long-">SplitEnumerator#snapshotState()</a>
is called. The state
must contain everything needed to resume where the enumerator was left off after failover. In lazy
split generation scenario, the state will contain everything needed to generate the next split
whenever asked to. It can be for example the start offset of next split, split size, number of
splits still to generate etc&hellip; But the SplitEnumeratorState must also contain a list of splits, not
the list of discovered splits, but a list of splits to reassign. Indeed, whenever a reader fails, if
it was assigned splits after last checkpoint, then the checkpoint will not contain those splits.
Consequently, upon restoration, the reader won&rsquo;t have the splits assigned anymore. There is a
callback to deal with that
case: <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#addSplitsBack-java.util.List-int-">addSplitsBack()</a>
. There, the splits that were assigned to the
failing reader, can be put back into the enumerator state for later re-assignment to readers. There
is no memory size risk here as the number of splits to reassign is pretty low.</p>
<p>The above topics are the more important regarding splitting. There are 2 methods left to implement:
the
usual <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#start--">start()</a>
/<a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumerator.html#close--">close()</a>
methods for resources creation/disposal. Regarding implementing start(),
the Flink connector framework
provides <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.html#callAsync-java.util.concurrent.Callable-java.util.function.BiConsumer-long-long-">enumeratorContext#callAsync()</a>
utility to run long processing
asynchronously such as splits preparation or splits discovery (if lazy splits generation is
impossible). Indeed, the start() method runs in the source coordinator thread,
we don&rsquo;t want to block it for a long time.</p>
<h3 id="splitreader">
SplitReader
<a class="anchor" href="#splitreader">#</a>
</h3>
<p><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraSplitReader.java">Example Cassandra SplitReader</a></p>
<p>This class is responsible for reading the actual splits that it receives when the framework
calls <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.html#handleSplitsChanges-org.apache.flink.connector.base.source.reader.splitreader.SplitsChange-">handleSplitsChanges()</a>
. The main part of the split reader is
the <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.html#fetch--">fetch()</a>
implementation where we read all the splits received and return the read records as
a <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/RecordsBySplits.html">RecordsBySplits</a>
object. This object contains a map of the split ids to the belonging records and also the ids of the
finished splits. Important points need to be considered:</p>
<ul>
<li>The fetch call must be non-blocking. If any call in its code is synchronous and potentially long,
an
escape from the fetch() must be provided. When the framework
calls <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.html#wakeUp--">wakeUp()</a>
we should interrupt the
fetch for example by setting an AtomicBoolean.</li>
<li>Fetch call needs to be re-entrant: an already read split must not be re-read. We should remove it
from the list of splits to read and add its id to the finished splits (along with empty splits) in
the RecordsBySplits that we return.</li>
</ul>
<p>It is totally fine for the implementer to exit the fetch() method early. Also a failure could
interrupt the fetch. In both cases the framework will call fetch() again later on. In that case, the
fetch method must resume the reading from where it was left off using the split state already
discussed. If resuming the read of a split is impossible because of backend constraints, then the
only solution is to read splits atomically (either not read the split at all, or read it entirely).
That way, in case of interrupted fetch, nothing will be output and the split could be read again
from the beginning at next fetch call leading to no duplicates. But if the split is read entirely,
there are points to consider:</p>
<ul>
<li>We should ensure that the total split content (records from the source) fits in memory for example
by specifying a max split size in bytes (
see <a href="#splitenumerator-and-splitenumeratorstate">SplitEnumarator</a>)</li>
<li>The split state becomes useless, only a Split class is needed</li>
</ul>
<h3 id="recordemitter">
RecordEmitter
<a class="anchor" href="#recordemitter">#</a>
</h3>
<p><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/reader/CassandraRecordEmitter.java">Example Cassandra RecordEmitter</a></p>
<p>The SplitReader reads records in the form
of <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/splitreader/SplitReader.html">an intermediary record format</a>
that the implementer
provides for each record. It can be the raw format returned by the backend or any format allowing to
extract the actual record afterwards. This format is not the final output format expected by the
source. It contains anything needed to do the conversion to the record output format. We need to
implement <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/base/source/reader/RecordEmitter.html#emitRecord-E-org.apache.flink.api.connector.source.SourceOutput-SplitStateT-">RecordEmitter#emitRecord()</a>
to do this conversion. A good pattern here is to initialize the
RecordEmitter with a mapping Function. The implementation must be idempotent. Indeed the method
maybe interrupted in the middle. In that case, the same set of records will be passed to the record
emitter again later.</p>
<h3 id="serializers">
Serializers
<a class="anchor" href="#serializers">#</a>
</h3>
<p><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java">Example Cassandra SplitSerializer</a>
and <a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java">SplitEnumeratorStateSerializer</a></p>
<p>We need to provide singleton serializers for:</p>
<ul>
<li>Split: splits are serialized when sending them from enumerator to reader, and when checkpointing
the reader&rsquo;s current state</li>
<li>SplitEnumeratorState: the serializer is used for the result of the
SplitEnumerator#snapshotState()</li>
</ul>
<p>For both, we need to
implement <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/core/io/SimpleVersionedSerializer.html">SimpleVersionedSerializer</a>
. Care needs to be taken at some important points:</p>
<ul>
<li>Using Java serialization
is <a href="https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization">forbidden</a>
in Flink mainly for migration concerns. We should rather manually write the fields of the objects
using ObjectOutputStream. When a class is not supported by the ObjectOutputStream (not String,
Integer, Long&hellip;), we should write the size of the object in bytes as an Integer and then write
the object converted to byte[]. Similar method is used to serialize collections. First write the
number of elements of the collection, then serialize all the contained objects. Of course, for
deserialization we do the exact same reading with the same order.</li>
<li>There can be a lot of splits, so we should cache the OutputStream used in SplitSerializer. We can
do so by using.</li>
</ul>
<p><code> ThreadLocal&lt;DataOutputSerializer&gt; SERIALIZER_CACHE = ThreadLocal.withInitial(() -&gt; new DataOutputSerializer(64));</code></p>
<p>The initial stream size depends on the size of a split.</p>
<h2 id="testing-the-source">
Testing the source
<a class="anchor" href="#testing-the-source">#</a>
</h2>
<p>For the sake of concision of this article, testing the source will be the object of the next
article. Stay tuned !</p>
<h2 id="conclusion">
Conclusion
<a class="anchor" href="#conclusion">#</a>
</h2>
<p>This article gathering the implementation field feedback was needed as the javadocs cannot cover all
the implementation details for high-performance and maintainable sources. I hope you enjoyed reading
and that it gave you the desire to contribute a new connector to the Flink project !</p>
</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="//github.com/apache/flink-web/edit/asf-site/docs/content/posts/2023-05-03-howto-create-batch-source.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li>
<ul>
<li><a href="#introduction">Introduction</a></li>
<li><a href="#implementing-the-source-components">Implementing the source components</a>
<ul>
<li><a href="#source">Source</a></li>
<li><a href="#sourcereader">SourceReader</a></li>
<li><a href="#split-and-splitstate">Split and SplitState</a></li>
<li><a href="#splitenumerator-and-splitenumeratorstate">SplitEnumerator and SplitEnumeratorState</a></li>
<li><a href="#splitreader">SplitReader</a></li>
<li><a href="#recordemitter">RecordEmitter</a></li>
<li><a href="#serializers">Serializers</a></li>
</ul>
</li>
<li><a href="#testing-the-source">Testing the source</a></li>
<li><a href="#conclusion">Conclusion</a></li>
</ul>
</li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a-->
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>Github</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>