blob: 5bf77fa314e5c94b15ef9b0ee613d108d9294eed [file] [log] [blame]
<!DOCTYPE html>
<html lang="en" dir=ZgotmplZ>
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2023/05/12/howto-test-a-batch-source-with-the-new-source-framework/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="Introduction # The Flink community has designed a new Source framework based on FLIP-27 lately. This article is the continuation of the howto create a batch source with the new Source framework article . Now it is time to test the created source ! As the previous article, this one was built while implementing the Flink batch source for Cassandra.
Unit testing the source # Testing the serializers # Example Cassandra SplitSerializer and SplitEnumeratorStateSerializer">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Howto test a batch source with the new Source framework" />
<meta property="og:description" content="Introduction # The Flink community has designed a new Source framework based on FLIP-27 lately. This article is the continuation of the howto create a batch source with the new Source framework article . Now it is time to test the created source ! As the previous article, this one was built while implementing the Flink batch source for Cassandra.
Unit testing the source # Testing the serializers # Example Cassandra SplitSerializer and SplitEnumeratorStateSerializer" />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2023/05/12/howto-test-a-batch-source-with-the-new-source-framework/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2023-05-12T08:00:00+00:00" />
<meta property="article:modified_time" content="2023-05-12T08:00:00+00:00" />
<title>Howto test a batch source with the new Source framework | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
var _paq = window._paq = window._paq || [];
_paq.push(['disableCookies']);
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var u="//analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '1']);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
</head>
<body dir=ZgotmplZ>
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;"su>
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2023/05/12/howto-test-a-batch-source-with-the-new-source-framework/">Howto test a batch source with the new Source framework</a>
</h1>
May 12, 2023 -
Etienne Chauchot
<a href="https://twitter.com/echauchot">(@echauchot)</a>
<p><h2 id="introduction">
Introduction
<a class="anchor" href="#introduction">#</a>
</h2>
<p>The Flink community has
designed <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/sources/">a new Source framework</a>
based
on <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A&#43;Refactor&#43;Source&#43;Interface">FLIP-27</a>
lately. This article is the
continuation of
the <a href="https://flink.apache.org/2023/04/14/howto-create-batch-source/">howto create a batch source with the new Source framework article</a>
. Now it is
time to test the created source ! As the previous article, this one was built while implementing the
<a href="https://github.com/apache/flink-connector-cassandra/commit/72e3bef1fb9ee6042955b5e9871a9f70a8837cca">Flink batch source</a>
for <a href="https://cassandra.apache.org/_/index.html">Cassandra</a>.</p>
<h2 id="unit-testing-the-source">
Unit testing the source
<a class="anchor" href="#unit-testing-the-source">#</a>
</h2>
<h3 id="testing-the-serializers">
Testing the serializers
<a class="anchor" href="#testing-the-serializers">#</a>
</h3>
<p><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/split/CassandraSplitSerializer.java">Example Cassandra SplitSerializer</a>
and <a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/main/java/org/apache/flink/connector/cassandra/source/enumerator/CassandraEnumeratorStateSerializer.java">SplitEnumeratorStateSerializer</a></p>
<p>In the previous article, we
created <a href="https://flink.apache.org/2023/04/14/howto-create-batch-source/#serializers">serializers</a>
for Split and SplitEnumeratorState. We should now test them in unit tests. To test serde
we create an object, serialize it using the serializer and then deserialize it using the same
serializer and finally assert on the equality of the two objects. Thus, hascode() and equals() need
to be implemented for the serialized objects.</p>
<h3 id="other-unit-tests">
Other unit tests
<a class="anchor" href="#other-unit-tests">#</a>
</h3>
<p>Of course, we also need to unit test low level processing such as query building for example or any
processing that does not require a running backend.</p>
<h2 id="integration-testing-the-source">
Integration testing the source
<a class="anchor" href="#integration-testing-the-source">#</a>
</h2>
<p>For tests that require a running backend, Flink provides a JUnit5 source test framework. It is composed of different parts gathered in a test suite:</p>
<ul>
<li><a href="#flink-environment">The Flink environment</a></li>
<li><a href="#backend-environment">The backend environment</a></li>
<li><a href="#checkpointing-semantics">The checkpointing semantics</a></li>
<li><a href="#test-context">The test context</a></li>
</ul>
<p><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraSourceITCase.java">Example Cassandra SourceITCase
</a></p>
<p>For the test to be integrated to Flink CI, the test class must be called *ITCAse. But it can be called
differently if the test belongs to somewhere else.
The class extends <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.html">SourceTestSuiteBase</a>
. This test suite provides all
the necessary tests already (single split, multiple splits, idle reader, etc&hellip;). It is targeted for
batch and streaming sources, so for our batch source case here, the tests below need to be disabled
as they are targeted for streaming sources. They can be disabled by overriding them in the ITCase
and annotating them with <code>@Disabled</code>:</p>
<ul>
<li>testSourceMetrics</li>
<li>testSavepoint</li>
<li>testScaleUp</li>
<li>testScaleDown</li>
<li>testTaskManagerFailure</li>
</ul>
<p>Of course we can add our own integration tests cases for example tests on limits, tests on low level
splitting or any test that requires a running backend. But for most cases we only need to provide
Flink test environment classes to configure the ITCase:</p>
<h3 id="flink-environment">
Flink environment
<a class="anchor" href="#flink-environment">#</a>
</h3>
<p>We add this annotated field to our ITCase and we&rsquo;re done</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="nd">@TestEnv</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">MiniClusterTestEnvironment</span><span class="w"> </span><span class="n">flinkTestEnvironment</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">MiniClusterTestEnvironment</span><span class="p">();</span><span class="w">
</span></span></span></code></pre></div><h3 id="backend-environment">
Backend environment
<a class="anchor" href="#backend-environment">#</a>
</h3>
<p><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestEnvironment.java">Example Cassandra TestEnvironment</a></p>
<p>To test the connector we need a backend to run the connector against. This TestEnvironment
provides everything related to the backend: the container, its configuration, the session to connect to it,
and all the elements bound to the whole test case (table space, initialization requests &hellip;)</p>
<p>We add this annotated field to our ITCase</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="nd">@TestExternalSystem</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">MyBackendTestEnvironment</span><span class="w"> </span><span class="n">backendTestEnvironment</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">MyBackendTestEnvironment</span><span class="p">();</span><span class="w">
</span></span></span></code></pre></div><p>To integrate with JUnit5 BackendTestEnvironment
implements <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html">TestResource</a>
. This environment is scoped to the test suite, so it is where we setup the backend and shared resources (session, tablespace, etc&hellip;) by
implementing <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html#startUp--">startup()</a>
and <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/TestResource.html#tearDown--">tearDown()</a>
methods. For
that we advise the use of <a href="https://www.testcontainers.org/">testContainers</a> that relies on docker
images to provide a real backend
instance (not a mock) that is representative for integration tests. Several backends are supported
out of the box by testContainers. We need to configure test containers that way:</p>
<ul>
<li>Redirect container output (error and standard output) to Flink logs</li>
<li>Set the different timeouts to cope with CI server load</li>
<li>Set retrial mechanisms for connection, initialization requests etc&hellip; for the same reason</li>
</ul>
<h3 id="checkpointing-semantics">
Checkpointing semantics
<a class="anchor" href="#checkpointing-semantics">#</a>
</h3>
<p>In big data execution engines, there are 2 levels of guarantee regarding source and sinks:</p>
<ul>
<li>At least once: upon failure and recovery, some records may be reflected multiple times but none
will
be lost</li>
<li>Exactly once: upon failure and recovery, every record will be reflected exactly once</li>
</ul>
<p>By the following code we verify that the source supports exactly once semantics:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="nd">@TestSemantics</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">CheckpointingMode</span><span class="o">[]</span><span class="w"> </span><span class="n">semantics</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">CheckpointingMode</span><span class="o">[]</span><span class="w"> </span><span class="p">{</span><span class="n">CheckpointingMode</span><span class="p">.</span><span class="na">EXACTLY_ONCE</span><span class="p">};</span><span class="w">
</span></span></span></code></pre></div><p>That being said, we could encounter a problem while running the tests : the default assertions in
the Flink source test framework assume that the data is read in the same order it was written. This
is untrue for most big data backends where ordering is usually not deterministic. To support
unordered checks and still use all the framework provided tests, we need to override
<a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.html#checkResultWithSemantic-org.apache.flink.util.CloseableIterator-java.util.List-org.apache.flink.streaming.api.CheckpointingMode-java.lang.Integer-">SourceTestSuiteBase#checkResultWithSemantic</a>
in out ITCase:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">protected</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">checkResultWithSemantic</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">CloseableIterator</span><span class="o">&lt;</span><span class="n">Pojo</span><span class="o">&gt;</span><span class="w"> </span><span class="n">resultIterator</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">List</span><span class="o">&lt;</span><span class="n">List</span><span class="o">&lt;</span><span class="n">Pojo</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="n">testData</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">CheckpointingMode</span><span class="w"> </span><span class="n">semantic</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Integer</span><span class="w"> </span><span class="n">limit</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">limit</span><span class="w"> </span><span class="o">!=</span><span class="w"> </span><span class="kc">null</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Runnable</span><span class="w"> </span><span class="n">runnable</span><span class="w"> </span><span class="o">=</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">()</span><span class="w"> </span><span class="o">-&gt;</span><span class="w"> </span><span class="n">CollectIteratorAssertions</span><span class="p">.</span><span class="na">assertUnordered</span><span class="p">(</span><span class="n">resultIterator</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">withNumRecordsLimit</span><span class="p">(</span><span class="n">limit</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">matchesRecordsFromSource</span><span class="p">(</span><span class="n">testData</span><span class="p">,</span><span class="w"> </span><span class="n">semantic</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">assertThat</span><span class="p">(</span><span class="n">runAsync</span><span class="p">(</span><span class="n">runnable</span><span class="p">)).</span><span class="na">succeedsWithin</span><span class="p">(</span><span class="n">DEFAULT_COLLECT_DATA_TIMEOUT</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> </span><span class="k">else</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">CollectIteratorAssertions</span><span class="p">.</span><span class="na">assertUnordered</span><span class="p">(</span><span class="n">resultIterator</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">matchesRecordsFromSource</span><span class="p">(</span><span class="n">testData</span><span class="p">,</span><span class="w"> </span><span class="n">semantic</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>This is a copy-paste of the parent method where <em>CollectIteratorAssertions.assertOrdered()</em>
is
replaced by <em>CollectIteratorAssertions.assertUnordered()</em>.</p>
<h3 id="test-context">
Test context
<a class="anchor" href="#test-context">#</a>
</h3>
<p><a href="https://github.com/apache/flink-connector-cassandra/blob/d92dc8d891098a9ca6a7de6062b4630079beaaef/flink-connector-cassandra/src/test/java/org/apache/flink/connector/cassandra/source/CassandraTestContext.java">Example Cassandra TestContext</a></p>
<p>The test context provides Flink with means to interact with the backend, like inserting test
data, creating tables or constructing the source. It is scoped to the test case (and not to the test
suite).</p>
<p>It is linked to the ITCase through a factory of TestContext as shown below.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="nd">@TestContext</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">TestContextFactory</span><span class="w"> </span><span class="n">contextFactory</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">TestContextFactory</span><span class="p">(</span><span class="n">testEnvironment</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>TestContext implements <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/external/source/DataStreamSourceExternalContext.html">DataStreamSourceExternalContext</a>:</p>
<ul>
<li>We don&rsquo;t connect to the backend at each test case, so the shared resources such as session are
created by the backend test environment (test suite scoped). They are then passed to the test
context by constructor. It is also in the constructor that we initialize test case backend
resources such as test case table.</li>
<li>close() : drop the created test case resources</li>
<li><a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/api/java/typeutils/ResultTypeQueryable.html#getProducedType--">getProducedType()</a>:
specify the test output type of the source such as a test Pojo for example</li>
<li><a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/external/ExternalContext.html#getConnectorJarPaths--">getConnectorJarPaths()</a>:
provide a list of attached jars. Most of the time, we can return an empty
list as maven already adds the jars to the test classpath</li>
<li><a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/external/source/DataStreamSourceExternalContext.html#createSource-org.apache.flink.connector.testframe.external.source.TestingSourceSettings-">createSource()</a>:
here we create the source as a user would have done. It will be provided to the
test cases by the Flink test framework</li>
<li><a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/external/source/DataStreamSourceExternalContext.html#createSourceSplitDataWriter-org.apache.flink.connector.testframe.external.source.TestingSourceSettings-">createSourceSplitDataWriter()</a>:
here we create
an <a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/external/ExternalSystemSplitDataWriter.html">ExternalSystemSplitDataWriter</a>
responsible for
writing test data which comes as a list of produced type objects such as defined in
getProducedType()</li>
<li><a href="https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/testframe/external/source/DataStreamSourceExternalContext.html#generateTestData-org.apache.flink.connector.testframe.external.source.TestingSourceSettings-int-long-">generateTestData()</a>:
produce the list of test data that will be given to the
ExternalSystemSplitDataWriter. We must make sure that equals() returns false when 2 records of
this list belong to different splits. The easier for that is to include the split id into the
produced type and make sure equals() and hashcode() are properly implemented to include this
field.</li>
</ul>
<h2 id="contributing-the-source-to-flink">
Contributing the source to Flink
<a class="anchor" href="#contributing-the-source-to-flink">#</a>
</h2>
<p>Lately, the Flink community has externalized all the connectors to external repositories that are
sub-repositories of the official Apache Flink repository. This is mainly to decouple the release of
Flink to the release of the connectors. To distribute the created source, we need to
follow <a href="https://cwiki.apache.org/confluence/display/FLINK/Externalized&#43;Connector&#43;development">this official wiki page</a>
.</p>
<h2 id="conclusion">
Conclusion
<a class="anchor" href="#conclusion">#</a>
</h2>
<p>This concludes the series of articles about creating a batch source with the new Flink framework.
This was needed as, apart from the javadocs, the documentation about testing is missing for now. I
hope you enjoyed reading and I hope the Flink community will receive a source PR from you soon :)</p>
</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="//github.com/apache/flink-web/edit/asf-site/docs/content/posts/2023-05-12-howto-test-batch-source.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li>
<ul>
<li><a href="#introduction">Introduction</a></li>
<li><a href="#unit-testing-the-source">Unit testing the source</a>
<ul>
<li><a href="#testing-the-serializers">Testing the serializers</a></li>
<li><a href="#other-unit-tests">Other unit tests</a></li>
</ul>
</li>
<li><a href="#integration-testing-the-source">Integration testing the source</a>
<ul>
<li><a href="#flink-environment">Flink environment</a></li>
<li><a href="#backend-environment">Backend environment</a></li>
<li><a href="#checkpointing-semantics">Checkpointing semantics</a></li>
<li><a href="#test-context">Test context</a></li>
</ul>
</li>
<li><a href="#contributing-the-source-to-flink">Contributing the source to Flink</a></li>
<li><a href="#conclusion">Conclusion</a></li>
</ul>
</li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a-->
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>Github</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>