| <!DOCTYPE html> |
| <html lang="en"> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| <!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags --> |
| <title>Apache Flink: Introducing Complex Event Processing (CEP) with Apache Flink</title> |
| <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon"> |
| <link rel="icon" href="/favicon.ico" type="image/x-icon"> |
| |
| <!-- Bootstrap --> |
| <link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.4.1/css/bootstrap.min.css"> |
| <link rel="stylesheet" href="/css/flink.css"> |
| <link rel="stylesheet" href="/css/syntax.css"> |
| |
| <!-- Blog RSS feed --> |
| <link href="/blog/feed.xml" rel="alternate" type="application/rss+xml" title="Apache Flink Blog: RSS feed" /> |
| |
| <!-- jQuery (necessary for Bootstrap's JavaScript plugins) --> |
| <!-- We need to load Jquery in the header for custom google analytics event tracking--> |
| <script src="/js/jquery.min.js"></script> |
| |
| <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> |
| <!-- WARNING: Respond.js doesn't work if you view the page via file:// --> |
| <!--[if lt IE 9]> |
| <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script> |
| <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> |
| <![endif]--> |
| </head> |
| <body> |
| |
| |
| <!-- Main content. --> |
| <div class="container"> |
| <div class="row"> |
| |
| |
| <div id="sidebar" class="col-sm-3"> |
| |
| |
| <!-- Top navbar. --> |
| <nav class="navbar navbar-default"> |
| <!-- The logo. --> |
| <div class="navbar-header"> |
| <!-- Collapse toggle shown on narrow viewports; it targets #bs-example-navbar-collapse-1. The sr-only label gives this icon-only button an accessible name, and aria-expanded exposes its initial (collapsed) state. --> |
| <button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1" aria-expanded="false" aria-controls="bs-example-navbar-collapse-1"> |
| <span class="sr-only">Toggle navigation</span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| <div class="navbar-logo"> |
| <a href="/"> |
| <!-- width/height attributes take unitless pixel integers; a "px" suffix is invalid. --> |
| <img alt="Apache Flink" src="/img/flink-header-logo.svg" width="147" height="73"> |
| </a> |
| </div> |
| </div><!-- /.navbar-header --> |
| |
| <!-- The navigation links. --> |
| <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1"> |
| <ul class="nav navbar-nav navbar-main"> |
| |
| <!-- First menu section explains visitors what Flink is --> |
| |
| <!-- What is Stream Processing? --> |
| <!-- |
| <li><a href="/streamprocessing1.html">What is Stream Processing?</a></li> |
| --> |
| |
| <!-- What is Flink? --> |
| <li><a href="/flink-architecture.html">What is Apache Flink?</a></li> |
| |
| |
| |
| <!-- What is Stateful Functions? --> |
| |
| <li><a href="/stateful-functions.html">What is Stateful Functions?</a></li> |
| |
| <!-- Use cases --> |
| <li><a href="/usecases.html">Use Cases</a></li> |
| |
| <!-- Powered by --> |
| <li><a href="/poweredby.html">Powered By</a></li> |
| |
| |
| |
| <!-- Second menu section aims to support Flink users --> |
| |
| <!-- Downloads --> |
| <li><a href="/downloads.html">Downloads</a></li> |
| |
| <!-- Getting Started --> |
| <li class="dropdown"> |
| <a class="dropdown-toggle" data-toggle="dropdown" href="#">Getting Started<span class="caret"></span></a> |
| <ul class="dropdown-menu"> |
| <li><a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10/getting-started/index.html" target="_blank">With Flink <small><span class="glyphicon glyphicon-new-window"></span></small></a></li> |
| <li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/getting-started/project-setup.html" target="_blank">With Flink Stateful Functions <small><span class="glyphicon glyphicon-new-window"></span></small></a></li> |
| <li><a href="/training.html">Training Course</a></li> |
| </ul> |
| </li> |
| |
| <!-- Documentation --> |
| <li class="dropdown"> |
| <a class="dropdown-toggle" data-toggle="dropdown" href="#">Documentation<span class="caret"></span></a> |
| <ul class="dropdown-menu"> |
| <li><a href="https://ci.apache.org/projects/flink/flink-docs-release-1.10" target="_blank">Flink 1.10 (Latest stable release) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li> |
| <li><a href="https://ci.apache.org/projects/flink/flink-docs-master" target="_blank">Flink Master (Latest Snapshot) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li> |
| <li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1" target="_blank">Flink Stateful Functions 2.1 (Latest stable release) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li> |
| <li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-master" target="_blank">Flink Stateful Functions Master (Latest Snapshot) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li> |
| </ul> |
| </li> |
| |
| <!-- getting help --> |
| <li><a href="/gettinghelp.html">Getting Help</a></li> |
| |
| <!-- Blog --> |
| <li class="active"><a href="/blog/"><b>Flink Blog</b></a></li> |
| |
| |
| <!-- Flink-packages --> |
| <li> |
| <a href="https://flink-packages.org" target="_blank">flink-packages.org <small><span class="glyphicon glyphicon-new-window"></span></small></a> |
| </li> |
| |
| |
| <!-- Third menu section aim to support community and contributors --> |
| |
| <!-- Community --> |
| <li><a href="/community.html">Community &amp; Project Info</a></li> |
| |
| <!-- Roadmap --> |
| <li><a href="/roadmap.html">Roadmap</a></li> |
| |
| <!-- Contribute --> |
| <li><a href="/contributing/how-to-contribute.html">How to Contribute</a></li> |
| |
| |
| <!-- GitHub --> |
| <li> |
| <a href="https://github.com/apache/flink" target="_blank">Flink on GitHub <small><span class="glyphicon glyphicon-new-window"></span></small></a> |
| </li> |
| |
| |
| |
| <!-- Language Switcher --> |
| <li> |
| |
| |
| <!-- link to the Chinese home page when current is blog page --> |
| <a href="/zh">中文版</a> |
| |
| |
| </li> |
| |
| </ul> |
| |
| <ul class="nav navbar-nav navbar-bottom"> |
| <hr /> |
| |
| <!-- Twitter --> |
| <li><a href="https://twitter.com/apacheflink" target="_blank">@ApacheFlink <small><span class="glyphicon glyphicon-new-window"></span></small></a></li> |
| |
| <!-- Visualizer --> |
| <li class=" hidden-md hidden-sm"><a href="/visualizer/" target="_blank">Plan Visualizer <small><span class="glyphicon glyphicon-new-window"></span></small></a></li> |
| |
| <hr /> |
| |
| <li><a href="https://apache.org" target="_blank">Apache Software Foundation <small><span class="glyphicon glyphicon-new-window"></span></small></a></li> |
| |
| <li> |
| <style> |
| /* Compact styling for the .smalllinks anchors: inline-block (forced with !important), no background, no top/bottom/right padding, and a 75px minimum width. */ |
| .smalllinks:link { |
| display: inline-block !important; background: none; padding-top: 0px; padding-bottom: 0px; padding-right: 0px; min-width: 75px; |
| } |
| </style> |
| |
| <a class="smalllinks" href="https://www.apache.org/licenses/" target="_blank">License</a> <small><span class="glyphicon glyphicon-new-window"></span></small> |
| |
| <a class="smalllinks" href="https://www.apache.org/security/" target="_blank">Security</a> <small><span class="glyphicon glyphicon-new-window"></span></small> |
| |
| <a class="smalllinks" href="https://www.apache.org/foundation/sponsorship.html" target="_blank">Donate</a> <small><span class="glyphicon glyphicon-new-window"></span></small> |
| |
| <a class="smalllinks" href="https://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a> <small><span class="glyphicon glyphicon-new-window"></span></small> |
| </li> |
| |
| </ul> |
| </div><!-- /.navbar-collapse --> |
| </nav> |
| |
| </div> |
| <div class="col-sm-9"> |
| <div class="row-fluid"> |
| <div class="col-sm-12"> |
| <div class="row"> |
| <h1>Introducing Complex Event Processing (CEP) with Apache Flink</h1> |
| <p><i></i></p> |
| |
| <article> |
| <p>06 Apr 2016 by Till Rohrmann (<a href="https://twitter.com/stsffap">@stsffap</a>)</p> |
| |
| <p>With the ubiquity of sensor networks and smart devices continuously collecting more and more data, we face the challenge of analyzing an ever-growing stream of data in near real-time. |
| Being able to react quickly to changing trends or to deliver up-to-date business intelligence can be a decisive factor for a company’s success or failure. |
| A key problem in real-time processing is the detection of event patterns in data streams.</p> |
| |
| <p>Complex event processing (CEP) addresses exactly this problem of matching continuously incoming events against a pattern. |
| The result of a match is usually a set of complex events which are derived from the input events. |
| In contrast to traditional DBMSs where a query is executed on stored data, CEP executes data on a stored query. |
| All data which is not relevant for the query can be immediately discarded. |
| The advantages of this approach are obvious, given that CEP queries are applied on a potentially infinite stream of data. |
| Furthermore, inputs are processed immediately. |
| Once the system has seen all events for a matching sequence, results are emitted straight away. |
| This aspect effectively leads to CEP’s real time analytics capability.</p> |
| |
| <p>Consequently, CEP’s processing paradigm drew significant interest and found application in a wide variety of use cases. |
| Most notably, CEP is used nowadays for financial applications such as stock market trend and credit card fraud detection. |
| Moreover, it is used in RFID-based tracking and monitoring, for example, to detect thefts in a warehouse where items are not properly checked out. |
| CEP can also be used to detect network intrusion by specifying patterns of suspicious user behaviour.</p> |
| |
| <p>Apache Flink with its true streaming nature and its capabilities for low latency as well as high throughput stream processing is a natural fit for CEP workloads. |
| Consequently, the Flink community has introduced the first version of a new <a href="https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html">CEP library</a> with <a href="http://flink.apache.org/news/2016/03/08/release-1.0.0.html">Flink 1.0</a>. |
| In the remainder of this blog post, we introduce Flink’s CEP library and we illustrate its ease of use through the example of monitoring a data center.</p> |
| |
| <h2 id="monitoring-and-alert-generation-for-data-centers">Monitoring and alert generation for data centers</h2> |
| |
| <!-- Illustration for the data center monitoring example, centered with Bootstrap's text-center utility instead of the obsolete center element. NOTE(review): the alt text is inferred from the section heading; adjust it to describe the actual diagram. --> |
| <div class="text-center"> |
| <img src="/img/blog/cep-monitoring.svg" alt="Overview of the data center monitoring and alert generation example" style="width:600px;margin:15px" /> |
| </div> |
| |
| <p>Assume we have a data center with a number of racks. |
| For each rack the power consumption and the temperature are monitored. |
| Whenever such a measurement takes place, a new power or temperature event is generated, respectively. |
| Based on this monitoring event stream, we want to detect racks that are about to overheat, and dynamically adapt their workload and cooling.</p> |
| |
| <p>For this scenario we use a two-stage approach. |
| First, we monitor the temperature events. |
| Whenever we see two consecutive events whose temperature exceeds a threshold value, we generate a temperature warning with the current average temperature. |
| A temperature warning does not necessarily indicate that a rack is about to overheat. |
| But whenever we see two consecutive warnings with increasing temperatures, then we want to issue an alert for this rack. |
| This alert can then lead to countermeasures to cool the rack.</p> |
| |
| <h3 id="implementation-with-apache-flink">Implementation with Apache Flink</h3> |
| |
| <p>First, we define the messages of the incoming monitoring event stream. |
| Every monitoring message contains its originating rack ID. |
| The temperature event additionally contains the current temperature and the power consumption event contains the current voltage. |
| We model the events as POJOs:</p> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">abstract</span> <span class="kd">class</span> <span class="nc">MonitoringEvent</span> <span class="o">{</span> |
| <span class="kd">private</span> <span class="kt">int</span> <span class="n">rackID</span><span class="o">;</span> |
| <span class="o">...</span> |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="kd">class</span> <span class="nc">TemperatureEvent</span> <span class="kd">extends</span> <span class="n">MonitoringEvent</span> <span class="o">{</span> |
| <span class="kd">private</span> <span class="kt">double</span> <span class="n">temperature</span><span class="o">;</span> |
| <span class="o">...</span> |
| <span class="o">}</span> |
| |
| <span class="kd">public</span> <span class="kd">class</span> <span class="nc">PowerEvent</span> <span class="kd">extends</span> <span class="n">MonitoringEvent</span> <span class="o">{</span> |
| <span class="kd">private</span> <span class="kt">double</span> <span class="n">voltage</span><span class="o">;</span> |
| <span class="o">...</span> |
| <span class="o">}</span></code></pre></div> |
| |
| <p>Now we can ingest the monitoring event stream using one of Flink’s connectors (e.g. Kafka, RabbitMQ, etc.). |
| This will give us a <code>DataStream&lt;MonitoringEvent&gt; inputEventStream</code> which we will use as the input for Flink’s CEP operator. |
| But first, we have to define the event pattern to detect temperature warnings. |
| The CEP library offers an intuitive <a href="https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/libs/cep.html#the-pattern-api">Pattern API</a> to easily define these complex patterns.</p> |
| |
| <p>Every pattern consists of a sequence of events which can have optional filter conditions assigned. |
| A pattern always starts with a first event to which we will assign the name <code>"First Event"</code>.</p> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="n">Pattern</span><span class="o">.<</span><span class="n">MonitoringEvent</span><span class="o">></span><span class="n">begin</span><span class="o">(</span><span class="s">"First Event"</span><span class="o">);</span></code></pre></div> |
| |
| <p>This pattern will match every monitoring event. |
| Since we are only interested in <code>TemperatureEvents</code> whose temperature is above a threshold value, we have to add an additional subtype constraint and a where clause:</p> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="n">Pattern</span><span class="o">.<</span><span class="n">MonitoringEvent</span><span class="o">></span><span class="n">begin</span><span class="o">(</span><span class="s">"First Event"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">subtype</span><span class="o">(</span><span class="n">TemperatureEvent</span><span class="o">.</span><span class="na">class</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">where</span><span class="o">(</span><span class="n">evt</span> <span class="o">-></span> <span class="n">evt</span><span class="o">.</span><span class="na">getTemperature</span><span class="o">()</span> <span class="o">>=</span> <span class="n">TEMPERATURE_THRESHOLD</span><span class="o">);</span></code></pre></div> |
| |
| <p>As stated before, we want to generate a <code>TemperatureWarning</code> if and only if we see two consecutive <code>TemperatureEvents</code> for the same rack whose temperatures are too high. |
| The Pattern API offers the <code>next</code> call which allows us to add a new event to our pattern. |
| This event has to directly follow the first matching event in order for the whole pattern to match.</p> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="n">Pattern</span><span class="o"><</span><span class="n">MonitoringEvent</span><span class="o">,</span> <span class="o">?></span> <span class="n">warningPattern</span> <span class="o">=</span> <span class="n">Pattern</span><span class="o">.<</span><span class="n">MonitoringEvent</span><span class="o">></span><span class="n">begin</span><span class="o">(</span><span class="s">"First Event"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">subtype</span><span class="o">(</span><span class="n">TemperatureEvent</span><span class="o">.</span><span class="na">class</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">where</span><span class="o">(</span><span class="n">evt</span> <span class="o">-></span> <span class="n">evt</span><span class="o">.</span><span class="na">getTemperature</span><span class="o">()</span> <span class="o">>=</span> <span class="n">TEMPERATURE_THRESHOLD</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">next</span><span class="o">(</span><span class="s">"Second Event"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">subtype</span><span class="o">(</span><span class="n">TemperatureEvent</span><span class="o">.</span><span class="na">class</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">where</span><span class="o">(</span><span class="n">evt</span> <span class="o">-></span> <span class="n">evt</span><span class="o">.</span><span class="na">getTemperature</span><span class="o">()</span> <span class="o">>=</span> <span class="n">TEMPERATURE_THRESHOLD</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">within</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">10</span><span class="o">));</span></code></pre></div> |
| |
| <p>The final pattern definition also contains the <code>within</code> API call which defines that two consecutive <code>TemperatureEvents</code> have to occur within a time interval of 10 seconds for the pattern to match. |
| Depending on the time characteristic setting, this can either be processing, ingestion or event time.</p> |
| |
| <p>Having defined the event pattern, we can now apply it on the <code>inputEventStream</code>.</p> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="n">PatternStream</span><span class="o"><</span><span class="n">MonitoringEvent</span><span class="o">></span> <span class="n">tempPatternStream</span> <span class="o">=</span> <span class="n">CEP</span><span class="o">.</span><span class="na">pattern</span><span class="o">(</span> |
| <span class="n">inputEventStream</span><span class="o">.</span><span class="na">keyBy</span><span class="o">(</span><span class="s">"rackID"</span><span class="o">),</span> |
| <span class="n">warningPattern</span><span class="o">);</span></code></pre></div> |
| |
| <p>Since we want to generate our warnings for each rack individually, we <code>keyBy</code> the input event stream by the <code>"rackID"</code> POJO field. |
| This enforces that matching events of our pattern will all have the same rack ID.</p> |
| |
| <p>The <code>PatternStream&lt;MonitoringEvent&gt;</code> gives us access to successfully matched event sequences. |
| They can be accessed using the <code>select</code> API call. |
| The <code>select</code> API call takes a <code>PatternSelectFunction</code> which is called for every matching event sequence. |
| The event sequence is provided as a <code>Map&lt;String, MonitoringEvent&gt;</code> where each <code>MonitoringEvent</code> is identified by its assigned event name. |
| Our pattern select function generates for each matching pattern a <code>TemperatureWarning</code> event.</p> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">TemperatureWarning</span> <span class="o">{</span> |
| <span class="kd">private</span> <span class="kt">int</span> <span class="n">rackID</span><span class="o">;</span> |
| <span class="kd">private</span> <span class="kt">double</span> <span class="n">averageTemperature</span><span class="o">;</span> |
| <span class="o">...</span> |
| <span class="o">}</span> |
| |
| <span class="n">DataStream</span><span class="o"><</span><span class="n">TemperatureWarning</span><span class="o">></span> <span class="n">warnings</span> <span class="o">=</span> <span class="n">tempPatternStream</span><span class="o">.</span><span class="na">select</span><span class="o">(</span> |
| <span class="o">(</span><span class="n">Map</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">MonitoringEvent</span><span class="o">></span> <span class="n">pattern</span><span class="o">)</span> <span class="o">-></span> <span class="o">{</span> |
| <span class="n">TemperatureEvent</span> <span class="n">first</span> <span class="o">=</span> <span class="o">(</span><span class="n">TemperatureEvent</span><span class="o">)</span> <span class="n">pattern</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">"First Event"</span><span class="o">);</span> |
| <span class="n">TemperatureEvent</span> <span class="n">second</span> <span class="o">=</span> <span class="o">(</span><span class="n">TemperatureEvent</span><span class="o">)</span> <span class="n">pattern</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">"Second Event"</span><span class="o">);</span> |
| |
| <span class="k">return</span> <span class="k">new</span> <span class="nf">TemperatureWarning</span><span class="o">(</span> |
| <span class="n">first</span><span class="o">.</span><span class="na">getRackID</span><span class="o">(),</span> |
| <span class="o">(</span><span class="n">first</span><span class="o">.</span><span class="na">getTemperature</span><span class="o">()</span> <span class="o">+</span> <span class="n">second</span><span class="o">.</span><span class="na">getTemperature</span><span class="o">())</span> <span class="o">/</span> <span class="mi">2</span><span class="o">);</span> |
| <span class="o">}</span> |
| <span class="o">);</span></code></pre></div> |
| |
| <p>Now we have generated a new complex event stream <code>DataStream&lt;TemperatureWarning&gt; warnings</code> from the initial monitoring event stream. |
| This complex event stream can again be used as the input for another round of complex event processing. |
| We use the <code>TemperatureWarnings</code> to generate <code>TemperatureAlerts</code> whenever we see two consecutive <code>TemperatureWarnings</code> for the same rack with increasing temperatures. |
| The <code>TemperatureAlerts</code> have the following definition:</p> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">class</span> <span class="nc">TemperatureAlert</span> <span class="o">{</span> |
| <span class="kd">private</span> <span class="kt">int</span> <span class="n">rackID</span><span class="o">;</span> |
| <span class="o">...</span> |
| <span class="o">}</span></code></pre></div> |
| |
| <p>At first, we have to define our alert event pattern:</p> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="n">Pattern</span><span class="o"><</span><span class="n">TemperatureWarning</span><span class="o">,</span> <span class="o">?></span> <span class="n">alertPattern</span> <span class="o">=</span> <span class="n">Pattern</span><span class="o">.<</span><span class="n">TemperatureWarning</span><span class="o">></span><span class="n">begin</span><span class="o">(</span><span class="s">"First Event"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">next</span><span class="o">(</span><span class="s">"Second Event"</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">within</span><span class="o">(</span><span class="n">Time</span><span class="o">.</span><span class="na">seconds</span><span class="o">(</span><span class="mi">20</span><span class="o">));</span></code></pre></div> |
| |
| <p>This definition says that we want to see two <code>TemperatureWarnings</code> within 20 seconds. |
| The first event has the name <code>"First Event"</code> and the second consecutive event has the name <code>"Second Event"</code>. |
| The individual events don’t have a where clause assigned, because we need access to both events in order to decide whether the temperature is increasing. |
| Therefore, we apply the filter condition in the select clause. |
| But first, we again obtain a <code>PatternStream</code>.</p> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="n">PatternStream</span><span class="o"><</span><span class="n">TemperatureWarning</span><span class="o">></span> <span class="n">alertPatternStream</span> <span class="o">=</span> <span class="n">CEP</span><span class="o">.</span><span class="na">pattern</span><span class="o">(</span> |
| <span class="n">warnings</span><span class="o">.</span><span class="na">keyBy</span><span class="o">(</span><span class="s">"rackID"</span><span class="o">),</span> |
| <span class="n">alertPattern</span><span class="o">);</span></code></pre></div> |
| |
| <p>Again, we <code>keyBy</code> the warnings input stream by the <code>"rackID"</code> so that we generate our alerts for each rack individually. |
| Next we apply the <code>flatSelect</code> method which will give us access to matching event sequences and allows us to output an arbitrary number of complex events. |
| Thus, we will generate a <code>TemperatureAlert</code> if and only if the temperature is increasing.</p> |
| |
| <div class="highlight"><pre><code class="language-java"><span class="n">DataStream</span><span class="o"><</span><span class="n">TemperatureAlert</span><span class="o">></span> <span class="n">alerts</span> <span class="o">=</span> <span class="n">alertPatternStream</span><span class="o">.</span><span class="na">flatSelect</span><span class="o">(</span> |
| <span class="o">(</span><span class="n">Map</span><span class="o"><</span><span class="n">String</span><span class="o">,</span> <span class="n">TemperatureWarning</span><span class="o">></span> <span class="n">pattern</span><span class="o">,</span> <span class="n">Collector</span><span class="o"><</span><span class="n">TemperatureAlert</span><span class="o">></span> <span class="n">out</span><span class="o">)</span> <span class="o">-></span> <span class="o">{</span> |
| <span class="n">TemperatureWarning</span> <span class="n">first</span> <span class="o">=</span> <span class="n">pattern</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">"First Event"</span><span class="o">);</span> |
| <span class="n">TemperatureWarning</span> <span class="n">second</span> <span class="o">=</span> <span class="n">pattern</span><span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="s">"Second Event"</span><span class="o">);</span> |
| |
| <span class="k">if</span> <span class="o">(</span><span class="n">first</span><span class="o">.</span><span class="na">getAverageTemperature</span><span class="o">()</span> <span class="o"><</span> <span class="n">second</span><span class="o">.</span><span class="na">getAverageTemperature</span><span class="o">())</span> <span class="o">{</span> |
| <span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="nf">TemperatureAlert</span><span class="o">(</span><span class="n">first</span><span class="o">.</span><span class="na">getRackID</span><span class="o">()));</span> |
| <span class="o">}</span> |
| <span class="o">});</span></code></pre></div> |
| |
| <p>The <code>DataStream&lt;TemperatureAlert&gt; alerts</code> is the data stream of temperature alerts for each rack. |
| Based on these alerts we can now adapt the workload or cooling for overheating racks.</p> |
| |
| <p>The full source code for the presented example as well as an example data source which randomly generates monitoring events can be found in <a href="https://github.com/tillrohrmann/cep-monitoring">this repository</a>.</p> |
| |
| <h2 id="conclusion">Conclusion</h2> |
| |
| <p>In this blog post we have seen how easy it is to reason about event streams using Flink’s CEP library. |
| Using the example of monitoring and alert generation for a data center, we have implemented a short program which notifies us when a rack is about to overheat and potentially fail.</p> |
| |
| <p>In the future, the Flink community will further extend the CEP library’s functionality and expressiveness. |
| Next on the road map is support for a regular expression-like pattern specification, including Kleene star, lower and upper bounds, and negation. |
| Furthermore, it is planned to allow the where-clause to access fields of previously matched events. |
| This feature will allow pruning unpromising event sequences early.</p> |
| |
| <hr /> |
| |
| <p><em>Note:</em> The example code requires Flink 1.0.1 or higher.</p> |
| |
| |
| </article> |
| </div> |
| |
| <div class="row"> |
| <div id="disqus_thread"></div> |
| <script type="text/javascript"> |
| /* * * CONFIGURATION VARIABLES: EDIT BEFORE PASTING INTO YOUR WEBPAGE * * */ |
| var disqus_shortname = 'stratosphere-eu'; // required: replace example with your forum shortname |
| |
| /* * * DON'T EDIT BELOW THIS LINE * * */ |
| (function() { |
| var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true; |
| dsq.src = '//' + disqus_shortname + '.disqus.com/embed.js'; |
| (document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq); |
| })(); |
| </script> |
| </div> |
| </div> |
| </div> |
| </div> |
| </div> |
| |
| <hr /> |
| |
| <div class="row"> |
| <div class="footer text-center col-sm-12"> |
| <p>Copyright © 2014-2019 <a href="http://apache.org">The Apache Software Foundation</a>. All Rights Reserved.</p> |
| <p>Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.</p> |
| <p><a href="/privacy-policy.html">Privacy Policy</a> · <a href="/blog/feed.xml">RSS feed</a></p> |
| </div> |
| </div> |
| </div><!-- /.container --> |
| |
| <!-- Include all compiled plugins (below), or include individual files as needed --> |
| <!-- Bootstrap JS is pinned to the same release (3.4.1) as the Bootstrap CSS loaded in the head, so styles and plugins stay in sync; the 3.4.x line also contains XSS fixes for the data-attribute handling of the JS plugins. --> |
| <script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.4.1/js/bootstrap.min.js"></script> |
| <script src="https://cdnjs.cloudflare.com/ajax/libs/jquery.matchHeight/0.7.0/jquery.matchHeight-min.js"></script> |
| <script src="/js/codetabs.js"></script> |
| <script src="/js/stickysidebar.js"></script> |
| |
| <!-- Google Analytics --> |
| <script> |
| (function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){ |
| (i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o), |
| m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m) |
| })(window,document,'script','//www.google-analytics.com/analytics.js','ga'); |
| |
| ga('create', 'UA-52545728-1', 'auto'); |
| ga('send', 'pageview'); |
| </script> |
| </body> |
| </html> |