blob: a5867f565a635a286e2c34b76e66bf38384fae36 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en" dir="ltr">
<head>
<!-- Declare the character encoding and viewport first, before any other content is parsed. -->
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2016/04/06/introducing-complex-event-processing-cep-with-apache-flink/">
<meta name="description" content="With the ubiquity of sensor networks and smart devices continuously collecting more and more data, we face the challenge to analyze an ever growing stream of data in near real-time. Being able to react quickly to changing trends or to deliver up to date business intelligence can be a decisive factor for a company’s success or failure. A key problem in real time processing is the detection of event patterns in data streams.">
<meta name="theme-color" content="#FFFFFF">
<!-- Open Graph / article metadata. -->
<meta property="og:title" content="Introducing Complex Event Processing (CEP) with Apache Flink">
<meta property="og:description" content="With the ubiquity of sensor networks and smart devices continuously collecting more and more data, we face the challenge to analyze an ever growing stream of data in near real-time. Being able to react quickly to changing trends or to deliver up to date business intelligence can be a decisive factor for a company’s success or failure. A key problem in real time processing is the detection of event patterns in data streams.">
<meta property="og:type" content="article">
<meta property="og:url" content="https://flink.apache.org/2016/04/06/introducing-complex-event-processing-cep-with-apache-flink/">
<meta property="article:section" content="posts">
<meta property="article:published_time" content="2016-04-06T10:00:00+00:00">
<meta property="article:modified_time" content="2016-04-06T10:00:00+00:00">
<title>Introducing Complex Event Processing (CEP) with Apache Flink | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<!-- The favicon is a PNG file, so advertise it as image/png. -->
<link rel="icon" href="/favicon.png" type="image/png">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
// Matomo analytics (analytics.apache.org, site ID 1).
// Cookies are disabled; page views and link clicks are tracked.
// setDomains lists the hosts Matomo should treat as part of this site.
var _paq = window._paq = window._paq || [];
_paq.push(['disableCookies']);
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
  var u = "https://analytics.apache.org/";
  _paq.push(['setTrackerUrl', u + 'matomo.php']);
  _paq.push(['setSiteId', '1']);
  // Load matomo.js asynchronously by inserting it before the first <script> in the document.
  var d = document, g = d.createElement('script'), s = d.getElementsByTagName('script')[0];
  g.async = true;
  g.src = u + 'matomo.js';
  s.parentNode.insertBefore(g, s);
})();
</script>
</head>
<body dir="ltr">
<header>
<!-- Primary site navigation (Bootstrap navbar with a collapsible menu). -->
<nav class="navbar navbar-expand-xl" aria-label="Main">
<div class="container-fluid">
<!-- The adjacent text already names this link, so the logo uses alt="" to avoid announcing "Apache Flink" twice. -->
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon" aria-hidden="true"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community &amp; Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin" aria-hidden="true"></i>
</div>
<!-- Native form submission is suppressed (onsubmit returns false); search is presumably driven by script via #book-search-input. -->
<form class="search-bar d-flex" onsubmit="return false;">
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search" aria-hidden="true"></i>
<i class="fa fa-circle-o-notch fa-spin spinner" aria-hidden="true"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2016/04/06/introducing-complex-event-processing-cep-with-apache-flink/">Introducing Complex Event Processing (CEP) with Apache Flink</a>
</h1>
April 6, 2016 -
<p>With the ubiquity of sensor networks and smart devices continuously collecting more and more data, we face the challenge of analyzing an ever-growing stream of data in near real-time.
Being able to react quickly to changing trends or to deliver up-to-date business intelligence can be a decisive factor for a company’s success or failure.
A key problem in real-time processing is the detection of event patterns in data streams.</p>
<p>Complex event processing (CEP) addresses exactly this problem of matching continuously incoming events against a pattern.
The results of a match are usually complex events derived from the input events.
In contrast to traditional DBMSs, where a query is executed on stored data, CEP evaluates incoming data against a stored query.
All data which is not relevant for the query can be immediately discarded.
The advantages of this approach are obvious, given that CEP queries are applied on a potentially infinite stream of data.
Furthermore, inputs are processed immediately.
Once the system has seen all events for a matching sequence, results are emitted straight away.
This is what gives CEP its real-time analytics capability.</p>
<p>Consequently, CEP’s processing paradigm drew significant interest and found application in a wide variety of use cases.
Most notably, CEP is used nowadays for financial applications such as stock market trend and credit card fraud detection.
Moreover, it is used in RFID-based tracking and monitoring, for example, to detect thefts in a warehouse where items are not properly checked out.
CEP can also be used to detect network intrusion by specifying patterns of suspicious user behaviour.</p>
<p>Apache Flink, with its true streaming nature and its capabilities for low-latency as well as high-throughput stream processing, is a natural fit for CEP workloads.
Consequently, the Flink community has introduced the first version of a new <a href="https://nightlies.apache.org/flink/flink-docs-master/apis/streaming/libs/cep.html">CEP library</a> with <a href="https://flink.apache.org/news/2016/03/08/release-1.0.0.html">Flink 1.0</a>.
In the remainder of this blog post, we introduce Flink’s CEP library and we illustrate its ease of use through the example of monitoring a data center.</p>
<h2 id="monitoring-and-alert-generation-for-data-centers">
Monitoring and alert generation for data centers
<a class="anchor" href="#monitoring-and-alert-generation-for-data-centers">#</a>
</h2>
<div style="text-align:center">
<img src="/img/blog/cep-monitoring.svg" alt="Monitoring and alert generation for a data center" style="width:600px;margin:15px">
</div>
<p>Assume we have a data center with a number of racks.
For each rack the power consumption and the temperature are monitored.
Whenever such a measurement takes place, a new power or temperature event is generated, respectively.
Based on this monitoring event stream, we want to detect racks that are about to overheat, and dynamically adapt their workload and cooling.</p>
<p>For this scenario, we use a two-stage approach.
First, we monitor the temperature events.
Whenever we see two consecutive events whose temperature exceeds a threshold value, we generate a temperature warning with the current average temperature.
A temperature warning does not necessarily indicate that a rack is about to overheat.
But whenever we see two consecutive warnings with increasing temperatures, we want to issue an alert for this rack.
This alert can then lead to countermeasures to cool the rack.</p>
<h3 id="implementation-with-apache-flink">
Implementation with Apache Flink
<a class="anchor" href="#implementation-with-apache-flink">#</a>
</h3>
<p>First, we define the messages of the incoming monitoring event stream.
Every monitoring message contains its originating rack ID.
The temperature event additionally contains the current temperature and the power consumption event contains the current voltage.
We model the events as POJOs:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">public</span><span class="w"> </span><span class="kd">abstract</span><span class="w"> </span><span class="kd">class</span> <span class="nc">MonitoringEvent</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kt">int</span><span class="w"> </span><span class="n">rackID</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">TemperatureEvent</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">MonitoringEvent</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kt">double</span><span class="w"> </span><span class="n">temperature</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">PowerEvent</span><span class="w"> </span><span class="kd">extends</span><span class="w"> </span><span class="n">MonitoringEvent</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kt">double</span><span class="w"> </span><span class="n">voltage</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>Now we can ingest the monitoring event stream using one of Flink’s connectors (e.g. Kafka, RabbitMQ, etc.).
This will give us a <code>DataStream&lt;MonitoringEvent&gt; inputEventStream</code> which we will use as the input for Flink’s CEP operator.
But first, we have to define the event pattern to detect temperature warnings.
The CEP library offers an intuitive <a href="//nightlies.apache.org/flink/flink-docs-master/apis/streaming/libs/cep.html#the-pattern-api">Pattern API</a> to easily define these complex patterns.</p>
<p>Every pattern consists of a sequence of events which can have optional filter conditions assigned.
A pattern always starts with a first event to which we will assign the name <code>"First Event"</code>.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">Pattern</span><span class="p">.</span><span class="o">&lt;</span><span class="n">MonitoringEvent</span><span class="o">&gt;</span><span class="n">begin</span><span class="p">(</span><span class="s">&#34;First Event&#34;</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>This pattern will match every monitoring event.
Since we are only interested in <code>TemperatureEvents</code> whose temperature is at or above a threshold value, we have to add an additional subtype constraint and a where clause:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">Pattern</span><span class="p">.</span><span class="o">&lt;</span><span class="n">MonitoringEvent</span><span class="o">&gt;</span><span class="n">begin</span><span class="p">(</span><span class="s">&#34;First Event&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">subtype</span><span class="p">(</span><span class="n">TemperatureEvent</span><span class="p">.</span><span class="na">class</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">where</span><span class="p">(</span><span class="n">evt</span><span class="w"> </span><span class="o">-&gt;</span><span class="w"> </span><span class="n">evt</span><span class="p">.</span><span class="na">getTemperature</span><span class="p">()</span><span class="w"> </span><span class="o">&gt;=</span><span class="w"> </span><span class="n">TEMPERATURE_THRESHOLD</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>As stated before, we want to generate a <code>TemperatureWarning</code> if and only if we see two consecutive <code>TemperatureEvents</code> for the same rack whose temperatures are too high.
The Pattern API offers the <code>next</code> call which allows us to add a new event to our pattern.
This event has to directly follow the first matching event for the whole pattern to match.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">Pattern</span><span class="o">&lt;</span><span class="n">MonitoringEvent</span><span class="p">,</span><span class="w"> </span><span class="o">?&gt;</span><span class="w"> </span><span class="n">warningPattern</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">Pattern</span><span class="p">.</span><span class="o">&lt;</span><span class="n">MonitoringEvent</span><span class="o">&gt;</span><span class="n">begin</span><span class="p">(</span><span class="s">&#34;First Event&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">subtype</span><span class="p">(</span><span class="n">TemperatureEvent</span><span class="p">.</span><span class="na">class</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">where</span><span class="p">(</span><span class="n">evt</span><span class="w"> </span><span class="o">-&gt;</span><span class="w"> </span><span class="n">evt</span><span class="p">.</span><span class="na">getTemperature</span><span class="p">()</span><span class="w"> </span><span class="o">&gt;=</span><span class="w"> </span><span class="n">TEMPERATURE_THRESHOLD</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">next</span><span class="p">(</span><span class="s">&#34;Second Event&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">subtype</span><span class="p">(</span><span class="n">TemperatureEvent</span><span class="p">.</span><span class="na">class</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">where</span><span class="p">(</span><span class="n">evt</span><span class="w"> </span><span class="o">-&gt;</span><span class="w"> </span><span class="n">evt</span><span class="p">.</span><span class="na">getTemperature</span><span class="p">()</span><span class="w"> </span><span class="o">&gt;=</span><span class="w"> </span><span class="n">TEMPERATURE_THRESHOLD</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">within</span><span class="p">(</span><span class="n">Time</span><span class="p">.</span><span class="na">seconds</span><span class="p">(</span><span class="n">10</span><span class="p">));</span><span class="w">
</span></span></span></code></pre></div><p>The final pattern definition also contains the <code>within</code> API call which defines that two consecutive <code>TemperatureEvents</code> have to occur within a time interval of 10 seconds for the pattern to match.
Depending on the time characteristic setting, this can be processing time, ingestion time, or event time.</p>
<p>Having defined the event pattern, we can now apply it on the <code>inputEventStream</code>.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">PatternStream</span><span class="o">&lt;</span><span class="n">MonitoringEvent</span><span class="o">&gt;</span><span class="w"> </span><span class="n">tempPatternStream</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">CEP</span><span class="p">.</span><span class="na">pattern</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">inputEventStream</span><span class="p">.</span><span class="na">keyBy</span><span class="p">(</span><span class="s">&#34;rackID&#34;</span><span class="p">),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">warningPattern</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>Since we want to generate our warnings for each rack individually, we <code>keyBy</code> the input event stream by the <code>"rackID"</code> POJO field.
This ensures that all events matched by our pattern have the same rack ID.</p>
<p>The <code>PatternStream&lt;MonitoringEvent&gt;</code> gives us access to successfully matched event sequences.
They can be accessed using the <code>select</code> API call.
The <code>select</code> API call takes a <code>PatternSelectFunction</code> which is called for every matching event sequence.
The event sequence is provided as a <code>Map&lt;String, MonitoringEvent&gt;</code> where each <code>MonitoringEvent</code> is identified by its assigned event name.
For each matching pattern, our pattern select function generates a <code>TemperatureWarning</code> event.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">TemperatureWarning</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kt">int</span><span class="w"> </span><span class="n">rackID</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kt">double</span><span class="w"> </span><span class="n">averageTemperature</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">TemperatureWarning</span><span class="o">&gt;</span><span class="w"> </span><span class="n">warnings</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">tempPatternStream</span><span class="p">.</span><span class="na">select</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">(</span><span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">MonitoringEvent</span><span class="o">&gt;</span><span class="w"> </span><span class="n">pattern</span><span class="p">)</span><span class="w"> </span><span class="o">-&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">TemperatureEvent</span><span class="w"> </span><span class="n">first</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="p">(</span><span class="n">TemperatureEvent</span><span class="p">)</span><span class="w"> </span><span class="n">pattern</span><span class="p">.</span><span class="na">get</span><span class="p">(</span><span class="s">&#34;First Event&#34;</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">TemperatureEvent</span><span class="w"> </span><span class="n">second</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="p">(</span><span class="n">TemperatureEvent</span><span class="p">)</span><span class="w"> </span><span class="n">pattern</span><span class="p">.</span><span class="na">get</span><span class="p">(</span><span class="s">&#34;Second Event&#34;</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">return</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">TemperatureWarning</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">first</span><span class="p">.</span><span class="na">getRackID</span><span class="p">(),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">(</span><span class="n">first</span><span class="p">.</span><span class="na">getTemperature</span><span class="p">()</span><span class="w"> </span><span class="o">+</span><span class="w"> </span><span class="n">second</span><span class="p">.</span><span class="na">getTemperature</span><span class="p">())</span><span class="w"> </span><span class="o">/</span><span class="w"> </span><span class="n">2</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>Now we have generated a new complex event stream <code>DataStream&lt;TemperatureWarning&gt; warnings</code> from the initial monitoring event stream.
This complex event stream can again be used as the input for another round of complex event processing.
We use the <code>TemperatureWarnings</code> to generate <code>TemperatureAlerts</code> whenever we see two consecutive <code>TemperatureWarnings</code> for the same rack with increasing temperatures.
The <code>TemperatureAlerts</code> have the following definition:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="kd">public</span><span class="w"> </span><span class="kd">class</span> <span class="nc">TemperatureAlert</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="kd">private</span><span class="w"> </span><span class="kt">int</span><span class="w"> </span><span class="n">rackID</span><span class="p">;</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">...</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>First, we have to define our alert event pattern:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">Pattern</span><span class="o">&lt;</span><span class="n">TemperatureWarning</span><span class="p">,</span><span class="w"> </span><span class="o">?&gt;</span><span class="w"> </span><span class="n">alertPattern</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">Pattern</span><span class="p">.</span><span class="o">&lt;</span><span class="n">TemperatureWarning</span><span class="o">&gt;</span><span class="n">begin</span><span class="p">(</span><span class="s">&#34;First Event&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">next</span><span class="p">(</span><span class="s">&#34;Second Event&#34;</span><span class="p">)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">within</span><span class="p">(</span><span class="n">Time</span><span class="p">.</span><span class="na">seconds</span><span class="p">(</span><span class="n">20</span><span class="p">));</span><span class="w">
</span></span></span></code></pre></div><p>This definition says that we want to see two <code>TemperatureWarnings</code> within 20 seconds.
The first event has the name <code>&quot;First Event&quot;</code> and the second consecutive event has the name <code>&quot;Second Event&quot;</code>.
The individual events don’t have a where clause assigned, because we need access to both events in order to decide whether the temperature is increasing.
Therefore, we apply the filter condition in the select clause.
But first, we again obtain a <code>PatternStream</code>.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">PatternStream</span><span class="o">&lt;</span><span class="n">TemperatureWarning</span><span class="o">&gt;</span><span class="w"> </span><span class="n">alertPatternStream</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">CEP</span><span class="p">.</span><span class="na">pattern</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">warnings</span><span class="p">.</span><span class="na">keyBy</span><span class="p">(</span><span class="s">&#34;rackID&#34;</span><span class="p">),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">alertPattern</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>Again, we <code>keyBy</code> the warnings input stream by the <code>&quot;rackID&quot;</code> so that we generate our alerts for each rack individually.
Next, we apply the <code>flatSelect</code> method, which gives us access to matching event sequences and allows us to output an arbitrary number of complex events.
Thus, we generate a <code>TemperatureAlert</code> if and only if the temperature is increasing.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">TemperatureAlert</span><span class="o">&gt;</span><span class="w"> </span><span class="n">alerts</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">alertPatternStream</span><span class="p">.</span><span class="na">flatSelect</span><span class="p">(</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">(</span><span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="w"> </span><span class="n">TemperatureWarning</span><span class="o">&gt;</span><span class="w"> </span><span class="n">pattern</span><span class="p">,</span><span class="w"> </span><span class="n">Collector</span><span class="o">&lt;</span><span class="n">TemperatureAlert</span><span class="o">&gt;</span><span class="w"> </span><span class="n">out</span><span class="p">)</span><span class="w"> </span><span class="o">-&gt;</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">TemperatureWarning</span><span class="w"> </span><span class="n">first</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">pattern</span><span class="p">.</span><span class="na">get</span><span class="p">(</span><span class="s">&#34;First Event&#34;</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">TemperatureWarning</span><span class="w"> </span><span class="n">second</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">pattern</span><span class="p">.</span><span class="na">get</span><span class="p">(</span><span class="s">&#34;Second Event&#34;</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">if</span><span class="w"> </span><span class="p">(</span><span class="n">first</span><span class="p">.</span><span class="na">getAverageTemperature</span><span class="p">()</span><span class="w"> </span><span class="o">&lt;</span><span class="w"> </span><span class="n">second</span><span class="p">.</span><span class="na">getAverageTemperature</span><span class="p">())</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">out</span><span class="p">.</span><span class="na">collect</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">TemperatureAlert</span><span class="p">(</span><span class="n">first</span><span class="p">.</span><span class="na">getRackID</span><span class="p">()));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">}</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">});</span><span class="w">
</span></span></span></code></pre></div><p>The <code>DataStream&lt;TemperatureAlert&gt; alerts</code> is the data stream of temperature alerts for each rack.
Based on these alerts we can now adapt the workload or cooling for overheating racks.</p>
<p>The full source code for the presented example, as well as an example data source that randomly generates monitoring events, can be found in <a href="https://github.com/tillrohrmann/cep-monitoring">this repository</a>.</p>
<h2 id="conclusion">
Conclusion
<a class="anchor" href="#conclusion">#</a>
</h2>
<p>In this blog post we have seen how easy it is to reason about event streams using Flink’s CEP library.
Using the example of monitoring and alert generation for a data center, we have implemented a short program which notifies us when a rack is about to overheat and potentially fail.</p>
<p>In the future, the Flink community will further extend the CEP library’s functionality and expressiveness.
Next on the road map is support for a regular expression-like pattern specification, including Kleene star, lower and upper bounds, and negation.
Furthermore, it is planned to allow the where-clause to access fields of previously matched events.
This feature will allow us to prune unpromising event sequences early.</p>
<hr />
<p><em>Note:</em> The example code requires Flink 1.0.1 or higher.</p>
</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="//github.com/apache/flink-web/edit/asf-site/docs/content/posts/2016-04-06-cep-monitoring.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li>
<ul>
<li><a href="#monitoring-and-alert-generation-for-data-centers">Monitoring and alert generation for data centers</a>
<ul>
<li><a href="#implementation-with-apache-flink">Implementation with Apache Flink</a></li>
</ul>
</li>
<li><a href="#conclusion">Conclusion</a></li>
</ul>
</li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a>
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>Github</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>