blob: c5c9cd35687ebeb234e2801d3d15ca8aa7264a87 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
<title>Apache Flink: Introducing Complex Event Processing (CEP) with Apache Flink</title>
<link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
<link rel="icon" href="/favicon.ico" type="image/x-icon">
<!-- Bootstrap -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.4.1/css/bootstrap.min.css">
<link rel="stylesheet" href="/css/flink.css">
<link rel="stylesheet" href="/css/syntax.css">
<!-- Blog RSS feed -->
<link href="/blog/feed.xml" rel="alternate" type="application/rss+xml" title="Apache Flink Blog: RSS feed" />
<!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
<!-- We need to load Jquery in the header for custom google analytics event tracking-->
<script src="/js/jquery.min.js"></script>
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<!-- Main content. -->
<div class="container">
<div class="row">
<div id="sidebar" class="col-sm-3">
<!-- Top navbar. -->
<nav class="navbar navbar-default">
<!-- The logo. -->
<div class="navbar-header">
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<div class="navbar-logo">
<a href="/">
<img alt="Apache Flink" src="/img/flink-header-logo.svg" width="147px" height="73px">
</a>
</div>
</div><!-- /.navbar-header -->
<!-- The navigation links. -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav navbar-main">
<!-- First menu section explains visitors what Flink is -->
<!-- What is Stream Processing? -->
<!--
<li><a href="/streamprocessing1.html">What is Stream Processing?</a></li>
-->
<!-- What is Flink? -->
<li><a href="/flink-architecture.html">What is Apache Flink?</a></li>
<!-- What is Stateful Functions? -->
<li><a href="/stateful-functions.html">What is Stateful Functions?</a></li>
<!-- Use cases -->
<li><a href="/usecases.html">Use Cases</a></li>
<!-- Powered by -->
<li><a href="/poweredby.html">Powered By</a></li>
&nbsp;
<!-- Second menu section aims to support Flink users -->
<!-- Downloads -->
<li><a href="/downloads.html">Downloads</a></li>
<!-- Getting Started -->
<li class="dropdown">
<a class="dropdown-toggle" data-toggle="dropdown" href="#">Getting Started<span class="caret"></span></a>
<ul class="dropdown-menu">
<li><a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10/getting-started/index.html" target="_blank">With Flink <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/getting-started/project-setup.html" target="_blank">With Flink Stateful Functions <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="/training.html">Training Course</a></li>
</ul>
</li>
<!-- Documentation -->
<li class="dropdown">
<a class="dropdown-toggle" data-toggle="dropdown" href="#">Documentation<span class="caret"></span></a>
<ul class="dropdown-menu">
<li><a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10" target="_blank">Flink 1.10 (Latest stable release) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="https://ci.apache.org/projects/flink/flink-docs-master" target="_blank">Flink Master (Latest Snapshot) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1" target="_blank">Flink Stateful Functions 2.1 (Latest stable release) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-master" target="_blank">Flink Stateful Functions Master (Latest Snapshot) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
</ul>
</li>
<!-- getting help -->
<li><a href="/gettinghelp.html">Getting Help</a></li>
<!-- Blog -->
<li class="active"><a href="/blog/"><b>Flink Blog</b></a></li>
<!-- Flink-packages -->
<li>
<a href="https://flink-packages.org" target="_blank">flink-packages.org <small><span class="glyphicon glyphicon-new-window"></span></small></a>
</li>
&nbsp;
<!-- Third menu section aim to support community and contributors -->
<!-- Community -->
<li><a href="/community.html">Community &amp; Project Info</a></li>
<!-- Roadmap -->
<li><a href="/roadmap.html">Roadmap</a></li>
<!-- Contribute -->
<li><a href="/contributing/how-to-contribute.html">How to Contribute</a></li>
<!-- GitHub -->
<li>
<a href="https://github.com/apache/flink" target="_blank">Flink on GitHub <small><span class="glyphicon glyphicon-new-window"></span></small></a>
</li>
&nbsp;
<!-- Language Switcher -->
<li>
<!-- link to the Chinese home page when current is blog page -->
<a href="/zh">中文版</a>
</li>
</ul>
<ul class="nav navbar-nav navbar-bottom">
<hr />
<!-- Twitter -->
<li><a href="https://twitter.com/apacheflink" target="_blank">@ApacheFlink <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<!-- Visualizer -->
<li class=" hidden-md hidden-sm"><a href="/visualizer/" target="_blank">Plan Visualizer <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<hr />
<li><a href="https://apache.org" target="_blank">Apache Software Foundation <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li>
<style>
.smalllinks:link {
display: inline-block !important; background: none; padding-top: 0px; padding-bottom: 0px; padding-right: 0px; min-width: 75px;
}
</style>
<a class="smalllinks" href="https://www.apache.org/licenses/" target="_blank">License</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
<a class="smalllinks" href="https://www.apache.org/security/" target="_blank">Security</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
<a class="smalllinks" href="https://www.apache.org/foundation/sponsorship.html" target="_blank">Donate</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
<a class="smalllinks" href="https://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
</li>
</ul>
</div><!-- /.navbar-collapse -->
</nav>
</div>
<div class="col-sm-9">
<div class="row-fluid">
<div class="col-sm-12">
<div class="row">
<h1>Introducing Complex Event Processing (CEP) with Apache Flink</h1>
<p><i></i></p>
<article>
<p>06 Apr 2016 by Till Rohrmann (<a href="https://twitter.com/stsffap">@stsffap</a>)</p>
<p>With the ubiquity of sensor networks and smart devices continuously collecting more and more data, we face the challenge to analyze an ever growing stream of data in near real-time.
Being able to react quickly to changing trends or to deliver up to date business intelligence can be a decisive factor for a company’s success or failure.
A key problem in real time processing is the detection of event patterns in data streams.</p>
<p>Complex event processing (CEP) addresses exactly this problem of matching continuously incoming events against a pattern.
The result of a matching are usually complex events which are derived from the input events.
In contrast to traditional DBMSs where a query is executed on stored data, CEP executes data on a stored query.
All data which is not relevant for the query can be immediately discarded.
The advantages of this approach are obvious, given that CEP queries are applied on a potentially infinite stream of data.
Furthermore, inputs are processed immediately.
Once the system has seen all events for a matching sequence, results are emitted straight away.
This aspect effectively leads to CEP’s real time analytics capability.</p>
<p>Consequently, CEP’s processing paradigm drew significant interest and found application in a wide variety of use cases.
Most notably, CEP is used nowadays for financial applications such as stock market trend and credit card fraud detection.
Moreover, it is used in RFID-based tracking and monitoring, for example, to detect thefts in a warehouse where items are not properly checked out.
CEP can also be used to detect network intrusion by specifying patterns of suspicious user behaviour.</p>
<p>Apache Flink with its true streaming nature and its capabilities for low latency as well as high throughput stream processing is a natural fit for CEP workloads.
Consequently, the Flink community has introduced the first version of a new <a href="https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html">CEP library</a> with <a href="http://flink.apache.org/news/2016/03/08/release-1.0.0.html">Flink 1.0</a>.
In the remainder of this blog post, we introduce Flink’s CEP library and we illustrate its ease of use through the example of monitoring a data center.</p>
<h2 id="monitoring-and-alert-generation-for-data-centers">Monitoring and alert generation for data centers</h2>
<center>
<img src="/img/blog/cep-monitoring.svg" style="width:600px;margin:15px" />
</center>
<p>Assume we have a data center with a number of racks.
For each rack the power consumption and the temperature are monitored.
Whenever such a measurement takes place, a new power or temperature event is generated, respectively.
Based on this monitoring event stream, we want to detect racks that are about to overheat, and dynamically adapt their workload and cooling.</p>
<p>For this scenario we use a two staged approach.
First, we monitor the temperature events.
Whenever we see two consecutive events whose temperature exceeds a threshold value, we generate a temperature warning with the current average temperature.
A temperature warning does not necessarily indicate that a rack is about to overheat.
But whenever we see two consecutive warnings with increasing temperatures, then we want to issue an alert for this rack.
This alert can then lead to countermeasures to cool the rack.</p>
<h3 id="implementation-with-apache-flink">Implementation with Apache Flink</h3>
<p>First, we define the messages of the incoming monitoring event stream.
Every monitoring message contains its originating rack ID.
The temperature event additionally contains the current temperature and the power consumption event contains the current voltage.
We model the events as POJOs:</p>
<div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">abstract</span> <span class="kd">class</span> <span class="nc">MonitoringEvent</span> <span class="o">{</span>
<span class="kd">private</span> <span class="kt">int</span> <span class="n">rackID</span><span class="o">;</span>
<span class="o">...</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">TemperatureEvent</span> <span class="kd">extends</span> <span class="n">MonitoringEvent</span> <span class="o">{</span>
<span class="kd">private</span> <span class="kt">double</span> <span class="n">temperature</span><span class="o">;</span>
<span class="o">...</span>
<span class="o">}</span>
<span class="kd">public</span> <span class="kd">class</span> <span class="nc">PowerEvent</span> <span class="kd">extends</span> <span class="n">MonitoringEvent</span> <span class="o">{</span>
<span class="kd">private</span> <span class="kt">double</span> <span class="n">voltage</span><span class="o">;</span>
<span class="o">...</span>
<span class="o">}</span></code></pre></div>
<p>Now we can ingest the monitoring event stream using one of Flink’s connectors (e.g. Kafka, RabbitMQ, etc.).
This will give us a <code>DataStream&lt;MonitoringEvent&gt; inputEventStream</code> which we will use as the input for Flink’s CEP operator.
But first, we have to define the event pattern to detect temperature warnings.
The CEP library offers an intuitive <a href="https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html#the-pattern-api">Pattern API</a> to easily define these complex patterns.</p>
<p>Every pattern consists of a sequence of events which can have optional filter conditions assigned.
A pattern always starts with a first event to which we will assign the name <code>“First Event”</code>.</p>
<div class="highlight"><pre><code class="language-java"><span class="n">Pattern</span><span class="o">.&lt;</span><span class="n">MonitoringEvent</span><span class="o">&gt;</span><span class="n">begin</span><span class="o">(</span><span class="s">&quot;First Event&quot;</span><span class="o">);</span></code></pre></div>
<p>This pattern will match every monitoring event.
Since we are only interested in <code>TemperatureEvents</code> whose temperature is above a threshold value, we have to add an additional subtype constraint and a where clause:</p>
<div class="highlight"><pre><code class="language-java"><span class="n">Pattern</span><span class="o">.&lt;</span><span class="n">MonitoringEvent</span><span class="o">&gt;</span><span class="n">begin</span><span class="o">(</span><span class="s">&quot;First Event&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">subtype</span><span class="o">(</span><span class="n">TemperatureEvent</span><span class="o">.</span><span class="na">class</span><span class="o">)</span>
<span class="o">.</span><span class="na">where</span><span class="o">(</span><span class="n">evt</span> <span class="o">-&gt;</span> <span class="n">evt</span><span class="o">.</span><span class="na">getTemperature</span><span class="o">()</span> <span class="o">&gt;=</span> <span class="n">TEMPERATURE_THRESHOLD</span><span class="o">);</span></code></pre></div>
<p>As stated before, we want to generate a <code>TemperatureWarning</code> if and only if we see two consecutive <code>TemperatureEvents</code> for the same rack whose temperatures are too high.
The Pattern API offers the <code>next</code> call which allows us to add a new event to our pattern.
This event has to follow directly the first matching event in order for the whole pattern to match.</p>
<div class="highlight"><pre><code class="language-java"><span class="n">Pattern</span><span class="o">&lt;</span><span class="n">MonitoringEvent</span><span class="o">,</span> <span class="o">?&gt;</span> <span class="n">warningPattern</span> <span class="o">=</span> <span class="n">Pattern</span><span class="o">.&lt;</span><span class="n">MonitoringEvent</span><span class="o">&gt;</span><span class="n">begin</span><span class="o">(</span><span class="s">&quot;First Event&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">subtype</span><span class="o">(</span><span class="n">TemperatureEvent</span><span class="o">.</span><span class="na">class</span><span class="o">)</span>
<span class="o">.</span><span class="na">where</span><span class="o">(</span><span class="n">evt</span> <span class="o">-&gt;</span> <span class="n">evt</span><span class="o">.</span><span class="na">getTemperature</span><span class="o">()</span> <span class="o">&gt;=</span> <span class="n">TEMPERATURE_THRESHOLD</span><span class="o">)</span>
<span class="o">.</span><span class="na">next</span><span class="o">(</span><span class="s">&quot;Second Event&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">subtype</span><span class="o">(</span><span class="n">TemperatureEvent</span><span class="o">.</span><span class="na">class</span><span class="o">)</span>
<span class="o">.</span><span class="na">where</span><span class="o">(</span><span class="n">evt</span> <span class="o">-&gt;</span> <span class="n">evt</span><span class="o">.</span><span class="na">getTemperature</span><span class="o">()</span> <span class="o">&gt;=</span> <span class="n">TEMPERATURE_THRESHOLD</span><span class="o">)</span>
<span class="o">.</span><span class="na">within</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">10</span><span class="o">));</span></code></pre></div>
<p>The final pattern definition also contains the <code>within</code> API call which defines that two consecutive <code>TemperatureEvents</code> have to occur within a time interval of 10 seconds for the pattern to match.
Depending on the time characteristic setting, this can either be processing, ingestion or event time.</p>
<p>Having defined the event pattern, we can now apply it on the <code>inputEventStream</code>.</p>
<div class="highlight"><pre><code class="language-java"><span class="n">PatternStream</span><span class="o">&lt;</span><span class="n">MonitoringEvent</span><span class="o">&gt;</span> <span class="n">tempPatternStream</span> <span class="o">=</span> <span class="n">CEP</span><span class="o">.</span><span class="na">pattern</span><span class="o">(</span>
<span class="n">inputEventStream</span><span class="o">.</span><span class="na">keyBy</span><span class="o">(</span><span class="s">&quot;rackID&quot;</span><span class="o">),</span>
<span class="n">warningPattern</span><span class="o">);</span></code></pre></div>
<p>Since we want to generate our warnings for each rack individually, we <code>keyBy</code> the input event stream by the <code>“rackID”</code> POJO field.
This enforces that matching events of our pattern will all have the same rack ID.</p>
<p>The <code>PatternStream&lt;MonitoringEvent&gt;</code> gives us access to successfully matched event sequences.
They can be accessed using the <code>select</code> API call.
The <code>select</code> API call takes a <code>PatternSelectFunction</code> which is called for every matching event sequence.
The event sequence is provided as a <code>Map&lt;String, MonitoringEvent&gt;</code> where each <code>MonitoringEvent</code> is identified by its assigned event name.
Our pattern select function generates for each matching pattern a <code>TemperatureWarning</code> event.</p>
<div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">TemperatureWarning</span> <span class="o">{</span>
<span class="kd">private</span> <span class="kt">int</span> <span class="n">rackID</span><span class="o">;</span>
<span class="kd">private</span> <span class="kt">double</span> <span class="n">averageTemperature</span><span class="o">;</span>
<span class="o">...</span>
<span class="o">}</span>
<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">TemperatureWarning</span><span class="o">&gt;</span> <span class="n">warnings</span> <span class="o">=</span> <span class="n">tempPatternStream</span><span class="o">.</span><span class="na">select</span><span class="o">(</span>
<span class="o">(</span><span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">MonitoringEvent</span><span class="o">&gt;</span> <span class="n">pattern</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="n">TemperatureEvent</span> <span class="n">first</span> <span class="o">=</span> <span class="o">(</span><span class="n">TemperatureEvent</span><span class="o">)</span> <span class="n">pattern</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">&quot;First Event&quot;</span><span class="o">);</span>
<span class="n">TemperatureEvent</span> <span class="n">second</span> <span class="o">=</span> <span class="o">(</span><span class="n">TemperatureEvent</span><span class="o">)</span> <span class="n">pattern</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">&quot;Second Event&quot;</span><span class="o">);</span>
<span class="k">return</span> <span class="k">new</span> <span class="nf">TemperatureWarning</span><span class="o">(</span>
<span class="n">first</span><span class="o">.</span><span class="na">getRackID</span><span class="o">(),</span>
<span class="o">(</span><span class="n">first</span><span class="o">.</span><span class="na">getTemperature</span><span class="o">()</span> <span class="o">+</span> <span class="n">second</span><span class="o">.</span><span class="na">getTemperature</span><span class="o">())</span> <span class="o">/</span> <span class="mi">2</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">);</span></code></pre></div>
<p>Now we have generated a new complex event stream <code>DataStream&lt;TemperatureWarning&gt; warnings</code> from the initial monitoring event stream.
This complex event stream can again be used as the input for another round of complex event processing.
We use the <code>TemperatureWarnings</code> to generate <code>TemperatureAlerts</code> whenever we see two consecutive <code>TemperatureWarnings</code> for the same rack with increasing temperatures.
The <code>TemperatureAlerts</code> have the following definition:</p>
<div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">TemperatureAlert</span> <span class="o">{</span>
<span class="kd">private</span> <span class="kt">int</span> <span class="n">rackID</span><span class="o">;</span>
<span class="o">...</span>
<span class="o">}</span></code></pre></div>
<p>At first, we have to define our alert event pattern:</p>
<div class="highlight"><pre><code class="language-java"><span class="n">Pattern</span><span class="o">&lt;</span><span class="n">TemperatureWarning</span><span class="o">,</span> <span class="o">?&gt;</span> <span class="n">alertPattern</span> <span class="o">=</span> <span class="n">Pattern</span><span class="o">.&lt;</span><span class="n">TemperatureWarning</span><span class="o">&gt;</span><span class="n">begin</span><span class="o">(</span><span class="s">&quot;First Event&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">next</span><span class="o">(</span><span class="s">&quot;Second Event&quot;</span><span class="o">)</span>
<span class="o">.</span><span class="na">within</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">20</span><span class="o">));</span></code></pre></div>
<p>This definition says that we want to see two <code>TemperatureWarnings</code> within 20 seconds.
The first event has the name <code>“First Event”</code> and the second consecutive event has the name <code>“Second Event”</code>.
The individual events don’t have a where clause assigned, because we need access to both events in order to decide whether the temperature is increasing.
Therefore, we apply the filter condition in the select clause.
But first, we obtain again a <code>PatternStream</code>.</p>
<div class="highlight"><pre><code class="language-java"><span class="n">PatternStream</span><span class="o">&lt;</span><span class="n">TemperatureWarning</span><span class="o">&gt;</span> <span class="n">alertPatternStream</span> <span class="o">=</span> <span class="n">CEP</span><span class="o">.</span><span class="na">pattern</span><span class="o">(</span>
<span class="n">warnings</span><span class="o">.</span><span class="na">keyBy</span><span class="o">(</span><span class="s">&quot;rackID&quot;</span><span class="o">),</span>
<span class="n">alertPattern</span><span class="o">);</span></code></pre></div>
<p>Again, we <code>keyBy</code> the warnings input stream by the <code>"rackID"</code> so that we generate our alerts for each rack individually.
Next we apply the <code>flatSelect</code> method which will give us access to matching event sequences and allows us to output an arbitrary number of complex events.
Thus, we will only generate a <code>TemperatureAlert</code> if and only if the temperature is increasing.</p>
<div class="highlight"><pre><code class="language-java"><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">TemperatureAlert</span><span class="o">&gt;</span> <span class="n">alerts</span> <span class="o">=</span> <span class="n">alertPatternStream</span><span class="o">.</span><span class="na">flatSelect</span><span class="o">(</span>
<span class="o">(</span><span class="n">Map</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">TemperatureWarning</span><span class="o">&gt;</span> <span class="n">pattern</span><span class="o">,</span> <span class="n">Collector</span><span class="o">&lt;</span><span class="n">TemperatureAlert</span><span class="o">&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
<span class="n">TemperatureWarning</span> <span class="n">first</span> <span class="o">=</span> <span class="n">pattern</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">&quot;First Event&quot;</span><span class="o">);</span>
<span class="n">TemperatureWarning</span> <span class="n">second</span> <span class="o">=</span> <span class="n">pattern</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">&quot;Second Event&quot;</span><span class="o">);</span>
<span class="k">if</span> <span class="o">(</span><span class="n">first</span><span class="o">.</span><span class="na">getAverageTemperature</span><span class="o">()</span> <span class="o">&lt;</span> <span class="n">second</span><span class="o">.</span><span class="na">getAverageTemperature</span><span class="o">())</span> <span class="o">{</span>
<span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="nf">TemperatureAlert</span><span class="o">(</span><span class="n">first</span><span class="o">.</span><span class="na">getRackID</span><span class="o">()));</span>
<span class="o">}</span>
<span class="o">});</span></code></pre></div>
<p>The <code>DataStream&lt;TemperatureAlert&gt; alerts</code> is the data stream of temperature alerts for each rack.
Based on these alerts we can now adapt the workload or cooling for overheating racks.</p>
<p>The full source code for the presented example as well as an example data source which generates randomly monitoring events can be found in <a href="https://github.com/tillrohrmann/cep-monitoring">this repository</a>.</p>
<h2 id="conclusion">Conclusion</h2>
<p>In this blog post we have seen how easy it is to reason about event streams using Flink’s CEP library.
Using the example of monitoring and alert generation for a data center, we have implemented a short program which notifies us when a rack is about to overheat and potentially to fail.</p>
<p>In the future, the Flink community will further extend the CEP library’s functionality and expressiveness.
Next on the road map is support for a regular expression-like pattern specification, including Kleene star, lower and upper bounds, and negation.
Furthermore, it is planned to allow the where-clause to access fields of previously matched events.
This feature will allow to prune unpromising event sequences early.</p>
<hr />
<p><em>Note:</em> The example code requires Flink 1.0.1 or higher.</p>
</article>
</div>
<div class="row">
<div id="disqus_thread"></div>
<script type="text/javascript">
/* * * CONFIGURATION VARIABLES: EDIT BEFORE PASTING INTO YOUR WEBPAGE * * */
var disqus_shortname = 'stratosphere-eu'; // required: replace example with your forum shortname
/* * * DON'T EDIT BELOW THIS LINE * * */
(function() {
var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true;
dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js';
(document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
})();
</script>
</div>
</div>
</div>
</div>
</div>
<hr />
<div class="row">
<div class="footer text-center col-sm-12">
<p>Copyright © 2014-2019 <a href="http://apache.org">The Apache Software Foundation</a>. All Rights Reserved.</p>
<p>Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.</p>
<p><a href="/privacy-policy.html">Privacy Policy</a> &middot; <a href="/blog/feed.xml">RSS feed</a></p>
</div>
</div>
</div><!-- /.container -->
<!-- Include all compiled plugins (below), or include individual files as needed -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.4/js/bootstrap.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery.matchHeight/0.7.0/jquery.matchHeight-min.js"></script>
<script src="/js/codetabs.js"></script>
<script src="/js/stickysidebar.js"></script>
<!-- Google Analytics -->
<script>
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','//www.google-analytics.com/analytics.js','ga');
ga('create', 'UA-52545728-1', 'auto');
ga('send', 'pageview');
</script>
</body>
</html>