blob: 9668897fdec587928d67a6de3333ab0eddb1fdd2 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
<title>Apache Flink: A Practical Guide to Broadcast State in Apache Flink</title>
<link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
<link rel="icon" href="/favicon.ico" type="image/x-icon">
<!-- Bootstrap -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.4.1/css/bootstrap.min.css">
<link rel="stylesheet" href="/css/flink.css">
<link rel="stylesheet" href="/css/syntax.css">
<!-- Blog RSS feed -->
<link href="/blog/feed.xml" rel="alternate" type="application/rss+xml" title="Apache Flink Blog: RSS feed" />
<!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
<!-- We need to load Jquery in the header for custom google analytics event tracking-->
<script src="/js/jquery.min.js"></script>
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<!-- Main content. -->
<div class="container">
<div class="row">
<div id="sidebar" class="col-sm-3">
<!-- Top navbar. -->
<nav class="navbar navbar-default">
<!-- The logo. -->
<div class="navbar-header">
<!-- Toggle button for the collapsible navigation links (#bs-example-navbar-collapse-1). -->
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1" aria-expanded="false" aria-controls="bs-example-navbar-collapse-1">
<!-- Visually hidden label: the three icon bars alone give the button no accessible name. -->
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<div class="navbar-logo">
<a href="/">
<img alt="Apache Flink" src="/img/flink-header-logo.svg" width="147" height="73">
</a>
</div>
</div><!-- /.navbar-header -->
<!-- The navigation links. -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav navbar-main">
<!-- First menu section explains visitors what Flink is -->
<!-- What is Stream Processing? -->
<!--
<li><a href="/streamprocessing1.html">What is Stream Processing?</a></li>
-->
<!-- What is Flink? -->
<li><a href="/flink-architecture.html">What is Apache Flink?</a></li>
<!-- What is Stateful Functions? -->
<li><a href="/stateful-functions.html">What is Stateful Functions?</a></li>
<!-- Use cases -->
<li><a href="/usecases.html">Use Cases</a></li>
<!-- Powered by -->
<li><a href="/poweredby.html">Powered By</a></li>
&nbsp;
<!-- Second menu section aims to support Flink users -->
<!-- Downloads -->
<li><a href="/downloads.html">Downloads</a></li>
<!-- Getting Started: dropdown with links to the Flink / Stateful Functions getting-started guides and the training course. -->
<li class="dropdown">
<!-- The toggle has no destination of its own (href="#"), so it is exposed to assistive technology as a button that opens a popup. -->
<a class="dropdown-toggle" data-toggle="dropdown" href="#" role="button" aria-haspopup="true" aria-expanded="false">Getting Started<span class="caret"></span></a>
<ul class="dropdown-menu">
<li><a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11/getting-started/index.html" target="_blank">With Flink <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/getting-started/project-setup.html" target="_blank">With Flink Stateful Functions <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="/training.html">Training Course</a></li>
</ul>
</li>
<!-- Documentation: dropdown with links to the stable and snapshot docs of Flink and Flink Stateful Functions. -->
<li class="dropdown">
<!-- The toggle has no destination of its own (href="#"), so it is exposed to assistive technology as a button that opens a popup. -->
<a class="dropdown-toggle" data-toggle="dropdown" href="#" role="button" aria-haspopup="true" aria-expanded="false">Documentation<span class="caret"></span></a>
<ul class="dropdown-menu">
<li><a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11" target="_blank">Flink 1.11 (Latest stable release) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="https://ci.apache.org/projects/flink/flink-docs-master" target="_blank">Flink Master (Latest Snapshot) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1" target="_blank">Flink Stateful Functions 2.1 (Latest stable release) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-master" target="_blank">Flink Stateful Functions Master (Latest Snapshot) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
</ul>
</li>
<!-- getting help -->
<li><a href="/gettinghelp.html">Getting Help</a></li>
<!-- Blog -->
<li><a href="/blog/"><b>Flink Blog</b></a></li>
<!-- Flink-packages -->
<li>
<a href="https://flink-packages.org" target="_blank">flink-packages.org <small><span class="glyphicon glyphicon-new-window"></span></small></a>
</li>
&nbsp;
<!-- Third menu section aim to support community and contributors -->
<!-- Community -->
<li><a href="/community.html">Community &amp; Project Info</a></li>
<!-- Roadmap -->
<li><a href="/roadmap.html">Roadmap</a></li>
<!-- Contribute -->
<li><a href="/contributing/how-to-contribute.html">How to Contribute</a></li>
<!-- GitHub -->
<li>
<a href="https://github.com/apache/flink" target="_blank">Flink on GitHub <small><span class="glyphicon glyphicon-new-window"></span></small></a>
</li>
&nbsp;
<!-- Language Switcher -->
<li>
<a href="/zh/2019/06/26/broadcast-state.html">中文版</a>
</li>
</ul>
<ul class="nav navbar-nav navbar-bottom">
<hr />
<!-- Twitter -->
<li><a href="https://twitter.com/apacheflink" target="_blank">@ApacheFlink <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<!-- Visualizer -->
<li class=" hidden-md hidden-sm"><a href="/visualizer/" target="_blank">Plan Visualizer <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<hr />
<li><a href="https://apache.org" target="_blank">Apache Software Foundation <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li>
<style>
.smalllinks:link {
display: inline-block !important; background: none; padding-top: 0px; padding-bottom: 0px; padding-right: 0px; min-width: 75px;
}
</style>
<a class="smalllinks" href="https://www.apache.org/licenses/" target="_blank">License</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
<a class="smalllinks" href="https://www.apache.org/security/" target="_blank">Security</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
<a class="smalllinks" href="https://www.apache.org/foundation/sponsorship.html" target="_blank">Donate</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
<a class="smalllinks" href="https://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
</li>
</ul>
</div><!-- /.navbar-collapse -->
</nav>
</div>
<div class="col-sm-9">
<div class="row-fluid">
<div class="col-sm-12">
<div class="row">
<h1>A Practical Guide to Broadcast State in Apache Flink</h1>
<p><i></i></p>
<article>
<p>26 Jun 2019 Fabian Hueske (<a href="https://twitter.com/fhueske">@fhueske</a>)</p>
<p>Since version 1.5.0, Apache Flink features a new type of state which is called Broadcast State. In this post, we explain what Broadcast State is, and show an example of how it can be applied to an application that evaluates dynamic patterns on an event stream. We walk you through the processing steps and the source code to implement this application in practice.</p>
<h2 id="what-is-broadcast-state">What is Broadcast State?</h2>
<p>The Broadcast State can be used to combine and jointly process two streams of events in a specific way. The events of the first stream are broadcasted to all parallel instances of an operator, which maintains them as state. The events of the other stream are not broadcasted but sent to individual instances of the same operator and processed together with the events of the broadcasted stream.
The new broadcast state is a natural fit for applications that need to join a low-throughput and a high-throughput stream or need to dynamically update their processing logic. We will use a concrete example of the latter use case to explain the broadcast state and show its API in more detail in the remainder of this post.</p>
<h2 id="dynamic-pattern-evaluation-with-broadcast-state">Dynamic Pattern Evaluation with Broadcast State</h2>
<p>Imagine an e-commerce website that captures the interactions of all users as a stream of user actions. The company that operates the website is interested in analyzing the interactions to increase revenue, improve the user experience, and detect and prevent malicious behavior.
The website implements a streaming application that detects a pattern on the stream of user events. However, the company wants to avoid modifying and redeploying the application every time the pattern changes. Instead, the application ingests a second stream of patterns and updates its active pattern when it receives a new pattern from the pattern stream. In the following, we discuss this application step-by-step and show how it leverages the broadcast state feature in Apache Flink.</p>
<center>
<img src="/img/blog/broadcastState/fig1.png" width="600px" alt="Broadcast State in Apache Flink." />
</center>
<p><br /></p>
<p>Our example application ingests two data streams. The first stream provides user actions on the website and is illustrated on the top left side of the above figure. A user interaction event consists of the type of the action (user login, user logout, add to cart, or complete payment) and the id of the user, which is encoded by color. The user action event stream in our illustration contains a logout action of User 1001 followed by a payment-complete event for User 1003, and an “add-to-cart” action of User 1002.</p>
<p>The second stream provides action patterns that the application will evaluate. A pattern consists of two consecutive actions. In the figure above, the pattern stream contains the following two:</p>
<ul>
<li>Pattern #1: A user logs in and immediately logs out without browsing additional pages on the e-commerce website.</li>
<li>Pattern #2: A user adds an item to the shopping cart and logs out without completing the purchase.</li>
</ul>
<p>Such patterns help a business in better analyzing user behavior, detecting malicious actions, and improving the website experience. For example, in the case of items being added to a shopping cart with no follow-up purchase, the website team can take appropriate actions to better understand the reasons why users don’t complete a purchase and initiate specific programs to improve the website conversion (such as providing discount codes, limited free shipping offers, etc.).</p>
<p>On the right-hand side, the figure shows three parallel tasks of an operator that ingest the pattern and user action streams, evaluate the patterns on the action stream, and emit pattern matches downstream. For the sake of simplicity, the operator in our example only evaluates a single pattern with exactly two subsequent actions. The currently active pattern is replaced when a new pattern is received from the pattern stream. In principle, the operator could also be implemented to evaluate more complex patterns or multiple patterns concurrently which could be individually added or removed.</p>
<p>We will describe how the pattern matching application processes the user action and pattern streams.</p>
<center>
<img src="/img/blog/broadcastState/fig2.png" width="600px" alt="Broadcast State in Apache Flink." />
</center>
<p><br /></p>
<p>First, a pattern is sent to the operator. The pattern is broadcasted to all three parallel tasks of the operator. The tasks store the pattern in their broadcast state. Since the broadcast state should only be updated using broadcasted data, the state of all tasks is always expected to be the same.</p>
<center>
<img src="/img/blog/broadcastState/fig3.png" width="600px" alt="Broadcast State in Apache Flink." />
</center>
<p><br /></p>
<p>Next, the first user actions are partitioned on the user id and shipped to the operator tasks. The partitioning ensures that all actions of the same user are processed by the same task. The figure above shows the state of the application after the first pattern and the first three action events were consumed by the operator tasks.</p>
<p>When a task receives a new user action, it evaluates the currently active pattern by looking at the user’s latest and previous actions. For each user, the operator stores the previous action in the keyed state. Since the tasks in the figure above only received a single action for each user so far (we just started the application), the pattern does not need to be evaluated. Finally, the previous action in the user’s keyed state is updated to the latest action, to be able to look it up when the next action of the same user arrives.</p>
<center>
<img src="/img/blog/broadcastState/fig4.png" width="600px" alt="Broadcast State in Apache Flink." />
</center>
<p><br /></p>
<p>After the first three actions are processed, the next event, the logout action of User 1001, is shipped to the task that processes the events of User 1001. When the task receives the action, it looks up the current pattern from the broadcast state and the previous action of User 1001. Since the pattern matches both actions, the task emits a pattern match event. Finally, the task updates its keyed state by overriding the previous event with the latest action.</p>
<center>
<img src="/img/blog/broadcastState/fig5.png" width="600px" alt="Broadcast State in Apache Flink." />
</center>
<p><br /></p>
<p>When a new pattern arrives in the pattern stream, it is broadcasted to all tasks and each task updates its broadcast state by replacing the current pattern with the new one.</p>
<center>
<img src="/img/blog/broadcastState/fig6.png" width="600px" alt="Broadcast State in Apache Flink." />
</center>
<p><br /></p>
<p>Once the broadcast state is updated with a new pattern, the matching logic continues as before, i.e., user action events are partitioned by key and evaluated by the responsible task.</p>
<h2 id="how-to-implement-an-application-with-broadcast-state">How to Implement an Application with Broadcast State?</h2>
<p>Until now, we conceptually discussed the application and explained how it uses broadcast state to evaluate dynamic patterns over event streams. Next, we’ll show how to implement the example application with Flink’s DataStream API and the broadcast state feature.</p>
<p>Let’s start with the input data of the application. We have two data streams: actions and patterns. At this point, we don’t really care where the streams come from. The streams could be ingested from Apache Kafka or Kinesis or any other system.</p>
<div class="highlight"><pre><code class="language-java"><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Action</span><span class="o">&gt;</span> <span class="n">actions</span> <span class="o">=</span> <span class="o">???</span>
<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Pattern</span><span class="o">&gt;</span> <span class="n">patterns</span> <span class="o">=</span> <span class="o">???</span></code></pre></div>
<p><code>Action</code> and <code>Pattern</code> are Pojos with two fields each:</p>
<ul>
<li>
<p><code>Action: Long userId, String action</code></p>
</li>
<li>
<p><code>Pattern: String firstAction, String secondAction</code></p>
</li>
</ul>
<p>As a first step, we key the action stream on the <code>userId</code> attribute.</p>
<div class="highlight"><pre><code class="language-java"><span class="n">KeyedStream</span><span class="o">&lt;</span><span class="n">Action</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">actionsByUser</span> <span class="o">=</span> <span class="n">actions</span>
<span class="o">.</span><span class="na">keyBy</span><span class="o">((</span><span class="n">KeySelector</span><span class="o">&lt;</span><span class="n">Action</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;)</span> <span class="n">action</span> <span class="o">-&gt;</span> <span class="n">action</span><span class="o">.</span><span class="na">userId</span><span class="o">);</span></code></pre></div>
<p>Next, we prepare the broadcast state. Broadcast state is always represented as <code>MapState</code>, the most versatile state primitive that Flink provides.</p>
<div class="highlight"><pre><code class="language-java"><span class="n">MapStateDescriptor</span><span class="o">&lt;</span><span class="n">Void</span><span class="o">,</span> <span class="n">Pattern</span><span class="o">&gt;</span> <span class="n">bcStateDescriptor</span> <span class="o">=</span>
<span class="k">new</span> <span class="n">MapStateDescriptor</span><span class="o">&lt;&gt;(</span><span class="s">&quot;patterns&quot;</span><span class="o">,</span> <span class="n">Types</span><span class="o">.</span><span class="na">VOID</span><span class="o">,</span> <span class="n">Types</span><span class="o">.</span><span class="na">POJO</span><span class="o">(</span><span class="n">Pattern</span><span class="o">.</span><span class="na">class</span><span class="o">));</span></code></pre></div>
<p>Since our application only evaluates and stores a single <code>Pattern</code> at a time, we configure the broadcast state as a <code>MapState</code> with key type <code>Void</code> and value type <code>Pattern</code>. The <code>Pattern</code> is always stored in the <code>MapState</code> with <code>null</code> as key.</p>
<div class="highlight"><pre><code class="language-java"><span class="n">BroadcastStream</span><span class="o">&lt;</span><span class="n">Pattern</span><span class="o">&gt;</span> <span class="n">bcedPatterns</span> <span class="o">=</span> <span class="n">patterns</span><span class="o">.</span><span class="na">broadcast</span><span class="o">(</span><span class="n">bcStateDescriptor</span><span class="o">);</span></code></pre></div>
<p>Using the <code>MapStateDescriptor</code> for the broadcast state, we apply the <code>broadcast()</code> transformation on the patterns stream and receive a <code>BroadcastStream bcedPatterns</code>.</p>
<div class="highlight"><pre><code class="language-java"><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Pattern</span><span class="o">&gt;&gt;</span> <span class="n">matches</span> <span class="o">=</span> <span class="n">actionsByUser</span>
<span class="o">.</span><span class="na">connect</span><span class="o">(</span><span class="n">bcedPatterns</span><span class="o">)</span>
<span class="o">.</span><span class="na">process</span><span class="o">(</span><span class="k">new</span> <span class="nf">PatternEvaluator</span><span class="o">());</span></code></pre></div>
<p>After we obtained the keyed <code>actionsByUser</code> stream and the broadcasted <code>bcedPatterns</code> stream, we <code>connect()</code> both streams and apply a <code>PatternEvaluator</code> on the connected streams. <code>PatternEvaluator</code> is a custom function that extends the <code>KeyedBroadcastProcessFunction</code> class. It applies the pattern matching logic that we discussed before and emits <code>Tuple2&lt;Long, Pattern&gt;</code> records which contain the user id and the matched pattern.</p>
<div class="highlight"><pre><code class="language-java"><span class="kd">public</span> <span class="kd">static</span> <span class="kd">class</span> <span class="nc">PatternEvaluator</span>
<span class="kd">extends</span> <span class="n">KeyedBroadcastProcessFunction</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Action</span><span class="o">,</span> <span class="n">Pattern</span><span class="o">,</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Pattern</span><span class="o">&gt;&gt;</span> <span class="o">{</span>
<span class="c1">// handle for keyed state (per user)</span>
<span class="n">ValueState</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span> <span class="n">prevActionState</span><span class="o">;</span>
<span class="c1">// broadcast state descriptor</span>
<span class="n">MapStateDescriptor</span><span class="o">&lt;</span><span class="n">Void</span><span class="o">,</span> <span class="n">Pattern</span><span class="o">&gt;</span> <span class="n">patternDesc</span><span class="o">;</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">open</span><span class="o">(</span><span class="n">Configuration</span> <span class="n">conf</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// initialize keyed state</span>
<span class="n">prevActionState</span> <span class="o">=</span> <span class="n">getRuntimeContext</span><span class="o">().</span><span class="na">getState</span><span class="o">(</span>
<span class="k">new</span> <span class="n">ValueStateDescriptor</span><span class="o">&lt;&gt;(</span><span class="s">&quot;lastAction&quot;</span><span class="o">,</span> <span class="n">Types</span><span class="o">.</span><span class="na">STRING</span><span class="o">));</span>
<span class="n">patternDesc</span> <span class="o">=</span>
<span class="k">new</span> <span class="n">MapStateDescriptor</span><span class="o">&lt;&gt;(</span><span class="s">&quot;patterns&quot;</span><span class="o">,</span> <span class="n">Types</span><span class="o">.</span><span class="na">VOID</span><span class="o">,</span> <span class="n">Types</span><span class="o">.</span><span class="na">POJO</span><span class="o">(</span><span class="n">Pattern</span><span class="o">.</span><span class="na">class</span><span class="o">));</span>
<span class="o">}</span>
<span class="cm">/**</span>
<span class="cm"> * Called for each user action.</span>
<span class="cm"> * Evaluates the current pattern against the previous and</span>
<span class="cm"> * current action of the user.</span>
<span class="cm"> */</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">processElement</span><span class="o">(</span>
<span class="n">Action</span> <span class="n">action</span><span class="o">,</span>
<span class="n">ReadOnlyContext</span> <span class="n">ctx</span><span class="o">,</span>
<span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Pattern</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="c1">// get current pattern from broadcast state</span>
<span class="n">Pattern</span> <span class="n">pattern</span> <span class="o">=</span> <span class="n">ctx</span>
<span class="o">.</span><span class="na">getBroadcastState</span><span class="o">(</span><span class="k">this</span><span class="o">.</span><span class="na">patternDesc</span><span class="o">)</span>
<span class="c1">// access MapState with null as VOID default value</span>
<span class="o">.</span><span class="na">get</span><span class="o">(</span><span class="kc">null</span><span class="o">);</span>
<span class="c1">// get previous action of current user from keyed state</span>
<span class="n">String</span> <span class="n">prevAction</span> <span class="o">=</span> <span class="n">prevActionState</span><span class="o">.</span><span class="na">value</span><span class="o">();</span>
<span class="k">if</span> <span class="o">(</span><span class="n">pattern</span> <span class="o">!=</span> <span class="kc">null</span> <span class="o">&amp;&amp;</span> <span class="n">prevAction</span> <span class="o">!=</span> <span class="kc">null</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// user had an action before, check if pattern matches</span>
<span class="k">if</span> <span class="o">(</span><span class="n">pattern</span><span class="o">.</span><span class="na">firstAction</span><span class="o">.</span><span class="na">equals</span><span class="o">(</span><span class="n">prevAction</span><span class="o">)</span> <span class="o">&amp;&amp;</span>
<span class="n">pattern</span><span class="o">.</span><span class="na">secondAction</span><span class="o">.</span><span class="na">equals</span><span class="o">(</span><span class="n">action</span><span class="o">.</span><span class="na">action</span><span class="o">))</span> <span class="o">{</span>
<span class="c1">// MATCH</span>
<span class="n">out</span><span class="o">.</span><span class="na">collect</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;&gt;(</span><span class="n">ctx</span><span class="o">.</span><span class="na">getCurrentKey</span><span class="o">(),</span> <span class="n">pattern</span><span class="o">));</span>
<span class="o">}</span>
<span class="o">}</span>
<span class="c1">// update keyed state and remember action for next pattern evaluation</span>
<span class="n">prevActionState</span><span class="o">.</span><span class="na">update</span><span class="o">(</span><span class="n">action</span><span class="o">.</span><span class="na">action</span><span class="o">);</span>
<span class="o">}</span>
<span class="cm">/**</span>
<span class="cm"> * Called for each new pattern.</span>
<span class="cm"> * Overwrites the current pattern with the new pattern.</span>
<span class="cm"> */</span>
<span class="nd">@Override</span>
<span class="kd">public</span> <span class="kt">void</span> <span class="nf">processBroadcastElement</span><span class="o">(</span>
<span class="n">Pattern</span> <span class="n">pattern</span><span class="o">,</span>
<span class="n">Context</span> <span class="n">ctx</span><span class="o">,</span>
<span class="n">Collector</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">Pattern</span><span class="o">&gt;&gt;</span> <span class="n">out</span><span class="o">)</span> <span class="kd">throws</span> <span class="n">Exception</span> <span class="o">{</span>
<span class="c1">// store the new pattern by updating the broadcast state</span>
<span class="n">BroadcastState</span><span class="o">&lt;</span><span class="n">Void</span><span class="o">,</span> <span class="n">Pattern</span><span class="o">&gt;</span> <span class="n">bcState</span> <span class="o">=</span> <span class="n">ctx</span><span class="o">.</span><span class="na">getBroadcastState</span><span class="o">(</span><span class="n">patternDesc</span><span class="o">);</span>
<span class="c1">// storing in MapState with null as VOID default value</span>
<span class="n">bcState</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="kc">null</span><span class="o">,</span> <span class="n">pattern</span><span class="o">);</span>
<span class="o">}</span>
<span class="o">}</span></code></pre></div>
<p>The <code>KeyedBroadcastProcessFunction</code> class provides three methods to process records and emit results.</p>
<ul>
<li><code>processBroadcastElement()</code> is called for each record of the broadcasted stream. In our <code>PatternEvaluator</code> function, we simply put the received <code>Pattern</code> record into the broadcast state using the <code>null</code> key (remember, we only store a single pattern in the <code>MapState</code>).</li>
<li><code>processElement()</code> is called for each record of the keyed stream. It provides read-only access to the broadcast state to prevent modifications that would result in different broadcast states across the parallel instances of the function. The <code>processElement()</code> method of the <code>PatternEvaluator</code> retrieves the current pattern from the broadcast state and the previous action of the user from the keyed state. If both are present, it checks whether the previous and current action match with the pattern and emits a pattern match record if that is the case. Finally, it updates the keyed state to the current user action.</li>
<li><code>onTimer()</code> is called when a previously registered timer fires. Timers can be registered in the <code>processElement</code> method and are used to perform computations or to clean up state in the future. We did not implement this method in our example to keep the code concise. However, it could be used to remove the last action of a user when the user was not active for a certain period of time to avoid growing state due to inactive users.</li>
</ul>
<p>You might have noticed the context objects of the <code>KeyedBroadcastProcessFunction</code>’s processing methods. The context objects give access to additional functionality such as:</p>
<ul>
<li>The broadcast state (read-write or read-only, depending on the method),</li>
<li>A <code>TimerService</code>, which gives access to the record’s timestamp, the current watermark, and which can register timers,</li>
<li>The current key (only available in <code>processElement()</code>), and</li>
<li>A method to apply a function to the keyed state of each registered key (only available in <code>processBroadcastElement()</code>).</li>
</ul>
<p>The <code>KeyedBroadcastProcessFunction</code> has full access to Flink state and time features just like any other ProcessFunction and hence can be used to implement sophisticated application logic. Broadcast state was designed to be a versatile feature that adapts to different scenarios and use cases. Although we only discussed a fairly simple and restricted application, you can use broadcast state in many ways to implement the requirements of your application.</p>
<h2 id="conclusion">Conclusion</h2>
<p>In this blog post, we walked you through an example application to explain what Apache Flink’s broadcast state is and how it can be used to evaluate dynamic patterns on event streams. We’ve also discussed the API and showed the source code of our example application.</p>
<p>We invite you to check the <a href="https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html">documentation</a> of this feature and provide feedback or suggestions for further improvements through our <a href="http://mail-archives.apache.org/mod_mbox/flink-community/">mailing list</a>.</p>
</article>
</div>
<div class="row">
<div id="disqus_thread"></div>
<script type="text/javascript">
/* * * CONFIGURATION VARIABLES: EDIT BEFORE PASTING INTO YOUR WEBPAGE * * */
// Forum shortname; it is also used below to build the host name of the embed script.
var disqus_shortname = 'stratosphere-eu'; // required: replace example with your forum shortname
/* * * DON'T EDIT BELOW THIS LINE * * */
// Creates an async <script> element for the Disqus embed and appends it to <head>
// (or to <body> when the document has no <head> element).
(function() {
var dsq = document.createElement('script'); dsq.type = 'text/javascript'; dsq.async = true;
// Explicit https:// instead of a protocol-relative URL, so the embed is never requested
// over plain HTTP and does not resolve to file:// when the page is opened locally.
dsq.src = 'https://' + disqus_shortname + '.disqus.com/embed.js';
(document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
})();
</script>
</div>
</div>
</div>
</div>
</div>
<hr />
<div class="row">
<div class="footer text-center col-sm-12">
<p>Copyright © 2014-2019 <a href="http://apache.org">The Apache Software Foundation</a>. All Rights Reserved.</p>
<p>Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.</p>
<p><a href="/privacy-policy.html">Privacy Policy</a> &middot; <a href="/blog/feed.xml">RSS feed</a></p>
</div>
</div>
</div><!-- /.container -->
<!-- Include all compiled plugins (below), or include individual files as needed -->
<!-- Bootstrap JS is pinned to the same release (3.4.1) as the Bootstrap stylesheet loaded in <head>. -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.4.1/js/bootstrap.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery.matchHeight/0.7.0/jquery.matchHeight-min.js"></script>
<!-- Site-local scripts. -->
<script src="/js/codetabs.js"></script>
<script src="/js/stickysidebar.js"></script>
<!-- Google Analytics -->
<script>
// analytics.js loader: registers the global `ga` command queue (commands are buffered in
// ga.q), then creates an async <script> element for the analytics library and inserts it
// before the first <script> element of the document.
// The library URL uses an explicit https:// scheme instead of a protocol-relative one.
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','https://www.google-analytics.com/analytics.js','ga');
// Queue the tracker creation for this property, followed by a page-view hit.
ga('create', 'UA-52545728-1', 'auto');
ga('send', 'pageview');
</script>
</body>
</html>