blob: 86c26b9228811dc25560126adb89732c7ca0f35c [file] [log] [blame]
<!DOCTYPE html>
<html lang="en" dir="ltr">
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2020/07/30/advanced-flink-application-patterns-vol.3-custom-window-processing/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="Introduction # In the previous articles of the series, we described how you can achieve flexible stream partitioning based on dynamically-updated configurations (a set of fraud-detection rules) and how you can utilize Flink&#39;s Broadcast mechanism to distribute processing configuration at runtime among the relevant operators. Following up directly where we left the discussion of the end-to-end solution last time, in this article we will describe how you can use the &quot;Swiss knife&quot; of Flink - the Process Function to create an implementation that is tailor-made to match your streaming business logic requirements.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Advanced Flink Application Patterns Vol.3: Custom Window Processing" />
<meta property="og:description" content="Introduction # In the previous articles of the series, we described how you can achieve flexible stream partitioning based on dynamically-updated configurations (a set of fraud-detection rules) and how you can utilize Flink&#39;s Broadcast mechanism to distribute processing configuration at runtime among the relevant operators. Following up directly where we left the discussion of the end-to-end solution last time, in this article we will describe how you can use the &quot;Swiss knife&quot; of Flink - the Process Function to create an implementation that is tailor-made to match your streaming business logic requirements." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2020/07/30/advanced-flink-application-patterns-vol.3-custom-window-processing/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2020-07-30T12:00:00+00:00" />
<meta property="article:modified_time" content="2020-07-30T12:00:00+00:00" />
<title>Advanced Flink Application Patterns Vol.3: Custom Window Processing | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
// Matomo page-view tracking for analytics.apache.org.
// Commands are queued on the global `_paq` array; the asynchronously
// loaded matomo.js processes the queue once it is available.
var _paq = window._paq = window._paq || [];
// Track without setting cookies.
_paq.push(['disableCookies']);
// Domains (and path prefixes) to be treated as part of this site.
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
  // Explicit https: a protocol-relative URL ("//host/") would be fetched over
  // plain http (or fail entirely) when the page is opened via http: or file:.
  var u = "https://analytics.apache.org/";
  _paq.push(['setTrackerUrl', u + 'matomo.php']);
  _paq.push(['setSiteId', '1']);
  // Inject matomo.js asynchronously, ahead of the first <script> in the document.
  var d = document, g = d.createElement('script'), s = d.getElementsByTagName('script')[0];
  g.async = true;
  g.src = u + 'matomo.js';
  s.parentNode.insertBefore(g, s);
})();
</script>
</head>
<body dir="ltr">
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;">
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2020/07/30/advanced-flink-application-patterns-vol.3-custom-window-processing/">Advanced Flink Application Patterns Vol.3: Custom Window Processing</a>
</h1>
July 30, 2020 -
Alexander Fedulov
<a href="https://twitter.com/alex_fedulov">(@alex_fedulov)</a>
<style>
/* Styles for bordered tables marked with class "tg" (presumably used further down in this post). */
.tg {border-collapse:collapse;border-spacing:0;}
.tg td{padding:10px 10px;border-style:solid;border-width:1px;overflow:hidden;word-break:normal;}
.tg th{padding:10px 10px;border-style:solid;border-width:1px;overflow:hidden;word-break:normal;background-color:#eff0f1;}
/* Extra horizontal padding for wide cells. */
.tg .tg-wide{padding:10px 30px;}
.tg .tg-top{vertical-align:top}
.tg .tg-topcenter{text-align:center;vertical-align:top}
/* "center" is not a valid vertical-align keyword (the declaration would be dropped); "middle" is. */
.tg .tg-center{text-align:center;vertical-align:middle}
</style>
<h2 id="introduction">
Introduction
<a class="anchor" href="#introduction">#</a>
</h2>
<p>In the previous articles of the series, we described how you can achieve
flexible stream partitioning based on dynamically-updated configurations
(a set of fraud-detection rules) and how you can utilize Flink's
Broadcast mechanism to distribute processing configuration at runtime
among the relevant operators. </p>
<p>Picking up directly where we left off our discussion of the end-to-end
solution last time, in this article we will describe how you can use the
&ldquo;Swiss Army knife&rdquo; of Flink &mdash; the <a href="//nightlies.apache.org/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html"><em>Process Function</em></a> &mdash; to create an
implementation that is tailor-made to match your streaming business
logic requirements. Our discussion will continue in the context of the
<a href="/news/2020/01/15/demo-fraud-detection.html#fraud-detection-demo">Fraud Detection engine</a>. We will also demonstrate how you can
implement your own <strong>custom replacement for time windows</strong> for cases
where the out-of-the-box windowing available from the DataStream API
does not satisfy your requirements. In particular, we will look at the
trade-offs that you can make when designing a solution which requires
low-latency reactions to individual events.</p>
<p>This article will describe some high-level concepts that can be applied
independently, but it is recommended that you review the material in
<a href="/news/2020/01/15/demo-fraud-detection.html">part one</a> and
<a href="/news/2020/03/24/demo-fraud-detection-2.html">part two</a> of the series, as well as check out the <a href="https://github.com/afedulov/fraud-detection-demo">code
base</a>, in order to make
it easier to follow along.</p>
<h2 id="processfunction-as-a-window">
ProcessFunction as a &ldquo;Window&rdquo;
<a class="anchor" href="#processfunction-as-a-window">#</a>
</h2>
<h3 id="low-latency">
Low Latency
<a class="anchor" href="#low-latency">#</a>
</h3>
<p>Let&rsquo;s start with a reminder of the type of fraud detection rule that we
would like to support:</p>
<p><em>&ldquo;Whenever the <strong>sum</strong> of <strong>payments</strong> from the same <strong>payer</strong> to the
same <strong>beneficiary</strong> within <strong>a 24-hour
period</strong> is <strong>greater</strong> than <strong>$200,000</strong>, trigger an alert.&rdquo;</em></p>
<p>In other words, given a stream of transactions partitioned by a key that
combines the payer and the beneficiary fields, we would like to look
back in time and determine, for each incoming transaction, if the sum of
all previous payments between the two specific participants exceeds the
defined threshold. In effect, the computation window is always moved
along to the position of the last observed event for a particular data
partitioning key.</p>
<center>
<img src="/img/blog/patterns-blog-3/time-windows.png" width="600px" alt="Figure 1: Time Windows"/>
<br/>
<i><small>Figure 1: Time Windows</small></i>
</center>
<br/>
<p>One of the common key requirements for a fraud detection system is <em>low
response time</em>. The sooner the fraudulent action gets detected, the
higher the chances that it can be blocked and its negative consequences
mitigated. This requirement is especially prominent in the financial
domain, where you have one important constraint - any time spent
evaluating a fraud detection model is time that a law-abiding user of
your system will spend waiting for a response. Swiftness of processing
often becomes a competitive advantage between various payment systems
and the time limit for producing an alert could lie as low as <em>300-500
ms</em>. This is all the time you get from the moment of ingestion of a
transaction event into a fraud detection system until an alert has to
become available to downstream systems. </p>
<p>As you might know, Flink provides a powerful <a href="//nightlies.apache.org/flink/flink-docs-release-1.11/dev/stream/operators/windows.html">Window
API</a>
that is applicable for a wide range of use cases. However, if you go
over all of the available types of supported windows, you will realize
that none of them exactly match our main requirement for this use case -
the low-latency evaluation of <em>each</em> incoming transaction. There is
no type of window in Flink that can express the <em>&ldquo;x minutes/hours/days
back from the <u>current event</u>&rdquo;</em> semantic. In the Window API, events
fall into windows (as defined by the window
<a href="//nightlies.apache.org/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#window-assigners">assigners</a>),
but they cannot themselves individually control the creation and
evaluation of windows*. As described above, our goal for the fraud
detection engine is to achieve immediate evaluation of the previous
relevant data points as soon as the new event is received. This raises
the question of whether the Window API is a good fit for this case at all. The Window API offers some options for defining custom triggers, evictors, and window assigners, which might achieve the required result. However, such customizations are usually difficult to get right (and easy to break). Moreover, this approach does not provide access to broadcast state, which is required for implementing dynamic reconfiguration of business rules.</p>
<p>*) Session windows are an exception, but their assignment is limited to
session <a href="//nightlies.apache.org/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#session-windows">gaps</a>.</p>
<center>
<img src="/img/blog/patterns-blog-3/evaluation-delays.png" width="600px" alt="Figure 2: Evaluation Delays"/>
<br/>
<i><small>Figure 2: Evaluation Delays</small></i>
</center>
<br/>
<p>Let&rsquo;s take an example of using a <a href="//nightlies.apache.org/flink/flink-docs-release-1.11/dev/stream/operators/windows.html#sliding-windows">sliding
window</a>
from Flink&rsquo;s Window API. Using sliding windows with a slide of <em>S</em>
translates into an expected evaluation delay of <em>S/2</em>.
This means that you would need to define a window slide of 600-1000 ms
to fulfill the low-latency requirement of 300-500 ms delay, even before
taking any actual computation time into account. The fact that Flink
stores a separate window state for each sliding window pane renders this
approach infeasible under even moderately high load.</p>
<p>In order to satisfy the requirements, we need to create our own
low-latency window implementation. Luckily, Flink gives us all the tools
required to do so. <code>ProcessFunction</code> is a low-level but powerful
building block in Flink's API. It has a simple contract:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">SomeProcessFunction</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">KeyedProcessFunction</span><span class="o">&lt;</span><span class="n">KeyType</span><span class="p">,</span><span class="w"> </span><span class="n">InputType</span><span class="p">,</span><span class="w"> </span><span class="n">OutputType</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">processElement</span><span class="p">(</span><span class="n">InputType</span><span class="w"> </span><span class="n">event</span><span class="p">,</span><span class="w"> </span><span class="n">Context</span><span class="w"> </span><span class="n">ctx</span><span class="p">,</span><span class="w"> </span><span class="n">Collector</span><span class="o">&lt;</span><span class="n">OutputType</span><span class="o">&gt;</span><span class="w"> </span><span class="n">out</span><span class="p">){}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">onTimer</span><span class="p">(</span><span class="kt">long</span><span class="w"> </span><span class="n">timestamp</span><span class="p">,</span><span class="w"> </span><span class="n">OnTimerContext</span><span class="w"> </span><span class="n">ctx</span><span class="p">,</span><span class="w"> </span><span class="n">Collector</span><span class="o">&lt;</span><span class="n">OutputType</span><span class="o">&gt;</span><span class="w"> </span><span class="n">out</span><span class="p">)</span><span class="w"> </span><span class="p">{}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">open</span><span class="p">(</span><span class="n">Configuration</span><span class="w"> </span><span class="n">parameters</span><span class="p">){}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><ul>
<li>
<p><code>processElement()</code> receives input events one by one. You can react to
each input by producing one or more output events to the next
operator by calling <code>out.collect(someOutput)</code>. You can also pass data
to a <a href="//nightlies.apache.org/flink/flink-docs-release-1.11/dev/stream/side_output.html">side
output</a>
or ignore a particular input altogether.</p>
</li>
<li>
<p><code>onTimer()</code> is called by Flink when a previously-registered timer
fires. Both event time and processing time timers are supported.</p>
</li>
<li>
<p><code>open()</code> is analogous to a constructor. It is called inside of the
<a href="//nightlies.apache.org/flink/flink-docs-release-1.11/concepts/glossary.html#flink-taskmanager">TaskManager&rsquo;s</a>
JVM, and is used for initialization, such as registering
Flink-managed state. It is also the right place to initialize fields
that are not serializable and cannot be transferred from the
JobManager&rsquo;s JVM.</p>
</li>
</ul>
<p>Most importantly, <code>ProcessFunction</code> also has access to the fault-tolerant
state, handled by Flink. This combination, together with Flink's
message processing and delivery guarantees, makes it possible to build
resilient event-driven applications with almost arbitrarily
sophisticated business logic. This includes creation and processing of
custom windows with state.</p>
<h3 id="implementation">
Implementation
<a class="anchor" href="#implementation">#</a>
</h3>
<h4 id="state-and-clean-up">
State and Clean-up
<a class="anchor" href="#state-and-clean-up">#</a>
</h4>
<p>In order to be able to process time windows, we need to keep track of
data belonging to the window inside of our program. To ensure that this
data is fault-tolerant and can survive failures in a distributed system,
we should store it inside of Flink-managed state. As time
progresses, we do not need to keep all previous transactions. According
to the sample rule, all events that are older than 24 hours become
irrelevant. We are looking at a window of data that constantly moves and
where stale transactions need to be constantly moved out of scope (in
other words, cleaned up from state).</p>
<center>
<img src="/img/blog/patterns-blog-3/window-clean-up.png" width="400px" alt="Figure 3: Window Clean-up"/>
<br/>
<i><small>Figure 3: Window Clean-up</small></i>
</center>
<br/>
<p>We will
<a href="//nightlies.apache.org/flink/flink-docs-release-1.11/dev/stream/state/state.html#using-keyed-state">use</a>
<code>MapState</code> to store the individual events of the window. In order to allow
efficient clean-up of the out-of-scope events, we will utilize event
timestamps as the <code>MapState</code> keys.</p>
<p>In the general case, we have to take into account the fact that there
might be different events with exactly the same timestamp. Therefore,
instead of storing an individual <code>Transaction</code> per key (timestamp), we will store sets of transactions.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">MapState</span><span class="o">&lt;</span><span class="n">Long</span><span class="p">,</span><span class="w"> </span><span class="n">Set</span><span class="o">&lt;</span><span class="n">Transaction</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="n">windowState</span><span class="p">;</span><span class="w">
</span></span></span></code></pre></div><div class="alert alert-info" markdown="1">
<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Side Note </span>
When any Flink-managed state is used inside a
<code>KeyedProcessFunction</code>, the data returned by the <code>state.value()</code> call is
automatically scoped by the key of the <em>currently-processed event</em>
&mdash; see Figure 4. If <code>MapState</code> is used, the same principle applies, with
the difference that a <code>Map</code> is returned instead of <code>MyObject</code>. If you are
compelled to do something like
<code>mapState.value().get(inputEvent.getKey())</code>, you should probably be using
<code>ValueState</code> instead of the <code>MapState</code>. As we want to store <em>multiple values
per event key</em>, in our case, <code>MapState</code> is the right choice.
<br/>
<center>
<img src="/img/blog/patterns-blog-3/keyed-state-scoping.png" width="800px" alt="Figure 4: Keyed State Scoping"/>
<br/>
<i><small>Figure 4: Keyed State Scoping</small></i>
</center>
</div>
<p>As described in the <a href="/news/2020/01/15/demo-fraud-detection.html">first blog post of the series</a>, we are dispatching events based on the keys
specified in the active fraud detection rules. Multiple distinct rules
can be based on the same grouping key. This means that our alerting
function can potentially receive transactions scoped by the same key
(e.g. <code>{payerId=25;beneficiaryId=12}</code>), but destined to be evaluated
according to different rules, which implies potentially different
lengths of the time windows. This raises the question of how we can best
store fault-tolerant window state within the <code>KeyedProcessFunction</code>. One
approach would be to create and manage separate <code>MapStates</code> per rule. Such
an approach, however, would be wasteful: we would separately hold state
for overlapping time windows, and therefore unnecessarily store
duplicate events. A better approach is to always store just enough data
to be able to evaluate all currently active rules which are scoped by
the same key. In order to achieve that, whenever a new rule is added, we
will determine if its time window has the largest span and, if so, store it in
the broadcast state under the special reserved <code>WIDEST_RULE_KEY</code>. This
information will be used during the state clean-up procedure, as
described later in this section.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">processBroadcastElement</span><span class="p">(</span><span class="n">Rule</span><span class="w"> </span><span class="n">rule</span><span class="p">,</span><span class="w"> </span><span class="n">Context</span><span class="w"> </span><span class="n">ctx</span><span class="p">,</span><span class="w"> </span><span class="n">Collector</span><span class="o">&lt;</span><span class="n">Alert</span><span class="o">&gt;</span><span class="w"> </span><span class="n">out</span><span class="p">){</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">updateWidestWindowRule</span><span class="p">(</span><span class="n">rule</span><span class="p">,</span><span class="w"> </span><span class="n">broadcastState</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">private</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">updateWidestWindowRule</span><span class="p">(</span><span class="n">Rule</span><span class="w"> </span><span class="n">rule</span><span class="p">,</span><span class="w"> </span><span class="n">BroadcastState</span><span class="o">&lt;</span><span class="n">Integer</span><span class="p">,</span><span class="w"> </span><span class="n">Rule</span><span class="o">&gt;</span><span class="w"> </span><span class="n">broadcastState</span><span class="p">){</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Rule</span><span class="w"> </span><span class="n">widestWindowRule</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">broadcastState</span><span class="p">.</span><span class="na">get</span><span class="p">(</span><span class="n">WIDEST_RULE_KEY</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">widestWindowRule</span><span class="w"> </span><span class="o">==</span><span class="w"> </span><span class="kc">null</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">broadcastState</span><span class="p">.</span><span class="na">put</span><span class="p">(</span><span class="n">WIDEST_RULE_KEY</span><span class="p">,</span><span class="w"> </span><span class="n">rule</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">widestWindowRule</span><span class="p">.</span><span class="na">getWindowMillis</span><span class="p">()</span><span class="w"> </span><span class="o">&lt;</span><span class="w"> </span><span class="n">rule</span><span class="p">.</span><span class="na">getWindowMillis</span><span class="p">())</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">broadcastState</span><span class="p">.</span><span class="na">put</span><span class="p">(</span><span class="n">WIDEST_RULE_KEY</span><span class="p">,</span><span class="w"> </span><span class="n">rule</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>Let&rsquo;s now look at the implementation of the main method,
<code>processElement()</code>, in some detail.</p>
<p>In the <a href="/news/2020/01/15/demo-fraud-detection.html#dynamic-data-partitioning">first blog post of the series</a>, we described how <code>DynamicKeyFunction</code> allowed
us to perform dynamic data partitioning based on the <code>groupingKeyNames</code>
parameter in the rule definition. The subsequent description focuses
on the <code>DynamicAlertFunction</code>, which makes use of the remaining rule
settings.</p>
<center>
<img src="/img/blog/patterns-blog-3/sample-rule-definition.png" width="700px" alt="Figure 5: Sample Rule Definition"/>
<br/>
<i><small>Figure 5: Sample Rule Definition</small></i>
</center>
<br/>
<p>As described in the previous parts of the blog post
series, our alerting process function receives events of type
<code>Keyed&lt;Transaction, String, Integer&gt;</code>, where <code>Transaction</code> is the main
&ldquo;wrapped&rdquo; event, <code>String</code> is the key (<em>payer #x - beneficiary #y</em> in
Figure 1), and <code>Integer</code> is the ID of the rule that caused the dispatch of
this event. This rule was previously <a href="/news/2020/03/24/demo-fraud-detection-2.html#broadcast-state-pattern">stored in the broadcast state</a> and has to be retrieved from that state by the ID. Here is the
outline of the implementation:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">DynamicAlertFunction</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">KeyedBroadcastProcessFunction</span><span class="o">&lt;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">Keyed</span><span class="o">&lt;</span><span class="n">Transaction</span><span class="p">,</span><span class="w"> </span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="o">&gt;</span><span class="p">,</span><span class="w"> </span><span class="n">Rule</span><span class="p">,</span><span class="w"> </span><span class="n">Alert</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kd">transient</span><span class="w"> </span><span class="n">MapState</span><span class="o">&lt;</span><span class="n">Long</span><span class="p">,</span><span class="w"> </span><span class="n">Set</span><span class="o">&lt;</span><span class="n">Transaction</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="n">windowState</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">processElement</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Keyed</span><span class="o">&lt;</span><span class="n">Transaction</span><span class="p">,</span><span class="w"> </span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="o">&gt;</span><span class="w"> </span><span class="n">value</span><span class="p">,</span><span class="w"> </span><span class="n">ReadOnlyContext</span><span class="w"> </span><span class="n">ctx</span><span class="p">,</span><span class="w"> </span><span class="n">Collector</span><span class="o">&lt;</span><span class="n">Alert</span><span class="o">&gt;</span><span class="w"> </span><span class="n">out</span><span class="p">){</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// Add Transaction to state</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">long</span><span class="w"> </span><span class="n">currentEventTime</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">value</span><span class="p">.</span><span class="na">getWrapped</span><span class="p">().</span><span class="na">getEventTime</span><span class="p">();</span><span class="w"> </span><span class="c1">// &lt;--- (1)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">addToStateValuesSet</span><span class="p">(</span><span class="n">windowState</span><span class="p">,</span><span class="w"> </span><span class="n">currentEventTime</span><span class="p">,</span><span class="w"> </span><span class="n">value</span><span class="p">.</span><span class="na">getWrapped</span><span class="p">());</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// Calculate the aggregate value</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Rule</span><span class="w"> </span><span class="n">rule</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">ctx</span><span class="p">.</span><span class="na">getBroadcastState</span><span class="p">(</span><span class="n">Descriptors</span><span class="p">.</span><span class="na">rulesDescriptor</span><span class="p">).</span><span class="na">get</span><span class="p">(</span><span class="n">value</span><span class="p">.</span><span class="na">getId</span><span class="p">());</span><span class="w"> </span><span class="c1">// &lt;--- (2)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Long</span><span class="w"> </span><span class="n">windowStartForEvent</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">rule</span><span class="p">.</span><span class="na">getWindowStartTimestampFor</span><span class="p">(</span><span class="n">currentEventTime</span><span class="p">);</span><span class="w"> </span><span class="c1">// &lt;--- (3)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">SimpleAccumulator</span><span class="o">&lt;</span><span class="n">BigDecimal</span><span class="o">&gt;</span><span class="w"> </span><span class="n">aggregator</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">RuleHelper</span><span class="p">.</span><span class="na">getAggregator</span><span class="p">(</span><span class="n">rule</span><span class="p">);</span><span class="w"> </span><span class="c1">// &lt;--- (4)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">Long</span><span class="w"> </span><span class="n">stateEventTime</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="n">windowState</span><span class="p">.</span><span class="na">keys</span><span class="p">())</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">isStateValueInWindow</span><span class="p">(</span><span class="n">stateEventTime</span><span class="p">,</span><span class="w"> </span><span class="n">windowStartForEvent</span><span class="p">,</span><span class="w"> </span><span class="n">currentEventTime</span><span class="p">))</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">aggregateValuesInState</span><span class="p">(</span><span class="n">stateEventTime</span><span class="p">,</span><span class="w"> </span><span class="n">aggregator</span><span class="p">,</span><span class="w"> </span><span class="n">rule</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// Evaluate the rule and trigger an alert if violated</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">BigDecimal</span><span class="w"> </span><span class="n">aggregateResult</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">aggregator</span><span class="p">.</span><span class="na">getLocalValue</span><span class="p">();</span><span class="w"> </span><span class="c1">// &lt;--- (5)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">boolean</span><span class="w"> </span><span class="n">isRuleViolated</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">rule</span><span class="p">.</span><span class="na">apply</span><span class="p">(</span><span class="n">aggregateResult</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">isRuleViolated</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">long</span><span class="w"> </span><span class="n">decisionTime</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">System</span><span class="p">.</span><span class="na">currentTimeMillis</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">out</span><span class="p">.</span><span class="na">collect</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">Alert</span><span class="o">&lt;&gt;</span><span class="p">(</span><span class="n">rule</span><span class="p">.</span><span class="na">getRuleId</span><span class="p">(),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">rule</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">value</span><span class="p">.</span><span class="na">getKey</span><span class="p">(),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">decisionTime</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">value</span><span class="p">.</span><span class="na">getWrapped</span><span class="p">(),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">aggregateResult</span><span class="p">));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// Register timers to ensure state cleanup</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kt">long</span><span class="w"> </span><span class="n">cleanupTime</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="p">(</span><span class="n">currentEventTime</span><span class="w"> </span><span class="o">/</span><span class="w"> </span><span class="n">1000</span><span class="p">)</span><span class="w"> </span><span class="o">*</span><span class="w"> </span><span class="n">1000</span><span class="p">;</span><span class="w"> </span><span class="c1">// &lt;--- (6)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">ctx</span><span class="p">.</span><span class="na">timerService</span><span class="p">().</span><span class="na">registerEventTimeTimer</span><span class="p">(</span><span class="n">cleanupTime</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><br/>
<p>Here are the details of the steps:</p>
<ol>
<li>
<p>We first add each new event to our window state:</p>
</li>
</ol>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">static</span><span class="w"> </span><span class="o">&lt;</span><span class="n">K</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="o">&gt;</span><span class="w"> </span><span class="n">Set</span><span class="o">&lt;</span><span class="n">V</span><span class="o">&gt;</span><span class="w"> </span><span class="nf">addToStateValuesSet</span><span class="p">(</span><span class="n">MapState</span><span class="o">&lt;</span><span class="n">K</span><span class="p">,</span><span class="w"> </span><span class="n">Set</span><span class="o">&lt;</span><span class="n">V</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="n">mapState</span><span class="p">,</span><span class="w"> </span><span class="n">K</span><span class="w"> </span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">V</span><span class="w"> </span><span class="n">value</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Set</span><span class="o">&lt;</span><span class="n">V</span><span class="o">&gt;</span><span class="w"> </span><span class="n">valuesSet</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">mapState</span><span class="p">.</span><span class="na">get</span><span class="p">(</span><span class="n">key</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">valuesSet</span><span class="w"> </span><span class="o">!=</span><span class="w"> </span><span class="kc">null</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">valuesSet</span><span class="p">.</span><span class="na">add</span><span class="p">(</span><span class="n">value</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> </span><span class="k">else</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">valuesSet</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">HashSet</span><span class="o">&lt;&gt;</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">valuesSet</span><span class="p">.</span><span class="na">add</span><span class="p">(</span><span class="n">value</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">mapState</span><span class="p">.</span><span class="na">put</span><span class="p">(</span><span class="n">key</span><span class="p">,</span><span class="w"> </span><span class="n">valuesSet</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">valuesSet</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><ol start="2">
<li>
<p>Next, we retrieve the previously broadcast rule, according to
which the incoming transaction needs to be evaluated.</p>
</li>
<li>
<p><code>getWindowStartTimestampFor</code> determines, given the window span defined
in the rule, and the current transaction timestamp, how far back in
time our evaluation should span.</p>
</li>
<li>
<p>The aggregate value is calculated by iterating over all window state
entries and applying an aggregate function. It could be an <em>average,
max, min</em> or, as in the example rule from the beginning of this
section, a <em>sum</em>.</p>
</li>
</ol>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">private</span><span class="w"> </span><span class="kt">boolean</span><span class="w"> </span><span class="nf">isStateValueInWindow</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Long</span><span class="w"> </span><span class="n">stateEventTime</span><span class="p">,</span><span class="w"> </span><span class="n">Long</span><span class="w"> </span><span class="n">windowStartForEvent</span><span class="p">,</span><span class="w"> </span><span class="kt">long</span><span class="w"> </span><span class="n">currentEventTime</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">stateEventTime</span><span class="w"> </span><span class="o">&gt;=</span><span class="w"> </span><span class="n">windowStartForEvent</span><span class="w"> </span><span class="o">&amp;&amp;</span><span class="w"> </span><span class="n">stateEventTime</span><span class="w"> </span><span class="o">&lt;=</span><span class="w"> </span><span class="n">currentEventTime</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">private</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">aggregateValuesInState</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Long</span><span class="w"> </span><span class="n">stateEventTime</span><span class="p">,</span><span class="w"> </span><span class="n">SimpleAccumulator</span><span class="o">&lt;</span><span class="n">BigDecimal</span><span class="o">&gt;</span><span class="w"> </span><span class="n">aggregator</span><span class="p">,</span><span class="w"> </span><span class="n">Rule</span><span class="w"> </span><span class="n">rule</span><span class="p">)</span><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Set</span><span class="o">&lt;</span><span class="n">Transaction</span><span class="o">&gt;</span><span class="w"> </span><span class="n">inWindow</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">windowState</span><span class="p">.</span><span class="na">get</span><span class="p">(</span><span class="n">stateEventTime</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">Transaction</span><span class="w"> </span><span class="n">event</span><span class="w"> </span><span class="p">:</span><span class="w"> </span><span class="n">inWindow</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">BigDecimal</span><span class="w"> </span><span class="n">aggregatedValue</span><span class="w"> </span><span class="o">=</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">FieldsExtractor</span><span class="p">.</span><span class="na">getBigDecimalByName</span><span class="p">(</span><span class="n">rule</span><span class="p">.</span><span class="na">getAggregateFieldName</span><span class="p">(),</span><span class="w"> </span><span class="n">event</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">aggregator</span><span class="p">.</span><span class="na">add</span><span class="p">(</span><span class="n">aggregatedValue</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><ol start="5">
<li>
<p>Having an aggregate value, we can compare it to the threshold value
that is specified in the rule definition and fire an alert, if
necessary.</p>
</li>
<li>
<p>At the end, we register a clean-up timer using
<code>ctx.timerService().registerEventTimeTimer()</code>. This timer will be
responsible for removing the current transaction once it moves
out of scope.</p>
</li>
</ol>
<div class="alert alert-info" markdown="1">
<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note </span>
Notice the rounding during timer creation. It is an important technique
which enables a reasonable trade-off between the precision with which
the timers will be triggered and the number of timers being used.
Timers are stored in Flink's fault-tolerant state, and managing them
with millisecond-level precision can be wasteful. In our case, with this
rounding, we will create at most one timer per key in any given second. The Flink documentation provides some additional <a href="https://nightlies.apache.org/flink/flink-docs-release-1.11/dev/stream/operators/process_function.html#timer-coalescing">details</a>.
</div>
<ol start="7">
<li>The <code>onTimer</code> method will trigger the clean-up of the window state.</li>
</ol>
<p>As previously described, we are always keeping as many events in the
state as required for the evaluation of an active rule with the widest
window span. This means that during the clean-up, we only need to remove
the state which is out of scope of this widest window.</p>
<center>
<img src="/img/blog/patterns-blog-3/widest-window.png" width="800px" alt="Figure 6: Widest Window"/>
<br/>
<i><small>Figure 6: Widest Window</small></i>
</center>
<br/>
<p>This is how the clean-up procedure can be implemented:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">onTimer</span><span class="p">(</span><span class="kd">final</span><span class="w"> </span><span class="kt">long</span><span class="w"> </span><span class="n">timestamp</span><span class="p">,</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">OnTimerContext</span><span class="w"> </span><span class="n">ctx</span><span class="p">,</span><span class="w"> </span><span class="kd">final</span><span class="w"> </span><span class="n">Collector</span><span class="o">&lt;</span><span class="n">Alert</span><span class="o">&gt;</span><span class="w"> </span><span class="n">out</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">throws</span><span class="w"> </span><span class="n">Exception</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Rule</span><span class="w"> </span><span class="n">widestWindowRule</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">ctx</span><span class="p">.</span><span class="na">getBroadcastState</span><span class="p">(</span><span class="n">Descriptors</span><span class="p">.</span><span class="na">rulesDescriptor</span><span class="p">).</span><span class="na">get</span><span class="p">(</span><span class="n">WIDEST_RULE_KEY</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Optional</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span><span class="w"> </span><span class="n">cleanupEventTimeWindow</span><span class="w"> </span><span class="o">=</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Optional</span><span class="p">.</span><span class="na">ofNullable</span><span class="p">(</span><span class="n">widestWindowRule</span><span class="p">).</span><span class="na">map</span><span class="p">(</span><span class="n">Rule</span><span class="p">::</span><span class="n">getWindowMillis</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Optional</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span><span class="w"> </span><span class="n">cleanupEventTimeThreshold</span><span class="w"> </span><span class="o">=</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">cleanupEventTimeWindow</span><span class="p">.</span><span class="na">map</span><span class="p">(</span><span class="n">window</span><span class="w"> </span><span class="o">-&gt;</span><span class="w"> </span><span class="n">timestamp</span><span class="w"> </span><span class="o">-</span><span class="w"> </span><span class="n">window</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// Remove events that are older than (timestamp - widestWindowSpan)ms</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">cleanupEventTimeThreshold</span><span class="p">.</span><span class="na">ifPresent</span><span class="p">(</span><span class="k">this</span><span class="p">::</span><span class="n">evictOutOfScopeElementsFromWindow</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">private</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">evictOutOfScopeElementsFromWindow</span><span class="p">(</span><span class="n">Long</span><span class="w"> </span><span class="n">threshold</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">try</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Iterator</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span><span class="w"> </span><span class="n">keys</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">windowState</span><span class="p">.</span><span class="na">keys</span><span class="p">().</span><span class="na">iterator</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">while</span><span class="w"> </span><span class="p">(</span><span class="n">keys</span><span class="p">.</span><span class="na">hasNext</span><span class="p">())</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Long</span><span class="w"> </span><span class="n">stateEventTime</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">keys</span><span class="p">.</span><span class="na">next</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">stateEventTime</span><span class="w"> </span><span class="o">&lt;</span><span class="w"> </span><span class="n">threshold</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">keys</span><span class="p">.</span><span class="na">remove</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w"> </span><span class="k">catch</span><span class="w"> </span><span class="p">(</span><span class="n">Exception</span><span class="w"> </span><span class="n">ex</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">throw</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">RuntimeException</span><span class="p">(</span><span class="n">ex</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><div class="alert alert-info" markdown="1">
<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Note</span>
You might be wondering why we did not use <code>ListState</code>, since we always
iterate over all of the values of the window state. This is actually
an optimization for the case when <code>RocksDBStateBackend</code>
<a href="//nightlies.apache.org/flink/flink-docs-release-1.11/ops/state/state_backends.html#the-rocksdbstatebackend">is used</a>. Iterating over a <code>ListState</code> would cause all of the <code>Transaction</code>
objects to be deserialized. Using <code>MapState</code>'s keys iterator only causes
deserialization of the keys (type <code>long</code>), and therefore reduces the
computational overhead.
</div>
<p>This concludes the description of the implementation details. Our
approach triggers evaluation of a time window as soon as a new
transaction arrives. It therefore fulfills the main requirement that we
have targeted - low delay for potentially issuing an alert. For the
complete implementation, please have a look at
<a href="https://github.com/afedulov/fraud-detection-demo">the project on GitHub</a>.</p>
<h2 id="improvements-and-optimizations">
Improvements and Optimizations
<a class="anchor" href="#improvements-and-optimizations">#</a>
</h2>
<p>What are the pros and cons of the described approach?</p>
<p><strong>Pros:</strong></p>
<ul>
<li>
<p>Low latency capabilities</p>
</li>
<li>
<p>Tailored solution with potential use-case specific optimizations</p>
</li>
<li>
<p>Efficient state reuse (shared state for the rules with the same key)</p>
</li>
</ul>
<p><strong>Cons:</strong></p>
<ul>
<li>
<p>Cannot make use of potential future optimizations in the existing
Window API</p>
</li>
<li>
<p>No late event handling, which is available out of the box in the
Window API</p>
</li>
<li>
<p>Quadratic computation complexity and potentially large state</p>
</li>
</ul>
<p>Let&rsquo;s now look at the latter two drawbacks and see if we can address
them.</p>
<h4 id="late-events">
Late events:
<a class="anchor" href="#late-events">#</a>
</h4>
<p>Processing late events raises a question: is it still meaningful
to re-evaluate the window when a late event arrives? If this
is required, you would need to extend the widest window used for the
clean-up by your maximum expected out-of-orderness. This would avoid
having potentially incomplete time window data for such late firings
(see Figure 7).</p>
<center>
<img src="/img/blog/patterns-blog-3/late-events.png" width="500px" alt="Figure 7: Late Events Handling"/>
<br/>
<i><small>Figure 7: Late Events Handling</small></i>
</center>
<br/>
<p>It can be argued, however, that for a use case that puts emphasis on low
latency processing, such late triggering would be meaningless. In this
case, we could keep track of the most recent timestamp that we have
observed so far, and for events that do not monotonically increase this
value, only add them to the state and skip the aggregate calculation and
the alert triggering logic.</p>
<h4 id="redundant-re-computations-and-state-size">
Redundant Re-computations and State Size:
<a class="anchor" href="#redundant-re-computations-and-state-size">#</a>
</h4>
<p>In our described implementation, we keep individual transactions in state
and go over them to calculate the aggregate again and again on every new
event. This is clearly suboptimal, as it wastes computational
resources on repeated calculations.</p>
<p>What is the main reason to keep the individual transactions in state?
The granularity of stored events directly corresponds to the precision
of the time window calculation. Because we store transactions
individually, we can precisely ignore individual transactions as soon as
they leave the exact 2592000000 ms time window (30 days in ms). At this
point, it is worth asking: do we really need
millisecond precision when evaluating such a long time window, or is it
OK to accept potential false positives in exceptional cases? If the
answer for your use case is that such precision is not needed, you could
implement additional optimization based on bucketing and
pre-aggregation. The idea of this optimization can be broken down as
follows:</p>
<ul>
<li>
<p>Instead of storing individual events, create a parent class that can
either contain fields of a single transaction, or combined values,
calculated based on applying an aggregate function to a set of
transactions.</p>
</li>
<li>
<p>Instead of using timestamps in milliseconds as <code>MapState</code> keys, round
them to the level of &ldquo;resolution&rdquo; that you are willing to accept
(for instance, a full minute). Each entry therefore represents a
bucket.</p>
</li>
<li>
<p>Whenever a window is evaluated, append the new transaction&rsquo;s data to
the bucket aggregate instead of storing individual data points per
transaction.</p>
</li>
</ul>
<center>
<img src="/img/blog/patterns-blog-3/pre-aggregation.png" width="700px" alt="Figure 8: Pre-aggregation"/>
<br/>
<i><small>Figure 8: Pre-aggregation</small></i>
</center>
<br/>
<h4 id="state-data-and-serializers">
State Data and Serializers
<a class="anchor" href="#state-data-and-serializers">#</a>
</h4>
<p>Another question that we can ask ourselves in order to further optimize
the implementation is how likely it is to get different events with
exactly the same timestamp. In the described implementation, we
demonstrated one way of approaching this question by storing sets of
transactions per timestamp in <code>MapState&lt;Long, Set&lt;Transaction&gt;&gt;</code>. Such
a choice, however, can have a more significant effect on performance
than one might anticipate. The reason is that Flink does not currently
provide a native <code>Set</code> serializer and will fall back to the less
efficient <a href="//nightlies.apache.org/flink/flink-docs-release-1.11/dev/types_serialization.html#general-class-types">Kryo
serializer</a>
instead
(<a href="https://issues.apache.org/jira/browse/FLINK-16729">FLINK-16729</a>). A
meaningful alternative strategy is to assume that, in a normal scenario,
no two distinct events can have exactly the same timestamp and to turn
the window state into a <code>MapState&lt;Long, Transaction&gt;</code> type. You can use
<a href="//nightlies.apache.org/flink/flink-docs-release-1.11/dev/stream/side_output.html">side-outputs</a>
to collect and monitor any unexpected occurrences which contradict your
assumption. During performance optimizations, I generally recommend that you
<a href="https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#disabling-kryo">disable the fallback to
Kryo</a>
and verify where your application might be further optimized by ensuring
that <a href="https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#performance-comparison">more efficient
serializers</a>
are being used.</p>
<div class="alert alert-info" markdown="1">
<span class="label label-info" style="display: inline-block"><span class="glyphicon glyphicon-info-sign" aria-hidden="true"></span> Tip:</span>
You can quickly determine which serializer is going to be
used for your classes by setting a breakpoint and inspecting the type of
the returned <code>TypeInformation</code>.
<br/>
<center>
<table class="tg">
<tr>
<td class="tg-topcenter">
<img src="/img/blog/patterns-blog-3/type-pojo.png" alt="POJO"/></td>
<td class="tg-topcenter">
<i>PojoTypeInfo</i> indicates that an efficient Flink POJO serializer will be used.</td>
</tr>
<tr>
<td class="tg-top">
<img src="/img/blog/patterns-blog-3/type-kryo.png" alt="Kryo"/></td>
<td class="tg-topcenter">
<i>GenericTypeInfo</i> indicates the fallback to a Kryo serializer.</td>
</tr>
</table>
</center>
</div>
<p><strong>Event pruning</strong>: Instead of storing complete events and putting
additional stress on the ser/de machinery, we can reduce the data of individual
events to only the relevant information. This would potentially require
&ldquo;unpacking&rdquo; individual events as fields, and storing those fields into a
generic <code>Map&lt;String, Object&gt;</code> data structure, based on the
configurations of active rules.</p>
<p>While this adjustment could potentially produce significant improvements
for objects of large size, it should not be your first pick as it can
easily turn into a premature optimization.</p>
<h2 id="summary">
Summary:
<a class="anchor" href="#summary">#</a>
</h2>
<p>This article concludes the description of the implementation of the
fraud detection engine that we started in <a href="/news/2020/01/15/demo-fraud-detection.html">part one</a>. In this blog
post we demonstrated how <code>ProcessFunction</code> can be utilized to
&quot;impersonate&quot; a window with a sophisticated custom logic. We have
discussed the pros and cons of such an approach and elaborated on how custom
use-case-specific optimizations can be applied - something that would
not be directly possible with the Window API.</p>
<p>The goal of this blog post was to illustrate the power and flexibility
of Apache Flink&rsquo;s APIs. At its core are the pillars of Flink, which
spare you, as a developer, very significant amounts of work and
generalize well to a wide range of use cases by providing:</p>
<ul>
<li>
<p>Efficient data exchange in a distributed cluster</p>
</li>
<li>
<p>Horizontal scalability via data partitioning</p>
</li>
<li>
<p>Fault-tolerant state with quick, local access</p>
</li>
<li>
<p>Convenient abstraction for working with this state, which is as simple as using a
local variable</p>
</li>
<li>
<p>Multi-threaded, parallel execution engine. <code>ProcessFunction</code> code runs
in a single thread, without the need for synchronization. Flink
handles all the parallel execution aspects and correct access to the
shared state, without you, as a developer, having to think about it
(concurrency is hard).</p>
</li>
</ul>
<p>All these aspects make it possible to build applications with Flink that
go well beyond trivial streaming ETL use cases and enable implementation
of arbitrarily sophisticated, distributed event-driven applications.
With Flink, you can rethink approaches to a wide range of use cases
which normally would rely on using stateless parallel execution nodes
and &ldquo;pushing&rdquo; the concerns of state fault tolerance to a database, an
approach that is often destined to run into scalability issues in the
face of ever-increasing data volumes.</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="//github.com/apache/flink-web/edit/asf-site/docs/content/posts/2020-07-30-demo-fraud-detection-3.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li>
<ul>
<li><a href="#introduction">Introduction</a></li>
<li><a href="#processfunction-as-a-window">ProcessFunction as a &ldquo;Window&rdquo;</a>
<ul>
<li><a href="#low-latency">Low Latency</a></li>
<li><a href="#implementation">Implementation</a></li>
</ul>
</li>
<li><a href="#improvements-and-optimizations">Improvements and Optimizations</a>
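<ul>
<li><a href="#late-events">Late events:</a></li>
<li><a href="#redundant-re-computations-and-state-size">Redundant Re-computations and State Size:</a></li>
<li><a href="#state-data-and-serializers">State Data and Serializers</a></li>
</ul>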
</li>
<li><a href="#summary">Summary:</a></li>
</ul>
</li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a>
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>Github</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>