<!DOCTYPE html>
<html lang="en" dir=>
<head>
<meta name="generator" content="Hugo 0.92.2" />
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="Writing unit tests is one of the essential tasks of designing a production-grade application. Without tests, a single change in code can result in cascades of failure in production. Thus unit tests should be written for all types of applications, be it a simple job cleaning data and training a model or a complex multi-tenant, real-time data processing system. In the following sections, we provide a guide for unit testing of Apache Flink applications.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="A Guide for Unit Testing in Apache Flink" />
<meta property="og:description" content="Writing unit tests is one of the essential tasks of designing a production-grade application. Without tests, a single change in code can result in cascades of failure in production. Thus unit tests should be written for all types of applications, be it a simple job cleaning data and training a model or a complex multi-tenant, real-time data processing system. In the following sections, we provide a guide for unit testing of Apache Flink applications." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2020/02/03/a-guide-for-unit-testing-in-apache-flink/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2020-02-03T12:00:00+00:00" />
<meta property="article:modified_time" content="2020-02-03T12:00:00+00:00" />
<title>A Guide for Unit Testing in Apache Flink | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.e3b33391dbc1f4b2cc47778e2f4b86c744ded3ccc82fdfb6f08caf91d8607f9a.css" integrity="sha256-47MzkdvB9LLMR3eOL0uGx0Te08zIL9&#43;28Iyvkdhgf5o=">
<script defer src="/en.search.min.069123ef70eb5a313a1a8ba877092ebbe3e8ff1540b613f6c3c0bdd7a30c0733.js" integrity="sha256-BpEj73DrWjE6Gouodwkuu&#43;Po/xVAthP2w8C916MMBzM="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2020/02/03/a-guide-for-unit-testing-in-apache-flink/">
<script>
var _paq = window._paq = window._paq || [];
_paq.push(['disableCookies']);
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var u="//analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '1']);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
</head>
<body dir=>
<input type="checkbox" class="hidden toggle" id="menu-control" />
<input type="checkbox" class="hidden toggle" id="toc-control" />
<main class="container flex">
<aside class="book-menu">
<nav>
<a id="logo" href="/">
<img width="70%" src="/flink-header-logo.svg">
</a>
<div class="book-search">
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/" />
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
<input type="checkbox" id="section-4117fb24454a2c30ee86e524839e77ec" class="toggle" />
<label for="section-4117fb24454a2c30ee86e524839e77ec" class="flex justify-between flink-menu-item">What is Apache Flink?<span></span>
</label>
<ul>
<li>
<label for="section-ffd5922da551e96e0481423fab94c463" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/what-is-flink/flink-architecture/" class="">Architecture</a>
</label>
</li>
<li>
<label for="section-fc28f08b67476edb77e00e03b6c7c2e0" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/what-is-flink/flink-applications/" class="">Applications</a>
</label>
</li>
<li>
<label for="section-612df33a02d5d4ee78d718abaab5b5b4" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/what-is-flink/flink-operations/" class="">Operations</a>
</label>
</li>
</ul>
<label for="section-f1ecec07350bd6810050d40158878749" class="flex justify-between flink-menu-item">
<a href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/" style="color:black" class="">What is Stateful Functions? <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<label for="section-4113a4c3072cb35f6fd7a0d4e098ee70" class="flex justify-between flink-menu-item">
<a href="https://nightlies.apache.org/flink/flink-ml-docs-stable/" style="color:black" class="">What is Flink ML? <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<label for="section-b39c70259d0abbe2bf1d8d645425f84d" class="flex justify-between flink-menu-item">
<a href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/" style="color:black" class="">What is the Flink Kubernetes Operator? <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<label for="section-53e0b1afcb9ccaf779dc285aa272a014" class="flex justify-between flink-menu-item">
<a href="https://nightlies.apache.org/flink/flink-table-store-docs-stable/" style="color:black" class="">What is Flink Table Store? <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<label for="section-f4973f06a66f063045b4ebdacaf3127d" class="flex justify-between flink-menu-item">
<a href="/use-cases/" class="">Use Cases</a>
</label>
<label for="section-0f1863835376e859ac438ae9529daff2" class="flex justify-between flink-menu-item">
<a href="/powered-by/" class="">Powered By</a>
</label>
<br/>
<label for="section-f383f23a96a43d8d0cc66aeb0237e26a" class="flex justify-between flink-menu-item">
<a href="/downloads/" class="">Downloads</a>
</label>
<input type="checkbox" id="section-c727fab97b4d77e5b28ce8c448fb9000" class="toggle" />
<label for="section-c727fab97b4d77e5b28ce8c448fb9000" class="flex justify-between flink-menu-item">Getting Started<span></span>
</label>
<ul>
<li>
<label for="section-f45abaa99ab076108b9a5b94edbc6647" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/" style="color:black" class="">With Flink <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-efe2166e9dce6f72e126dcc2396b4402" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html" style="color:black" class="">With Flink Stateful Functions <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-7e268d0a469b1093bb33d71d093eb7b9" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/" style="color:black" class="">With Flink ML <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-cc7147cd0441503127bfaf6f219d4fbb" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/" style="color:black" class="">With Flink Kubernetes Operator <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-660ca694e416d8ca9176dda52a60d637" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-table-store-docs-stable/docs/try-table-store/quick-start/" style="color:black" class="">With Flink Table Store <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-75db0b47bf4ae9c247aadbba5fbd720d" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/" style="color:black" class="">Training Course <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
</ul>
<input type="checkbox" id="section-6318075fef29529089951a49d413d083" class="toggle" />
<label for="section-6318075fef29529089951a49d413d083" class="flex justify-between flink-menu-item">Documentation<span></span>
</label>
<ul>
<li>
<label for="section-9a8122d8912450484d1c25394ad40229" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-docs-stable/" style="color:black" class="">Flink 1.17 (stable) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-8b2fd3efb702be3783ba98d650707e3c" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-docs-master/" style="color:black" class="">Flink Master (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-5317a079cddb964c59763c27607f43d9" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/" style="color:black" class="">Stateful Functions 3.2 (stable) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-25b72f108b7156e94d91b04853d8813a" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-statefun-docs-master" style="color:black" class="">Stateful Functions Master (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-13a02f969904a2455a39ed90e287593f" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-ml-docs-stable/" style="color:black" class="">ML 2.2 (stable) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-6d895ec5ad127a29a6a9ce101328ccdf" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-ml-docs-master" style="color:black" class="">ML Master (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-c83ad0caf34e364bf3729badd233a350" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/" style="color:black" class="">Kubernetes Operator 1.4 (latest) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-a2c75d90005425982ba8f26ae0e160a3" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main" style="color:black" class="">Kubernetes Operator Main (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-07b85e4b2f61b1526bf202c64460abcd" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-table-store-docs-stable/" style="color:black" class="">Table Store 0.3 (stable) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
<li>
<label for="section-9b9a0032b1e858a34c125d828d1a0718" class="flex justify-between flink-menu-item flink-menu-child">
<a href="https://nightlies.apache.org/flink/flink-table-store-docs-master/" style="color:black" class="">Table Store Master (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
</li>
</ul>
<label for="section-63d6a565d79aa2895f70806a46021c07" class="flex justify-between flink-menu-item">
<a href="/getting-help/" class="">Getting Help</a>
</label>
<label for="section-1d5066022b83f4732dc80f4e9eaa069a" class="flex justify-between flink-menu-item">
<a href="https://flink-packages.org/" style="color:black" class="">flink-packages.org <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<br/>
<label for="section-7821b78a97db9e919426e86121a7be9c" class="flex justify-between flink-menu-item">
<a href="/community/" class="">Community & Project Info</a>
</label>
<label for="section-8c042831df4e371c4ef9375f1df06f35" class="flex justify-between flink-menu-item">
<a href="/roadmap/" class="">Roadmap</a>
</label>
<input type="checkbox" id="section-73117efde5302fddcb193307d582b588" class="toggle" />
<label for="section-73117efde5302fddcb193307d582b588" class="flex justify-between flink-menu-item">How to Contribute<span></span>
</label>
<ul>
<li>
<label for="section-6646b26b23a3e79b8de9c552ee76f6dd" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/overview/" class="">Overview</a>
</label>
</li>
<li>
<label for="section-e6ab9538b82cd5f94103b971adb7c1a9" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/contribute-code/" class="">Contribute Code</a>
</label>
</li>
<li>
<label for="section-1c09e1358485e82d9b3f5f689d4ced65" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/reviewing-prs/" class="">Review Pull Requests</a>
</label>
</li>
<li>
<label for="section-ed01e0defd235498fa3c9a2a0b3302fb" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/code-style-and-quality-preamble/" class="">Code Style and Quality Guide</a>
</label>
</li>
<li>
<label for="section-4e8d5e9924cf15f397711b0d82e15650" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/contribute-documentation/" class="">Contribute Documentation</a>
</label>
</li>
<li>
<label for="section-ddaa8307917e5ba7f60ba3316711e492" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/documentation-style-guide/" class="">Documentation Style Guide</a>
</label>
</li>
<li>
<label for="section-390a72c171cc82f180a308b95fc3aa72" class="flex justify-between flink-menu-item flink-menu-child">
<a href="/how-to-contribute/improve-website/" class="">Contribute to the Website</a>
</label>
</li>
</ul>
<label for="section-9d3ddfd487223d5a199ba301f25c88c6" class="flex justify-between flink-menu-item">
<a href="/security/" class="">Security</a>
</label>
<br/>
<label for="section-a07783f405300745807d39eacf150420" class="flex justify-between flink-menu-item">
<a href="/posts/" class="">Flink Blog</a>
</label>
<br/>
<hr class="menu-break">
<label for="section-f71a7070dbb7b669824a6441408ded70" class="flex justify-between flink-menu-item">
<a href="https://github.com/apache/flink" style="color:black" class="">Flink on GitHub <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<label for="section-2ccaaab8c67f3105bbf7df75faca8027" class="flex justify-between flink-menu-item">
<a href="https://twitter.com/apacheflink" style="color:black" class="">@ApacheFlink <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label>
<hr class="menu-break">
<table>
<tr>
<th colspan="2">
<label for="section-78c2028200542d78f8c1a8f6b4cbb36b" class="flex justify-between flink-menu-item">
<a href="https://www.apache.org/" style="color:black" class="">Apache Software Foundation <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label></th>
</tr>
<tr>
<td>
<label for="section-794df3791a8c800841516007427a2aa3" class="flex justify-between flink-menu-item">
<a href="https://www.apache.org/licenses/" style="color:black" class="">License <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label></td>
<td>
<label for="section-2fae32629d4ef4fc6341f1751b405e45" class="flex justify-between flink-menu-item">
<a href="https://www.apache.org/security/" style="color:black" class="">Security <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label></td>
</tr>
<tr>
<td>
<label for="section-0584e445d656b83b431227bb80ff0c30" class="flex justify-between flink-menu-item">
<a href="https://www.apache.org/foundation/sponsorship.html" style="color:black" class="">Donate <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label></td>
<td>
<label for="section-00d06796e489999226fb5bb27fe1b3b2" class="flex justify-between flink-menu-item">
<a href="https://www.apache.org/foundation/thanks.html" style="color:black" class="">Thanks <i class="link fa fa-external-link title" aria-hidden="true"></i></a>
</label></td>
</tr>
</table>
<hr class="menu-break">
<a href="/zh/" class="flex align-center">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;&nbsp;
中文版
</a>
<script src="/js/track-search-terms.js"></script>
</nav>
<script>(function(){var a=document.querySelector("aside.book-menu nav");addEventListener("beforeunload",function(b){localStorage.setItem("menu.scrollTop",a.scrollTop)}),a.scrollTop=localStorage.getItem("menu.scrollTop")})()</script>
</aside>
<div class="book-page">
<header class="book-header">
<div class="flex align-center justify-between">
<label for="menu-control">
<img src="/svg/menu.svg" class="book-icon" alt="Menu" />
</label>
<strong>A Guide for Unit Testing in Apache Flink</strong>
<label for="toc-control">
<img src="/svg/toc.svg" class="book-icon" alt="Table of Contents" />
</label>
</div>
<aside class="hidden clearfix">
<nav id="TableOfContents"><h3>On This Page <button class="toc" onclick="collapseToc()"><i class="fa fa-compress" aria-hidden="true"></i></button></h3></nav>
</aside>
</header>
<article class="markdown">
<h1>
<a href="/2020/02/03/a-guide-for-unit-testing-in-apache-flink/">A Guide for Unit Testing in Apache Flink</a>
</h1>
February 3, 2020 -
Kartik Khare
<a href="https://twitter.com/khare_khote">(@khare_khote)</a>
<p>Writing unit tests is one of the essential tasks of designing a production-grade application. Without tests, a single change in code can result in cascades of failure in production. Thus unit tests should be written for all types of applications, be it a simple job cleaning data and training a model or a complex multi-tenant, real-time data processing system. In the following sections, we provide a guide for unit testing of Apache Flink applications.</p>
<p>Apache Flink provides a robust unit testing framework so that you can verify during development that your applications will behave as expected in production. To use the framework, include the following dependencies:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-xml" data-lang="xml"><span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.flink<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>flink-test-utils_${scala.binary.version}<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>${flink.version}<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;scope&gt;</span>test<span class="nt">&lt;/scope&gt;</span>
<span class="nt">&lt;/dependency&gt;</span>
<span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.flink<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>flink-runtime_2.11<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>1.9.0<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;scope&gt;</span>test<span class="nt">&lt;/scope&gt;</span>
<span class="nt">&lt;classifier&gt;</span>tests<span class="nt">&lt;/classifier&gt;</span>
<span class="nt">&lt;/dependency&gt;</span>
<span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.flink<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>flink-streaming-java_2.11<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>1.9.0<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;scope&gt;</span>test<span class="nt">&lt;/scope&gt;</span>
<span class="nt">&lt;classifier&gt;</span>tests<span class="nt">&lt;/classifier&gt;</span>
<span class="nt">&lt;/dependency&gt;</span>
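<span class="c">&lt;!-- Assumption, not part of the original post: the ${scala.binary.version} and ${flink.version}
     placeholders in the first dependency resolve from Maven properties such as the ones below,
     declared in the top-level properties section of your POM. The values simply mirror the
     versions hard-coded in the other two dependencies. --&gt;</span>
<span class="nt">&lt;properties&gt;</span>
<span class="nt">&lt;scala.binary.version&gt;</span>2.11<span class="nt">&lt;/scala.binary.version&gt;</span>
<span class="nt">&lt;flink.version&gt;</span>1.9.0<span class="nt">&lt;/flink.version&gt;</span>
<span class="nt">&lt;/properties&gt;</span>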
</code></pre></div><p>The strategy of writing unit tests differs for various operators. You can break down the strategy into the following three buckets:</p>
<ul>
<li>Stateless Operators</li>
<li>Stateful Operators</li>
<li>Timed Process Operators</li>
</ul>
<h1 id="stateless-operators">
Stateless Operators
<a class="anchor" href="#stateless-operators">#</a>
</h1>
<p>Writing unit tests for a stateless operator is a breeze. You need to follow the basic norm of writing a test case, i.e., create an instance of the function class and test the appropriate methods. Let’s take an example of a simple <code>Map</code> operator.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyStatelessMap</span> <span class="kd">implements</span> <span class="n">MapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="n">String</span> <span class="nf">map</span><span class="o">(</span><span class="n">String</span> <span class="n">in</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">String</span> <span class="n">out</span> <span class="o">=</span> <span class="s">&#34;hello &#34;</span> <span class="o">+</span> <span class="n">in</span><span class="o">;</span>
<span class="k">return</span> <span class="n">out</span><span class="o">;</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div><p>The test case for the above operator looks like this:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="nd">@Test</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">testMap</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">MyStatelessMap</span> <span class="n">statelessMap</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MyStatelessMap</span><span class="o">();</span>
<span class="n">String</span> <span class="n">out</span> <span class="o">=</span> <span class="n">statelessMap</span><span class="o">.</span><span class="na">map</span><span class="o">(</span><span class="s">&#34;world&#34;</span><span class="o">);</span>
<span class="n">Assert</span><span class="o">.</span><span class="na">assertEquals</span><span class="o">(</span><span class="s">&#34;hello world&#34;</span><span class="o">,</span> <span class="n">out</span><span class="o">);</span>
<span class="o">}</span>
</code></pre></div><p>Pretty simple, right? Let’s take a look at one for the <code>FlatMap</code> operator.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyStatelessFlatMap</span> <span class="kd">implements</span> <span class="n">FlatMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">in</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">collector</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">String</span> <span class="n">out</span> <span class="o">=</span> <span class="s">&#34;hello &#34;</span> <span class="o">+</span> <span class="n">in</span><span class="o">;</span>
<span class="n">collector</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">out</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div><p><code>FlatMap</code> operators require a <code>Collector</code> object along with the input. For the test case, we have two options:</p>
<ol>
<li>Mock the <code>Collector</code> object using Mockito</li>
<li>Use the <code>ListCollector</code> provided by Flink</li>
</ol>
<p>I prefer the second method as it requires fewer lines of code and is suitable for most of the cases.</p>
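<p>For comparison, the first option could look roughly like the sketch below. It assumes that Mockito and JUnit 4 are on the test classpath, which the dependency list above does not cover. The class name <code>MockCollectorFlatMapTest</code> is made up for illustration, and the function under test is repeated as a nested class only so that the snippet compiles on its own.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java">import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.util.Collector;
import org.junit.Test;
import org.mockito.Mockito;

// Hypothetical test class name, chosen for this sketch only.
public class MockCollectorFlatMapTest {

    // Same logic as MyStatelessFlatMap above, repeated to keep the sketch self-contained.
    static class MyStatelessFlatMap implements FlatMapFunction&lt;String, String&gt; {
        @Override
        public void flatMap(String in, Collector&lt;String&gt; collector) throws Exception {
            collector.collect(&#34;hello &#34; + in);
        }
    }

    @Test
    @SuppressWarnings(&#34;unchecked&#34;) // Mockito.mock(Collector.class) returns a raw Collector
    public void testFlatMapWithMockCollector() throws Exception {
        MyStatelessFlatMap statelessFlatMap = new MyStatelessFlatMap();
        Collector&lt;String&gt; collector = Mockito.mock(Collector.class);

        statelessFlatMap.flatMap(&#34;world&#34;, collector);

        // Exactly one record should have been emitted, and nothing else.
        Mockito.verify(collector, Mockito.times(1)).collect(&#34;hello world&#34;);
        Mockito.verifyNoMoreInteractions(collector);
    }
}
</code></pre></div><p>With the <code>ListCollector</code>, the same check needs no mocking library:</p>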
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="nd">@Test</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">testFlatMap</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">MyStatelessFlatMap</span> <span class="n">statelessFlatMap</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MyStatelessFlatMap</span><span class="o">();</span>
<span class="n">List</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">out</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ArrayList</span><span class="o">&lt;&gt;();</span>
<span class="n">ListCollector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">listCollector</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ListCollector</span><span class="o">&lt;&gt;(</span><span class="n">out</span><span class="o">);</span>
<span class="n">statelessFlatMap</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="s">&#34;world&#34;</span><span class="o">,</span> <span class="n">listCollector</span><span class="o">);</span>
<span class="n">Assert</span><span class="o">.</span><span class="na">assertEquals</span><span class="o">(</span><span class="n">Lists</span><span class="o">.</span><span class="na">newArrayList</span><span class="o">(</span><span class="s">&#34;hello world&#34;</span><span class="o">),</span> <span class="n">out</span><span class="o">);</span>
<span class="o">}</span>
</code></pre></div><h1 id="stateful-operators">
Stateful Operators
<a class="anchor" href="#stateful-operators">#</a>
</h1>
<p>Writing test cases for stateful operators requires more effort. In addition to checking the output of the operator, you need to check that the operator state is updated correctly and cleaned up properly.</p>
<p>Let’s take an example of a stateful <code>FlatMap</code> function:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">StatefulFlatMap</span> <span class="kd">extends</span> <span class="n">RichFlatMapFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="n">ValueState</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">previousInput</span><span class="o">;</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">open</span><span class="o">(</span><span class="n">Configuration</span> <span class="n">parameters</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">previousInput</span> <span class="o">=</span> <span class="n">getRuntimeContext</span><span class="o">().</span><span class="na">getState</span><span class="o">(</span>
<span class="k">new</span> <span class="n">ValueStateDescriptor</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(</span><span class="s">&#34;previousInput&#34;</span><span class="o">,</span> <span class="n">Types</span><span class="o">.</span><span class="na">STRING</span><span class="o">));</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">flatMap</span><span class="o">(</span><span class="n">String</span> <span class="n">in</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">collector</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">String</span> <span class="n">out</span> <span class="o">=</span> <span class="s">&#34;hello &#34;</span> <span class="o">+</span> <span class="n">in</span><span class="o">;</span>
<span class="k">if</span><span class="o">(</span><span class="n">previousInput</span><span class="o">.</span><span class="na">value</span><span class="o">()</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">){</span>
<span class="n">out</span> <span class="o">=</span> <span class="n">out</span> <span class="o">+</span> <span class="s">&#34; &#34;</span> <span class="o">+</span> <span class="n">previousInput</span><span class="o">.</span><span class="na">value</span><span class="o">();</span>
<span class="o">}</span>
<span class="n">previousInput</span><span class="o">.</span><span class="na">update</span><span class="o">(</span><span class="n">in</span><span class="o">);</span>
<span class="n">collector</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">out</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div><p>The tricky part of writing tests for the above class is mocking the configuration and the runtime context of the application. Flink provides TestHarness classes so that users don’t have to create the mock objects themselves. Using the <code>KeyedOneInputStreamOperatorTestHarness</code>, the test looks like:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="kn">import</span> <span class="nn">org.apache.flink.streaming.api.operators.StreamFlatMap</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.flink.streaming.runtime.streamrecord.StreamRecord</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness</span><span class="o">;</span>
<span class="kn">import</span> <span class="nn">org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness</span><span class="o">;</span>
<span class="nd">@Test</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">testFlatMap</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">Exception</span><span class="o">{</span>
<span class="n">StatefulFlatMap</span> <span class="n">statefulFlatMap</span> <span class="o">=</span> <span class="k">new</span> <span class="n">StatefulFlatMap</span><span class="o">();</span>
<span class="c1">// OneInputStreamOperatorTestHarness takes the input and output types as type parameters
</span><span class="c1"></span> <span class="n">OneInputStreamOperatorTestHarness</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">testHarness</span> <span class="o">=</span>
<span class="c1">// KeyedOneInputStreamOperatorTestHarness takes three arguments:
</span><span class="c1"></span> <span class="c1">// Flink operator object, key selector and key type
</span><span class="c1"></span> <span class="k">new</span> <span class="n">KeyedOneInputStreamOperatorTestHarness</span><span class="o">&lt;&gt;(</span>
<span class="k">new</span> <span class="n">StreamFlatMap</span><span class="o">&lt;&gt;(</span><span class="n">statefulFlatMap</span><span class="o">),</span> <span class="n">x</span> <span class="o">-&gt;</span> <span class="s">&#34;1&#34;</span><span class="o">,</span> <span class="n">Types</span><span class="o">.</span><span class="na">STRING</span><span class="o">);</span>
<span class="n">testHarness</span><span class="o">.</span><span class="na">open</span><span class="o">();</span>
<span class="c1">// test first record
</span><span class="c1"></span> <span class="n">testHarness</span><span class="o">.</span><span class="na">processElement</span><span class="o">(</span><span class="s">&#34;world&#34;</span><span class="o">,</span> <span class="n">10</span><span class="o">);</span>
<span class="n">ValueState</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">previousInput</span> <span class="o">=</span>
<span class="n">statefulFlatMap</span><span class="o">.</span><span class="na">getRuntimeContext</span><span class="o">().</span><span class="na">getState</span><span class="o">(</span>
<span class="k">new</span> <span class="n">ValueStateDescriptor</span><span class="o">&lt;&gt;(</span><span class="s">&#34;previousInput&#34;</span><span class="o">,</span> <span class="n">Types</span><span class="o">.</span><span class="na">STRING</span><span class="o">));</span>
<span class="n">String</span> <span class="n">stateValue</span> <span class="o">=</span> <span class="n">previousInput</span><span class="o">.</span><span class="na">value</span><span class="o">();</span>
<span class="n">Assert</span><span class="o">.</span><span class="na">assertEquals</span><span class="o">(</span>
<span class="n">Lists</span><span class="o">.</span><span class="na">newArrayList</span><span class="o">(</span><span class="k">new</span> <span class="n">StreamRecord</span><span class="o">&lt;&gt;(</span><span class="s">&#34;hello world&#34;</span><span class="o">,</span> <span class="n">10</span><span class="o">)),</span>
<span class="n">testHarness</span><span class="o">.</span><span class="na">extractOutputStreamRecords</span><span class="o">());</span>
<span class="n">Assert</span><span class="o">.</span><span class="na">assertEquals</span><span class="o">(</span><span class="s">&#34;world&#34;</span><span class="o">,</span> <span class="n">stateValue</span><span class="o">);</span>
<span class="c1">// test second record
</span><span class="c1"></span> <span class="n">testHarness</span><span class="o">.</span><span class="na">processElement</span><span class="o">(</span><span class="s">&#34;parallel&#34;</span><span class="o">,</span> <span class="n">20</span><span class="o">);</span>
<span class="n">Assert</span><span class="o">.</span><span class="na">assertEquals</span><span class="o">(</span>
<span class="n">Lists</span><span class="o">.</span><span class="na">newArrayList</span><span class="o">(</span>
<span class="k">new</span> <span class="n">StreamRecord</span><span class="o">&lt;&gt;(</span><span class="s">&#34;hello world&#34;</span><span class="o">,</span> <span class="n">10</span><span class="o">),</span>
<span class="k">new</span> <span class="n">StreamRecord</span><span class="o">&lt;&gt;(</span><span class="s">&#34;hello parallel world&#34;</span><span class="o">,</span> <span class="n">20</span><span class="o">)),</span>
<span class="n">testHarness</span><span class="o">.</span><span class="na">extractOutputStreamRecords</span><span class="o">());</span>
<span class="n">Assert</span><span class="o">.</span><span class="na">assertEquals</span><span class="o">(</span><span class="s">&#34;parallel&#34;</span><span class="o">,</span> <span class="n">previousInput</span><span class="o">.</span><span class="na">value</span><span class="o">());</span>
<span class="o">}</span>
</code></pre></div><p>The test harness provides many helper methods, three of which are used here:</p>
<ol>
<li><code>open</code>: calls the <code>open</code> method of the <code>FlatMap</code> function with the relevant parameters. It also initializes the context.</li>
<li><code>processElement</code>: passes an input element to the operator along with the timestamp associated with that element.</li>
<li><code>extractOutputStreamRecords</code>: returns the output records, along with their timestamps, from the <code>Collector</code>.</li>
</ol>
<p>The test harness greatly simplifies unit testing of stateful functions.</p>
<p>You might also need to check whether the state value is being set correctly. You can read the state value directly from the operator using a mechanism similar to the one used to create the state, as the previous example demonstrates.</p>
<h1 id="timed-process-operators">
Timed Process Operators
<a class="anchor" href="#timed-process-operators">#</a>
</h1>
<p>Writing tests for process functions that work with time is quite similar to writing tests for stateful functions, because you can also use a test harness.
However, you also need to provide timestamps for events and control the current time of the application. By setting the current (processing or event) time, you can trigger registered timers, which will call the <code>onTimer</code> method of the function.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">MyProcessFunction</span> <span class="kd">extends</span> <span class="n">KeyedProcessFunction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="o">{</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">processElement</span><span class="o">(</span><span class="n">String</span> <span class="n">in</span><span class="o">,</span> <span class="n">Context</span> <span class="n">context</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">collector</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">context</span><span class="o">.</span><span class="na">timerService</span><span class="o">().</span><span class="na">registerProcessingTimeTimer</span><span class="o">(</span><span class="n">50</span><span class="o">);</span>
<span class="n">String</span> <span class="n">out</span> <span class="o">=</span> <span class="s">&#34;hello &#34;</span> <span class="o">+</span> <span class="n">in</span><span class="o">;</span>
<span class="n">collector</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">out</span><span class="o">);</span>
<span class="o">}</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">onTimer</span><span class="o">(</span><span class="kt">long</span> <span class="n">timestamp</span><span class="o">,</span> <span class="n">OnTimerContext</span> <span class="n">ctx</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="n">String</span><span class="o">.</span><span class="na">format</span><span class="o">(</span><span class="s">&#34;Timer triggered at timestamp %d&#34;</span><span class="o">,</span> <span class="n">timestamp</span><span class="o">));</span>
<span class="o">}</span>
<span class="o">}</span>
</code></pre></div><p>We need to test both methods of the <code>KeyedProcessFunction</code>, i.e., <code>processElement</code> and <code>onTimer</code>. Using a test harness, we can control the current time of the function. Thus, we can trigger the timer at will rather than waiting for a specific time.</p>
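<p>To build intuition for why setting the time triggers timers, here is a minimal plain-Java sketch of a timer service driven by a manually set clock. It is a toy model, not Flink's implementation: the class <code>ManualTimerService</code> and its fields are hypothetical, and only the method names <code>registerProcessingTimeTimer</code>, <code>setProcessingTime</code> and <code>numProcessingTimeTimers</code> are borrowed from the API used in this article. The sketch assumes a single key and assumes that a timer fires as soon as the clock reaches its timestamp.</p>

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model of a processing-time timer service with a manually set clock (not Flink code). */
class ManualTimerService {
    // Pending timer timestamps, sorted; a set keeps at most one timer per timestamp.
    private final TreeSet<Long> timers = new TreeSet<>();
    // Records "emitted" by fired timers, in firing order.
    private final List<String> output = new ArrayList<>();
    // The clock starts at 0 and only moves when the test says so.
    private long currentTime = 0;

    /** Registers a timer for the given timestamp. */
    void registerProcessingTimeTimer(long timestamp) {
        timers.add(timestamp);
    }

    /** Sets the clock and fires, in timestamp order, every timer that is now due. */
    void setProcessingTime(long newTime) {
        currentTime = newTime;
        while (!timers.isEmpty() && timers.first() <= currentTime) {
            onTimer(timers.pollFirst());
        }
    }

    /** Mirrors the onTimer method of MyProcessFunction above. */
    private void onTimer(long timestamp) {
        output.add(String.format("Timer triggered at timestamp %d", timestamp));
    }

    /** Number of timers that are registered but have not fired yet. */
    int numProcessingTimeTimers() {
        return timers.size();
    }

    /** Everything emitted by fired timers so far. */
    List<String> getOutput() {
        return output;
    }
}
```

<p>In this model, registering a timer for <code>50</code> and calling <code>setProcessingTime(49)</code> leaves the output empty, while a subsequent <code>setProcessingTime(50)</code> fires the timer and removes it from the pending set. Because the test owns the clock, no real waiting is involved.</p>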
<p>Let’s take a look at the test cases for <code>MyProcessFunction</code>:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="nd">@Test</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">testProcessElement</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">Exception</span><span class="o">{</span>
<span class="n">MyProcessFunction</span> <span class="n">myProcessFunction</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MyProcessFunction</span><span class="o">();</span>
<span class="n">OneInputStreamOperatorTestHarness</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">testHarness</span> <span class="o">=</span>
<span class="k">new</span> <span class="n">KeyedOneInputStreamOperatorTestHarness</span><span class="o">&lt;&gt;(</span>
<span class="k">new</span> <span class="n">KeyedProcessOperator</span><span class="o">&lt;&gt;(</span><span class="n">myProcessFunction</span><span class="o">),</span> <span class="n">x</span> <span class="o">-&gt;</span> <span class="s">&#34;1&#34;</span><span class="o">,</span> <span class="n">Types</span><span class="o">.</span><span class="na">STRING</span><span class="o">);</span>
<span class="c1">// Function time is initialized to 0
</span><span class="c1"></span> <span class="n">testHarness</span><span class="o">.</span><span class="na">open</span><span class="o">();</span>
<span class="n">testHarness</span><span class="o">.</span><span class="na">processElement</span><span class="o">(</span><span class="s">&#34;world&#34;</span><span class="o">,</span> <span class="n">10</span><span class="o">);</span>
<span class="n">Assert</span><span class="o">.</span><span class="na">assertEquals</span><span class="o">(</span>
<span class="n">Lists</span><span class="o">.</span><span class="na">newArrayList</span><span class="o">(</span><span class="k">new</span> <span class="n">StreamRecord</span><span class="o">&lt;&gt;(</span><span class="s">&#34;hello world&#34;</span><span class="o">,</span> <span class="n">10</span><span class="o">)),</span>
<span class="n">testHarness</span><span class="o">.</span><span class="na">extractOutputStreamRecords</span><span class="o">());</span>
<span class="o">}</span>
<span class="nd">@Test</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">testOnTimer</span><span class="o">()</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="n">MyProcessFunction</span> <span class="n">myProcessFunction</span> <span class="o">=</span> <span class="k">new</span> <span class="n">MyProcessFunction</span><span class="o">();</span>
<span class="n">OneInputStreamOperatorTestHarness</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">testHarness</span> <span class="o">=</span>
<span class="k">new</span> <span class="n">KeyedOneInputStreamOperatorTestHarness</span><span class="o">&lt;&gt;(</span>
<span class="k">new</span> <span class="n">KeyedProcessOperator</span><span class="o">&lt;&gt;(</span><span class="n">myProcessFunction</span><span class="o">),</span> <span class="n">x</span> <span class="o">-&gt;</span> <span class="s">&#34;1&#34;</span><span class="o">,</span> <span class="n">Types</span><span class="o">.</span><span class="na">STRING</span><span class="o">);</span>
<span class="n">testHarness</span><span class="o">.</span><span class="na">open</span><span class="o">();</span>
<span class="n">testHarness</span><span class="o">.</span><span class="na">processElement</span><span class="o">(</span><span class="s">&#34;world&#34;</span><span class="o">,</span> <span class="n">10</span><span class="o">);</span>
<span class="n">Assert</span><span class="o">.</span><span class="na">assertEquals</span><span class="o">(</span><span class="n">1</span><span class="o">,</span> <span class="n">testHarness</span><span class="o">.</span><span class="na">numProcessingTimeTimers</span><span class="o">());</span>
<span class="c1">// Function time is set to 50
</span><span class="c1"></span> <span class="n">testHarness</span><span class="o">.</span><span class="na">setProcessingTime</span><span class="o">(</span><span class="n">50</span><span class="o">);</span>
<span class="n">Assert</span><span class="o">.</span><span class="na">assertEquals</span><span class="o">(</span>
<span class="n">Lists</span><span class="o">.</span><span class="na">newArrayList</span><span class="o">(</span>
<span class="k">new</span> <span class="n">StreamRecord</span><span class="o">&lt;&gt;(</span><span class="s">&#34;hello world&#34;</span><span class="o">,</span> <span class="n">10</span><span class="o">),</span>
<span class="k">new</span> <span class="n">StreamRecord</span><span class="o">&lt;&gt;(</span><span class="s">&#34;Timer triggered at timestamp 50&#34;</span><span class="o">)),</span>
<span class="n">testHarness</span><span class="o">.</span><span class="na">extractOutputStreamRecords</span><span class="o">());</span>
<span class="o">}</span>
</code></pre></div><p>The mechanism for testing multi-input stream operators, such as CoProcess functions, is similar to the one described in this article. For these operators, use the TwoInput variant of the harness, such as <code>TwoInputStreamOperatorTestHarness</code>.</p>
<h1 id="summary">
Summary
<a class="anchor" href="#summary">#</a>
</h1>
<p>In the previous sections, we showed how unit testing in Apache Flink works for stateless, stateful and time-aware operators. We hope you found the steps easy to follow and apply while developing your Flink applications. If you have any questions or feedback, you can reach out to me <a href="https://www.kharekartik.dev/about/">here</a> or contact the community on the <a href="https://flink.apache.org/community.html">Apache Flink user mailing list</a>.</p>
</p>
</article>
</div>
</main>
</body>
</html>