blob: 8f7782788431755b1fcc2c1d41c97e3d51dcf202 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en" dir=ZgotmplZ>
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2020/01/15/advanced-flink-application-patterns-vol.1-case-study-of-a-fraud-detection-system/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="In this series of blog posts you will learn about three powerful Flink patterns for building streaming applications:
Dynamic updates of application logic Dynamic data partitioning (shuffle), controlled at runtime Low latency alerting based on custom windowing logic (without using the window API) These patterns expand the possibilities of what is achievable with statically defined data flows and provide the building blocks to fulfill complex business requirements.
Dynamic updates of application logic allow Flink jobs to change at runtime, without downtime from stopping and resubmitting the code.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Advanced Flink Application Patterns Vol.1: Case Study of a Fraud Detection System" />
<meta property="og:description" content="In this series of blog posts you will learn about three powerful Flink patterns for building streaming applications:
Dynamic updates of application logic Dynamic data partitioning (shuffle), controlled at runtime Low latency alerting based on custom windowing logic (without using the window API) These patterns expand the possibilities of what is achievable with statically defined data flows and provide the building blocks to fulfill complex business requirements.
Dynamic updates of application logic allow Flink jobs to change at runtime, without downtime from stopping and resubmitting the code." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2020/01/15/advanced-flink-application-patterns-vol.1-case-study-of-a-fraud-detection-system/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2020-01-15T12:00:00+00:00" />
<meta property="article:modified_time" content="2020-01-15T12:00:00+00:00" />
<title>Advanced Flink Application Patterns Vol.1: Case Study of a Fraud Detection System | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
var _paq = window._paq = window._paq || [];
_paq.push(['disableCookies']);
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var u="//analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '1']);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
</head>
<body dir=ZgotmplZ>
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;"su>
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2020/01/15/advanced-flink-application-patterns-vol.1-case-study-of-a-fraud-detection-system/">Advanced Flink Application Patterns Vol.1: Case Study of a Fraud Detection System</a>
</h1>
January 15, 2020 -
Alexander Fedulov
<a href="https://twitter.com/alex_fedulov">(@alex_fedulov)</a>
<p><p>In this series of blog posts you will learn about three powerful Flink patterns for building streaming applications:</p>
<ul>
<li><a href="/news/2020/03/24/demo-fraud-detection-2.html">Dynamic updates of application logic</a></li>
<li>Dynamic data partitioning (shuffle), controlled at runtime</li>
<li><a href="/news/2020/07/30/demo-fraud-detection-3.html">Low latency alerting</a> based on custom windowing logic (without using the window API)</li>
</ul>
<p>These patterns expand the possibilities of what is achievable with statically defined data flows and provide the building blocks to fulfill complex business requirements.</p>
<p><strong>Dynamic updates of application logic</strong> allow Flink jobs to change at runtime, without downtime from stopping and resubmitting the code.<br>
<br>
<strong>Dynamic data partitioning</strong> provides the ability to change how events are distributed and grouped by Flink at runtime. Such functionality often becomes a natural requirement when building jobs with dynamically reconfigurable application logic.<br>
<br>
<strong>Custom window management</strong> demonstrates how you can utilize the low level <a href="//nightlies.apache.org/flink/flink-docs-stable/dev/stream/operators/process_function.html">process function API</a>, when the native <a href="//nightlies.apache.org/flink/flink-docs-stable/dev/stream/operators/windows.html">window API</a> is not exactly matching your requirements. Specifically, you will learn how to implement low latency alerting on windows and how to limit state growth with timers.</p>
<p>These patterns build on top of core Flink functionality, however, they might not be immediately apparent from the framework&rsquo;s documentation as explaining and presenting the motivation behind them is not always trivial without a concrete use case. That is why we will showcase these patterns with a practical example that offers a real-world usage scenario for Apache Flink — a <em>Fraud Detection</em> engine.
We hope that this series will place these powerful approaches into your tool belt and enable you to take on new and exciting tasks.</p>
<p>In the first blog post of the series we will look at the high-level architecture of the demo application, describe its components and their interactions. We will then deep dive into the implementation details of the first pattern in the series - <strong>dynamic data partitioning</strong>.</p>
<p>You will be able to run the full Fraud Detection Demo application locally and look into the details of the implementation by using the accompanying GitHub repository.</p>
<h3 id="fraud-detection-demo">
Fraud Detection Demo
<a class="anchor" href="#fraud-detection-demo">#</a>
</h3>
<p>The full source code for our fraud detection demo is open source and available online. To run it locally, check out the following repository and follow the steps in the README:</p>
<p><a href="https://github.com/afedulov/fraud-detection-demo">https://github.com/afedulov/fraud-detection-demo</a></p>
<p>You will see the demo is a self-contained application - it only requires <code>docker</code> and <code>docker-compose</code> to be built from sources and includes the following components:</p>
<ul>
<li>Apache Kafka (message broker) with ZooKeeper</li>
<li>Apache Flink (<a href="//nightlies.apache.org/flink/flink-docs-stable/concepts/glossary.html#flink-application-cluster">application cluster</a>)</li>
<li>Fraud Detection Web App</li>
</ul>
<p>The high-level goal of the Fraud Detection engine is to consume a stream of financial transactions and evaluate them against a set of rules. These rules are subject to frequent changes and tweaks. In a real production system, it is important to be able to add and remove them at runtime, without incurring an expensive penalty of stopping and restarting the job.</p>
<p>When you navigate to the demo URL in your browser, you will be presented with the following UI:</p>
<center>
<img src="/img/blog/2019-11-19-demo-fraud-detection/ui.png" width="800px" alt="Figure 1: Demo UI"/>
<br/>
<i><small>Figure 1: Fraud Detection Demo UI</small></i>
</center>
<br/>
<p>On the left side, you can see a visual representation of financial transactions flowing through the system after you click the &ldquo;Start&rdquo; button. The slider at the top allows you to control the number of generated transactions per second. The middle section is devoted to managing the rules evaluated by Flink. From here, you can create new rules as well as issue control commands, such as clearing Flink&rsquo;s state.</p>
<p>The demo out-of-the-box comes with a set of predefined sample rules. You can click the <em>Start</em> button and, after some time, will observe alerts displayed in the right section of the UI. These alerts are the result of Flink evaluating the generated transactions stream against the predefined rules.</p>
<p>Our sample fraud detection system consists of three main components:</p>
<ol>
<li>Frontend (React)</li>
<li>Backend (SpringBoot)</li>
<li>Fraud Detection application (Apache Flink)</li>
</ol>
<p>Interactions between the main elements are depicted in <em>Figure 2</em>.</p>
<center>
<img src="/img/blog/2019-11-19-demo-fraud-detection/architecture.png" width="800px" alt="Figure 2: Demo Components"/>
<br/>
<i><small>Figure 2: Fraud Detection Demo Components</small></i>
</center>
<br/>
<p>The Backend exposes a REST API to the Frontend for creating/deleting rules as well as issuing control commands for managing the demo execution. It then relays those Frontend actions to Flink by sending them via a &ldquo;Control&rdquo; Kafka topic. The Backend additionally includes a <em>Transactions Generator</em> component, which sends an emulated stream of money transfer events to Flink via a separate &ldquo;Transactions&rdquo; topic. Alerts generated by Flink are consumed by the Backend from &ldquo;Alerts&rdquo; topic and relayed to the UI via WebSockets.</p>
<p>Now that you are familiar with the overall layout and the goal of our Fraud Detection engine, let&rsquo;s now go into the details of what is required to implement such a system.</p>
<h3 id="dynamic-data-partitioning">
Dynamic Data Partitioning
<a class="anchor" href="#dynamic-data-partitioning">#</a>
</h3>
<p>The first pattern we will look into is Dynamic Data Partitioning.</p>
<p>If you have used Flink&rsquo;s DataStream API in the past, you are undoubtedly familiar with the <strong>keyBy</strong> method. Keying a stream shuffles all the records such that elements with the same key are assigned to the same partition. This means all records with the same key are processed by the same physical instance of the next operator.</p>
<p>In a typical streaming application, the choice of key is fixed, determined by some static field within the elements. For instance, when building a simple window-based aggregation of a stream of transactions, we might always group by the transactions account id.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Transaction</span><span class="o">&gt;</span><span class="w"> </span><span class="n">input</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="c1">// [...]</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o">&lt;</span><span class="p">...</span><span class="o">&gt;</span><span class="w"> </span><span class="n">windowed</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">input</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">keyBy</span><span class="p">(</span><span class="n">Transaction</span><span class="p">::</span><span class="n">getAccountId</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">window</span><span class="p">(</span><span class="cm">/*window specification*/</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>This approach is the main building block for achieving horizontal scalability in a wide range of use cases. However, in the case of an application striving to provide flexibility in business logic at runtime, this is not enough.
To understand why this is the case, let us start with articulating a realistic sample rule definition for our fraud detection system in the form of a functional requirement:</p>
<p><em>&ldquo;Whenever the <strong>sum</strong> of the accumulated <strong>payment amount</strong> from the same <strong>payer</strong> to the same <strong>beneficiary</strong> within the <strong>duration of a week</strong> is <strong>greater</strong> than <strong>1 000 000 $</strong> - fire an alert.&rdquo;</em></p>
<p>In this formulation we can spot a number of parameters that we would like to be able to specify in a newly-submitted rule and possibly even later modify or tweak at runtime:</p>
<ol>
<li>Aggregation field (payment amount)</li>
<li>Grouping fields (payer + beneficiary)</li>
<li>Aggregation function (sum)</li>
<li>Window duration (1 week)</li>
<li>Limit (1 000 000)</li>
<li>Limit operator (greater)</li>
</ol>
<p>Accordingly, we will use the following simple JSON format to define the aforementioned parameters:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-json" data-lang="json"><span class="line"><span class="cl"><span class="p">{</span>
</span></span><span class="line"><span class="cl"> <span class="nt">&#34;ruleId&#34;</span><span class="p">:</span> <span class="mi">1</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="nt">&#34;ruleState&#34;</span><span class="p">:</span> <span class="s2">&#34;ACTIVE&#34;</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="nt">&#34;groupingKeyNames&#34;</span><span class="p">:</span> <span class="p">[</span><span class="s2">&#34;payerId&#34;</span><span class="p">,</span> <span class="s2">&#34;beneficiaryId&#34;</span><span class="p">],</span>
</span></span><span class="line"><span class="cl"> <span class="nt">&#34;aggregateFieldName&#34;</span><span class="p">:</span> <span class="s2">&#34;paymentAmount&#34;</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="nt">&#34;aggregatorFunctionType&#34;</span><span class="p">:</span> <span class="s2">&#34;SUM&#34;</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="nt">&#34;limitOperatorType&#34;</span><span class="p">:</span> <span class="s2">&#34;GREATER&#34;</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="nt">&#34;limit&#34;</span><span class="p">:</span> <span class="mi">1000000</span><span class="p">,</span>
</span></span><span class="line"><span class="cl"> <span class="nt">&#34;windowMinutes&#34;</span><span class="p">:</span> <span class="mi">10080</span>
</span></span><span class="line"><span class="cl"><span class="p">}</span>
</span></span></code></pre></div><p>At this point, it is important to understand that <strong><code>groupingKeyNames</code></strong> determine the actual physical grouping of events - all Transactions with the same values of specified parameters (e.g. <em>payer #25 -&gt; beneficiary #12</em>) have to be aggregated in the same physical instance of the evaluating operator. Naturally, the process of distributing data in such a way in Flink&rsquo;s API is realised by a <code>keyBy()</code> function.</p>
<p>Most examples in Flink&rsquo;s <code>keyBy()</code><a href="//nightlies.apache.org/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-field-expressions">documentation</a> use a hard-coded <code>KeySelector</code>, which extracts specific fixed events&rsquo; fields. However, to support the desired flexibility, we have to extract them in a more dynamic fashion based on the specifications of the rules. For this, we will have to use one additional operator that prepares every event for dispatching to a correct aggregating instance.</p>
<p>On a high level, our main processing pipeline looks like this:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Alert</span><span class="o">&gt;</span><span class="w"> </span><span class="n">alerts</span><span class="w"> </span><span class="o">=</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">transactions</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">process</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">DynamicKeyFunction</span><span class="p">())</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">keyBy</span><span class="p">(</span><span class="cm">/* some key selector */</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">process</span><span class="p">(</span><span class="cm">/* actual calculations and alerting */</span><span class="p">)</span><span class="w">
</span></span></span></code></pre></div><p>We have previously established that each rule defines a <strong><code>groupingKeyNames</code></strong> parameter that specifies which combination of fields will be used for the incoming events&rsquo; grouping. Each rule might use an arbitrary combination of these fields. At the same time, every incoming event potentially needs to be evaluated against multiple rules. This implies that events might simultaneously need to be present at multiple parallel instances of evaluating operators that correspond to different rules and hence will need to be forked. Ensuring such events dispatching is the purpose of <code>DynamicKeyFunction()</code>.</p>
<center>
<img src="/img/blog/2019-11-19-demo-fraud-detection/shuffle_function_1.png" width="800px" alt="Figure 3: Forking events with Dynamic Key Function"/>
<br/>
<i><small>Figure 3: Forking events with Dynamic Key Function</small></i>
</center>
<br/>
<p><code>DynamicKeyFunction</code> iterates over a set of defined rules and prepares every event to be processed by a <code>keyBy()</code> function by extracting the required grouping keys:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">DynamicKeyFunction</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">ProcessFunction</span><span class="o">&lt;</span><span class="n">Transaction</span><span class="p">,</span><span class="w"> </span><span class="n">Keyed</span><span class="o">&lt;</span><span class="n">Transaction</span><span class="p">,</span><span class="w"> </span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="cm">/* Simplified */</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">List</span><span class="o">&lt;</span><span class="n">Rule</span><span class="o">&gt;</span><span class="w"> </span><span class="n">rules</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="cm">/* Rules that are initialized somehow.
</span></span></span><span class="line"><span class="cl"><span class="cm"> Details will be discussed in a future blog post. */</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="nd">@Override</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="kt">void</span><span class="w"> </span><span class="nf">processElement</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Transaction</span><span class="w"> </span><span class="n">event</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Context</span><span class="w"> </span><span class="n">ctx</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">Collector</span><span class="o">&lt;</span><span class="n">Keyed</span><span class="o">&lt;</span><span class="n">Transaction</span><span class="p">,</span><span class="w"> </span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">Integer</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="n">out</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">for</span><span class="w"> </span><span class="p">(</span><span class="n">Rule</span><span class="w"> </span><span class="n">rule</span><span class="w"> </span><span class="p">:</span><span class="n">rules</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">out</span><span class="p">.</span><span class="na">collect</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Keyed</span><span class="o">&lt;&gt;</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">event</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">KeysExtractor</span><span class="p">.</span><span class="na">getKey</span><span class="p">(</span><span class="n">rule</span><span class="p">.</span><span class="na">getGroupingKeyNames</span><span class="p">(),</span><span class="w"> </span><span class="n">event</span><span class="p">),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">rule</span><span class="p">.</span><span class="na">getRuleId</span><span class="p">()));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p><code>KeysExtractor.getKey()</code> uses reflection to extract the required values of <code>groupingKeyNames</code> fields from events and combines them as a single concatenated String key, e.g <code>&quot;{payerId=25;beneficiaryId=12}&quot;</code>. Flink will calculate the hash of this key and assign the processing of this particular combination to a specific server in the cluster. This will allow tracking all transactions between <em>payer #25</em> and <em>beneficiary #12</em> and evaluating defined rules within the desired time window.</p>
<p>Notice that a wrapper class <code>Keyed</code> with the following signature was introduced as the output type of <code>DynamicKeyFunction</code>:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">Keyed</span><span class="o">&lt;</span><span class="n">IN</span><span class="p">,</span><span class="w"> </span><span class="n">KEY</span><span class="p">,</span><span class="w"> </span><span class="n">ID</span><span class="o">&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">IN</span><span class="w"> </span><span class="n">wrapped</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">KEY</span><span class="w"> </span><span class="n">key</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="n">ID</span><span class="w"> </span><span class="n">id</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">public</span><span class="w"> </span><span class="n">KEY</span><span class="w"> </span><span class="nf">getKey</span><span class="p">(){</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="n">key</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>Fields of this POJO carry the following information: <code>wrapped</code> is the original transaction event, <code>key</code> is the result of using <code>KeysExtractor</code> and <code>id</code> is the ID of the Rule that caused the dispatch of the event (according to the rule-specific grouping logic).</p>
<p>Events of this type will be the input to the <code>keyBy()</code> function in the main processing pipeline and allow the use of a simple lambda-expression as a <a href="//nightlies.apache.org/flink/flink-docs-stable/dev/api_concepts.html#define-keys-using-key-selector-functions"><code>KeySelector</code></a> for the final step of implementing dynamic data shuffle.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Alert</span><span class="o">&gt;</span><span class="w"> </span><span class="n">alerts</span><span class="w"> </span><span class="o">=</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">transactions</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">process</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">DynamicKeyFunction</span><span class="p">())</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">keyBy</span><span class="p">((</span><span class="n">keyed</span><span class="p">)</span><span class="w"> </span><span class="o">-&gt;</span><span class="w"> </span><span class="n">keyed</span><span class="p">.</span><span class="na">getKey</span><span class="p">());</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">process</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">DynamicAlertFunction</span><span class="p">())</span><span class="w">
</span></span></span></code></pre></div><p>By applying <code>DynamicKeyFunction</code> we are implicitly copying events for performing parallel per-rule evaluation within a Flink cluster. By doing so, we achieve an important property - horizontal scalability of rules&rsquo; processing. Our system will be capable of handling more rules by adding more servers to the cluster, i.e. increasing the parallelism. This property is achieved at the cost of data duplication, which might become an issue depending on the specific set of parameters, such as incoming data rate, available network bandwidth, event payload size etc. In a real-life scenario, additional optimizations can be applied, such as combined evaluation of rules which have the same <code>groupingKeyNames</code>, or a filtering layer, which would strip events of all the fields that are not required for processing of a particular rule.</p>
<h3 id="summary">
Summary:
<a class="anchor" href="#summary">#</a>
</h3>
<p>In this blog post, we have discussed the motivation behind supporting dynamic, runtime changes to a Flink application by looking at a sample use case - a Fraud Detection engine. We have described the overall architecture and interactions between its components as well as provided references for building and running a demo Fraud Detection application in a dockerized setup. We then showed the details of implementing a <strong>dynamic data partitioning pattern</strong> as the first underlying building block to enable flexible runtime configurations.</p>
<p>To remain focused on describing the core mechanics of the pattern, we kept the complexity of the DSL and the underlying rules engine to a minimum. Going forward, it is easy to imagine adding extensions such as allowing more sophisticated rule definitions, including filtering of certain events, logical rules chaining, and other more advanced functionality.</p>
<p>In the second part of this series, we will describe how the rules make their way into the running Fraud Detection engine. Additionally, we will go over the implementation details of the main processing function of the pipeline - <em>DynamicAlertFunction()</em>.</p>
<center>
<img src="/img/blog/2019-11-19-demo-fraud-detection/end-to-end.png" width="800px" alt="Figure 4: End-to-end pipeline"/>
<br/>
<i><small>Figure 4: End-to-end pipeline</small></i>
</center>
<br/>
<p>In the <a href="/news/2020/03/24/demo-fraud-detection-2.html">next article</a>, we will see how Flink&rsquo;s broadcast streams can be utilized to help steer the processing within the Fraud Detection engine at runtime (Dynamic Application Updates pattern).</p>
</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="//github.com/apache/flink-web/edit/asf-site/docs/content/posts/2020-01-15-demo-fraud-detection.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li>
<ul>
<li>
<ul>
<li><a href="#fraud-detection-demo">Fraud Detection Demo</a></li>
<li><a href="#dynamic-data-partitioning">Dynamic Data Partitioning</a></li>
<li><a href="#summary">Summary:</a></li>
</ul>
</li>
</ul>
</li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a-->
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>Github</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>