<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<!-- The above 3 meta tags *must* come first in the head; any other head content must come *after* these tags -->
<title>Apache Flink: Storm Compatibility in Apache Flink: How to run existing Storm topologies on Flink</title>
<link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
<link rel="icon" href="/favicon.ico" type="image/x-icon">
<!-- Bootstrap -->
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.4.1/css/bootstrap.min.css">
<link rel="stylesheet" href="/css/flink.css">
<link rel="stylesheet" href="/css/syntax.css">
<!-- Blog RSS feed -->
<link href="/blog/feed.xml" rel="alternate" type="application/rss+xml" title="Apache Flink Blog: RSS feed" />
<!-- jQuery (necessary for Bootstrap's JavaScript plugins) -->
<!-- We need to load Jquery in the header for custom google analytics event tracking-->
<script src="/js/jquery.min.js"></script>
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!-- WARNING: Respond.js doesn't work if you view the page via file:// -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<!-- Main content. -->
<div class="container">
<div class="row">
<div id="sidebar" class="col-sm-3">
<!-- Top navbar. -->
<nav class="navbar navbar-default">
<!-- The logo. -->
<div class="navbar-header">
<button type="button" class="navbar-toggle collapsed" data-toggle="collapse" data-target="#bs-example-navbar-collapse-1" aria-expanded="false">
<span class="sr-only">Toggle navigation</span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
<div class="navbar-logo">
<a href="/">
<img alt="Apache Flink" src="/img/flink-header-logo.svg" width="147" height="73">
</a>
</div>
</div><!-- /.navbar-header -->
<!-- The navigation links. -->
<div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
<ul class="nav navbar-nav navbar-main">
<!-- First menu section explains visitors what Flink is -->
<!-- What is Stream Processing? -->
<!--
<li><a href="/streamprocessing1.html">What is Stream Processing?</a></li>
-->
<!-- What is Flink? -->
<li><a href="/flink-architecture.html">What is Apache Flink?</a></li>
<!-- What is Stateful Functions? -->
<li><a href="/stateful-functions.html">What is Stateful Functions?</a></li>
<!-- Use cases -->
<li><a href="/usecases.html">Use Cases</a></li>
<!-- Powered by -->
<li><a href="/poweredby.html">Powered By</a></li>
&nbsp;
<!-- Second menu section aims to support Flink users -->
<!-- Downloads -->
<li><a href="/downloads.html">Downloads</a></li>
<!-- Getting Started -->
<li class="dropdown">
<a class="dropdown-toggle" data-toggle="dropdown" href="#">Getting Started<span class="caret"></span></a>
<ul class="dropdown-menu">
<li><a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11/getting-started/index.html" target="_blank">With Flink <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/getting-started/project-setup.html" target="_blank">With Flink Stateful Functions <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="/training.html">Training Course</a></li>
</ul>
</li>
<!-- Documentation -->
<li class="dropdown">
<a class="dropdown-toggle" data-toggle="dropdown" href="#">Documentation<span class="caret"></span></a>
<ul class="dropdown-menu">
<li><a href="https://ci.apache.org/projects/flink/flink-docs-release-1.11" target="_blank">Flink 1.11 (Latest stable release) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="https://ci.apache.org/projects/flink/flink-docs-master" target="_blank">Flink Master (Latest Snapshot) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1" target="_blank">Flink Stateful Functions 2.1 (Latest stable release) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li><a href="https://ci.apache.org/projects/flink/flink-statefun-docs-master" target="_blank">Flink Stateful Functions Master (Latest Snapshot) <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
</ul>
</li>
<!-- getting help -->
<li><a href="/gettinghelp.html">Getting Help</a></li>
<!-- Blog -->
<li class="active"><a href="/blog/"><b>Flink Blog</b></a></li>
<!-- Flink-packages -->
<li>
<a href="https://flink-packages.org" target="_blank">flink-packages.org <small><span class="glyphicon glyphicon-new-window"></span></small></a>
</li>
&nbsp;
<!-- Third menu section aim to support community and contributors -->
<!-- Community -->
<li><a href="/community.html">Community &amp; Project Info</a></li>
<!-- Roadmap -->
<li><a href="/roadmap.html">Roadmap</a></li>
<!-- Contribute -->
<li><a href="/contributing/how-to-contribute.html">How to Contribute</a></li>
<!-- GitHub -->
<li>
<a href="https://github.com/apache/flink" target="_blank">Flink on GitHub <small><span class="glyphicon glyphicon-new-window"></span></small></a>
</li>
&nbsp;
<!-- Language Switcher -->
<li>
<!-- link to the Chinese home page when current is blog page -->
<a href="/zh">中文版</a>
</li>
</ul>
<ul class="nav navbar-nav navbar-bottom">
<hr />
<!-- Twitter -->
<li><a href="https://twitter.com/apacheflink" target="_blank">@ApacheFlink <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<!-- Visualizer -->
<li class=" hidden-md hidden-sm"><a href="/visualizer/" target="_blank">Plan Visualizer <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<hr />
<li><a href="https://apache.org" target="_blank">Apache Software Foundation <small><span class="glyphicon glyphicon-new-window"></span></small></a></li>
<li>
<style>
/* Compact inline footer links (License, Security, Donate, Thanks).
   `:link` alone matches only *unvisited* links, so a visited link used to drop
   this override and fall back to the surrounding nav link styling. `:visited`
   is listed as well so both states render identically. Keeping the
   pseudo-class in each selector preserves the original specificity, which is
   presumably what lets these paddings win over the navbar's own link rules. */
.smalllinks:link,
.smalllinks:visited {
display: inline-block !important;
background: none;
padding-top: 0px;
padding-bottom: 0px;
padding-right: 0px;
min-width: 75px;
}
</style>
<a class="smalllinks" href="https://www.apache.org/licenses/" target="_blank">License</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
<a class="smalllinks" href="https://www.apache.org/security/" target="_blank">Security</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
<a class="smalllinks" href="https://www.apache.org/foundation/sponsorship.html" target="_blank">Donate</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
<a class="smalllinks" href="https://www.apache.org/foundation/thanks.html" target="_blank">Thanks</a> <small><span class="glyphicon glyphicon-new-window"></span></small>
</li>
</ul>
</div><!-- /.navbar-collapse -->
</nav>
</div>
<div class="col-sm-9">
<div class="row-fluid">
<div class="col-sm-12">
<div class="row">
<h1>Storm Compatibility in Apache Flink: How to run existing Storm topologies on Flink</h1>
<p><i></i></p>
<article>
<p>11 Dec 2015 by Matthias J. Sax (<a href="https://twitter.com/MatthiasJSax">@MatthiasJSax</a>)</p>
<p><a href="https://storm.apache.org">Apache Storm</a> was one of the first distributed and scalable stream processing systems available in the open source space offering (near) real-time tuple-by-tuple processing semantics.
Initially released by the developers at Backtype in 2011 under the Eclipse open-source license, it became popular very quickly.
Shortly afterwards, Twitter acquired Backtype.
Since then, Storm has been growing in popularity, is used in production at many big companies, and is the de-facto industry standard for big data stream processing.
In 2013, Storm entered the Apache incubator program, followed by its graduation to a top-level project in 2014.</p>
<p>Apache Flink is a stream processing engine that improves upon older technologies like Storm in several dimensions,
including <a href="https://ci.apache.org/projects/flink/flink-docs-master/internals/stream_checkpointing.html">strong consistency guarantees</a> (“exactly once”),
a higher level <a href="https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html">DataStream API</a>,
support for <a href="http://flink.apache.org/news/2015/12/04/Introducing-windows.html">event time and a rich windowing system</a>,
as well as <a href="https://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/">superior throughput with competitive low latency</a>.</p>
<p>While Flink offers several technical benefits over Storm, an existing investment in a codebase of applications developed for Storm often makes it difficult to switch engines.
For this reason, as part of the Flink 0.10 release, Flink ships with a Storm compatibility package that allows users to:</p>
<ul>
<li>Run <strong>unmodified</strong> Storm topologies using Apache Flink benefiting from superior performance.</li>
<li><strong>Embed</strong> Storm code (spouts and bolts) as operators inside Flink DataStream programs.</li>
</ul>
<p>Only minor code changes are required in order to submit the program to Flink instead of Storm.
This minimizes the work for developers to run existing Storm topologies while leveraging Apache Flink’s fast and robust execution engine.</p>
<p>We note that the Storm compatibility package is continuously improving and does not cover the full spectrum of Storm’s API.
However, it is powerful enough to cover many use cases.</p>
<h2 id="executing-storm-topologies-with-flink">Executing Storm topologies with Flink</h2>
<div style="text-align:center">
<img src="/img/blog/flink-storm.png" alt="Apache Flink and Apache Storm" style="height:200px;margin:15px">
</div>
<p>The easiest way to use the Storm compatibility package is by executing a whole Storm topology in Flink.
For this, you only need to replace the dependency <code>storm-core</code> with <code>flink-storm</code> in your Storm project and <strong>change two lines of code</strong> in your original Storm program.</p>
<p>The following example shows a simple Storm word count program that can be executed in Flink.
First, the program is assembled the Storm way without any code change to Spouts, Bolts, or the topology itself.</p>
<div class="highlight"><pre><code class="language-java"><span class="c1">// assemble topology, the Storm way</span>
<span class="n">TopologyBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">TopologyBuilder</span><span class="o">();</span>
<span class="n">builder</span><span class="o">.</span><span class="na">setSpout</span><span class="o">(</span><span class="s">&quot;source&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="nf">StormFileSpout</span><span class="o">(</span><span class="n">inputFilePath</span><span class="o">));</span>
<span class="n">builder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">&quot;tokenizer&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="nf">StormBoltTokenizer</span><span class="o">())</span>
<span class="o">.</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">&quot;source&quot;</span><span class="o">);</span>
<span class="n">builder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">&quot;counter&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="nf">StormBoltCounter</span><span class="o">())</span>
<span class="o">.</span><span class="na">fieldsGrouping</span><span class="o">(</span><span class="s">&quot;tokenizer&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="nf">Fields</span><span class="o">(</span><span class="s">&quot;word&quot;</span><span class="o">));</span>
<span class="n">builder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">&quot;sink&quot;</span><span class="o">,</span> <span class="k">new</span> <span class="nf">StormBoltFileSink</span><span class="o">(</span><span class="n">outputFilePath</span><span class="o">))</span>
<span class="o">.</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">&quot;counter&quot;</span><span class="o">);</span></code></pre></div>
<p>In order to execute the topology, we need to translate it to a <code>FlinkTopology</code> and submit it to a local or remote Flink cluster, very similar to submitting the application to a Storm cluster.<sup><a href="#fn1" id="ref1">1</a></sup></p>
<div class="highlight"><pre><code class="language-java"><span class="c1">// transform Storm topology to Flink program</span>
<span class="c1">// replaces: StormTopology topology = builder.createTopology();</span>
<span class="n">FlinkTopology</span> <span class="n">topology</span> <span class="o">=</span> <span class="n">FlinkTopology</span><span class="o">.</span><span class="na">createTopology</span><span class="o">(</span><span class="n">builder</span><span class="o">);</span>
<span class="n">Config</span> <span class="n">conf</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Config</span><span class="o">();</span>
<span class="k">if</span><span class="o">(</span><span class="n">runLocal</span><span class="o">)</span> <span class="o">{</span>
<span class="c1">// use FlinkLocalCluster instead of LocalCluster</span>
<span class="n">FlinkLocalCluster</span> <span class="n">cluster</span> <span class="o">=</span> <span class="n">FlinkLocalCluster</span><span class="o">.</span><span class="na">getLocalCluster</span><span class="o">();</span>
<span class="n">cluster</span><span class="o">.</span><span class="na">submitTopology</span><span class="o">(</span><span class="s">&quot;WordCount&quot;</span><span class="o">,</span> <span class="n">conf</span><span class="o">,</span> <span class="n">topology</span><span class="o">);</span>
<span class="o">}</span> <span class="k">else</span> <span class="o">{</span>
<span class="c1">// use FlinkSubmitter instead of StormSubmitter</span>
<span class="n">FlinkSubmitter</span><span class="o">.</span><span class="na">submitTopology</span><span class="o">(</span><span class="s">&quot;WordCount&quot;</span><span class="o">,</span> <span class="n">conf</span><span class="o">,</span> <span class="n">topology</span><span class="o">);</span>
<span class="o">}</span></code></pre></div>
<p>As a shorter Flink-style alternative that replaces the Storm-style submission code, you can also use context-based job execution:</p>
<div class="highlight"><pre><code class="language-java"><span class="c1">// transform Storm topology to Flink program (as above)</span>
<span class="n">FlinkTopology</span> <span class="n">topology</span> <span class="o">=</span> <span class="n">FlinkTopology</span><span class="o">.</span><span class="na">createTopology</span><span class="o">(</span><span class="n">builder</span><span class="o">);</span>
<span class="c1">// executes locally by default or remotely if submitted with Flink&#39;s command-line client</span>
<span class="n">topology</span><span class="o">.</span><span class="na">execute</span><span class="o">();</span></code></pre></div>
<p>After the code is packaged in a jar file (e.g., <code>StormWordCount.jar</code>), it can be easily submitted to Flink via</p>
<div class="highlight"><pre><code>bin/flink run StormWordCount.jar
</code></pre></div>
<p>The Spouts and Bolts, as well as the topology assembly code, are not changed at all!
Only the translation and submission steps have to be changed to their Storm-API-compatible Flink counterparts.
This allows for minimal code changes and easy adaptation to Flink.</p>
<h3 id="embedding-spouts-and-bolts-in-flink-programs">Embedding Spouts and Bolts in Flink programs</h3>
<p>It is also possible to use Spouts and Bolts within a regular Flink DataStream program.
The compatibility package provides wrapper classes for Spouts and Bolts which are implemented as a Flink <code>SourceFunction</code> and <code>StreamOperator</code> respectively.
Those wrappers automatically translate incoming Flink POJO and <code>TupleXX</code> records into Storm’s <code>Tuple</code> type and emitted <code>Values</code> back into either POJOs or <code>TupleXX</code> types for further processing by Flink operators.
As Storm is type agnostic, it is required to specify the output type of embedded Spouts/Bolts manually to get a fully typed Flink streaming program.</p>
<div class="highlight"><pre><code class="language-java"><span class="c1">// use regular Flink streaming environment</span>
<span class="n">StreamExecutionEnvironment</span> <span class="n">env</span> <span class="o">=</span> <span class="n">StreamExecutionEnvironment</span><span class="o">.</span><span class="na">getExecutionEnvironment</span><span class="o">();</span>
<span class="c1">// use Spout as source</span>
<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple1</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">source</span> <span class="o">=</span>
<span class="n">env</span><span class="o">.</span><span class="na">addSource</span><span class="o">(</span><span class="c1">// Flink provided wrapper including original Spout</span>
<span class="k">new</span> <span class="n">SpoutWrapper</span><span class="o">&lt;</span><span class="n">Tuple1</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;(</span><span class="k">new</span> <span class="nf">FileSpout</span><span class="o">(</span><span class="n">localFilePath</span><span class="o">)),</span>
<span class="c1">// specify output type manually</span>
<span class="n">TypeExtractor</span><span class="o">.</span><span class="na">getForObject</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple1</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;(</span><span class="s">&quot;&quot;</span><span class="o">)));</span>
<span class="c1">// FileSpout cannot be parallelized</span>
<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple1</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;</span> <span class="n">text</span> <span class="o">=</span> <span class="n">source</span><span class="o">.</span><span class="na">setParallelism</span><span class="o">(</span><span class="mi">1</span><span class="o">);</span>
<span class="c1">// further processing with Flink</span>
<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">tokens</span> <span class="o">=</span> <span class="n">text</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span><span class="k">new</span> <span class="nf">Tokenizer</span><span class="o">()).</span><span class="na">keyBy</span><span class="o">(</span><span class="mi">0</span><span class="o">);</span>
<span class="c1">// use Bolt for counting</span>
<span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">counts</span> <span class="o">=</span>
<span class="n">tokens</span><span class="o">.</span><span class="na">transform</span><span class="o">(</span><span class="s">&quot;Counter&quot;</span><span class="o">,</span>
<span class="c1">// specify output type manually</span>
<span class="n">TypeExtractor</span><span class="o">.</span><span class="na">getForObject</span><span class="o">(</span><span class="k">new</span> <span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">Integer</span><span class="o">&gt;(</span><span class="s">&quot;&quot;</span><span class="o">,</span><span class="mi">0</span><span class="o">)),</span>
<span class="c1">// Flink provided wrapper including original Bolt</span>
<span class="k">new</span> <span class="n">BoltWrapper</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">Integer</span><span class="o">&gt;,</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span><span class="n">Integer</span><span class="o">&gt;&gt;(</span><span class="k">new</span> <span class="nf">BoltCounter</span><span class="o">()));</span>
<span class="c1">// write result to file via Flink sink</span>
<span class="n">counts</span><span class="o">.</span><span class="na">writeAsText</span><span class="o">(</span><span class="n">outputPath</span><span class="o">);</span>
<span class="c1">// start Flink job</span>
<span class="n">env</span><span class="o">.</span><span class="na">execute</span><span class="o">(</span><span class="s">&quot;WordCount with Spout source and Bolt counter&quot;</span><span class="o">);</span></code></pre></div>
<p>Although some boilerplate code is needed (we plan to address this soon!), the actual embedded Spout and Bolt code can be used unmodified.
We also note that the resulting program is fully typed, so type errors are detected by Flink’s type extractor even though the original Spouts and Bolts are untyped.</p>
<h2 id="outlook">Outlook</h2>
<p>The Storm compatibility package is currently in beta and undergoes continuous development.
We are currently working on providing consistency guarantees for stateful Bolts.
Furthermore, we want to provide a better API integration for embedded Spouts and Bolts by providing a “StormExecutionEnvironment” as a special extension of Flink’s <code>StreamExecutionEnvironment</code>.
We are also investigating the integration of Storm’s higher-level programming API Trident.</p>
<h2 id="summary">Summary</h2>
<p>Flink’s compatibility package for Storm allows using unmodified Spouts and Bolts within Flink.
This even enables you to embed third-party Spouts and Bolts whose source code is not available.
While you can embed Spouts/Bolts in a Flink program and mix-and-match them with Flink operators, running whole topologies is the easiest way to get started and can be achieved with almost no code changes.</p>
<p>If you want to try out Flink’s Storm compatibility package, check out our <a href="https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/storm_compatibility.html">documentation</a>.</p>
<hr />
<p><sup id="fn1">1. We confess, there are three lines changed compared to a Storm project <img class="emoji" style="width:16px;height:16px;vertical-align:middle" src="/img/blog/smirk.png" alt=":smirk:" />—because the example covers local <em>and</em> remote execution. <a href="#ref1" title="Back to text.">↩</a></sup></p>
</article>
</div>
<div class="row">
<div id="disqus_thread"></div>
<script type="text/javascript">
/* Disqus comment thread for this blog post. */
// Disqus forum shortname; it also forms the host name of the embed script below.
var disqus_shortname = 'stratosphere-eu';
// Inject the Disqus embed script with async=true so it does not block rendering.
// The script element is appended to <head>, falling back to <body> if no <head> is found.
(function() {
var dsq = document.createElement('script');
dsq.type = 'text/javascript';
dsq.async = true;
// Explicit https:// instead of a protocol-relative '//' URL: the latter resolves to
// file:// (and fails to load) when the page is opened from disk, and to plain http://
// when the page itself is served without TLS.
dsq.src = 'https://' + disqus_shortname + '.disqus.com/embed.js';
(document.getElementsByTagName('head')[0] || document.getElementsByTagName('body')[0]).appendChild(dsq);
})();
</script>
</div>
</div>
</div>
</div>
</div>
<hr />
<div class="row">
<div class="footer text-center col-sm-12">
<p>Copyright © 2014-2019 <a href="http://apache.org">The Apache Software Foundation</a>. All Rights Reserved.</p>
<p>Apache Flink, Flink®, Apache®, the squirrel logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation.</p>
<p><a href="/privacy-policy.html">Privacy Policy</a> &middot; <a href="/blog/feed.xml">RSS feed</a></p>
</div>
</div>
</div><!-- /.container -->
<!-- Include all compiled plugins (below), or include individual files as needed -->
<script src="https://maxcdn.bootstrapcdn.com/bootstrap/3.4.1/js/bootstrap.min.js"></script>
<script src="https://cdnjs.cloudflare.com/ajax/libs/jquery.matchHeight/0.7.0/jquery.matchHeight-min.js"></script>
<script src="/js/codetabs.js"></script>
<script src="/js/stickysidebar.js"></script>
<!-- Google Analytics -->
<script>
// Google Analytics (analytics.js) bootstrap snippet:
// - defines a global `ga` function that queues its arguments in `ga.q` until the
//   library has loaded, and records the load timestamp in `ga.l`;
// - creates an async <script> for analytics.js and inserts it before the first
//   existing <script> element in the document.
// The library URL uses an explicit https:// scheme rather than a protocol-relative
// '//' URL, which would resolve to file:// when the page is opened from disk.
(function(i,s,o,g,r,a,m){i['GoogleAnalyticsObject']=r;i[r]=i[r]||function(){
(i[r].q=i[r].q||[]).push(arguments)},i[r].l=1*new Date();a=s.createElement(o),
m=s.getElementsByTagName(o)[0];a.async=1;a.src=g;m.parentNode.insertBefore(a,m)
})(window,document,'script','https://www.google-analytics.com/analytics.js','ga');
// Create the tracker for this site's property and record a pageview.
ga('create', 'UA-52545728-1', 'auto');
ga('send', 'pageview');
</script>
</body>
</html>