blob: ff328e6cb6011caa8ee7b15e58f98b91d64649a6 [file] [log] [blame]
<!DOCTYPE html>
<html lang="en" dir=ZgotmplZ>
<head>
<link rel="stylesheet" href="/bootstrap/css/bootstrap.min.css">
<script src="/bootstrap/js/bootstrap.bundle.min.js"></script>
<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css">
<script src="/js/anchor.min.js"></script>
<script src="/js/flink.js"></script>
<link rel="canonical" href="https://flink.apache.org/2015/12/11/storm-compatibility-in-apache-flink-how-to-run-existing-storm-topologies-on-flink/">
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta name="description" content="Apache Storm was one of the first distributed and scalable stream processing systems available in the open source space offering (near) real-time tuple-by-tuple processing semantics. Initially released by the developers at Backtype in 2011 under the Eclipse open-source license, it became popular very quickly. Only shortly afterwards, Twitter acquired Backtype. Since then, Storm has been growing in popularity, is used in production at many big companies, and is the de-facto industry standard for big data stream processing.">
<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Storm Compatibility in Apache Flink: How to run existing Storm topologies on Flink" />
<meta property="og:description" content="Apache Storm was one of the first distributed and scalable stream processing systems available in the open source space offering (near) real-time tuple-by-tuple processing semantics. Initially released by the developers at Backtype in 2011 under the Eclipse open-source license, it became popular very quickly. Only shortly afterwards, Twitter acquired Backtype. Since then, Storm has been growing in popularity, is used in production at many big companies, and is the de-facto industry standard for big data stream processing." />
<meta property="og:type" content="article" />
<meta property="og:url" content="https://flink.apache.org/2015/12/11/storm-compatibility-in-apache-flink-how-to-run-existing-storm-topologies-on-flink/" /><meta property="article:section" content="posts" />
<meta property="article:published_time" content="2015-12-11T10:00:00+00:00" />
<meta property="article:modified_time" content="2015-12-11T10:00:00+00:00" />
<title>Storm Compatibility in Apache Flink: How to run existing Storm topologies on Flink | Apache Flink</title>
<link rel="manifest" href="/manifest.json">
<link rel="icon" href="/favicon.png" type="image/x-icon">
<link rel="stylesheet" href="/book.min.22eceb4d17baa9cdc0f57345edd6f215a40474022dfee39b63befb5fb3c596b5.css" integrity="sha256-IuzrTRe6qc3A9XNF7dbyFaQEdAIt/uObY777X7PFlrU=">
<script defer src="/en.search.min.2698f0d1b683dae4d6cb071668b310a55ebcf1c48d11410a015a51d90105b53e.js" integrity="sha256-Jpjw0baD2uTWywcWaLMQpV688cSNEUEKAVpR2QEFtT4="></script>
<!--
Made with Book Theme
https://github.com/alex-shpak/hugo-book
-->
<meta name="generator" content="Hugo 0.124.1">
<script>
var _paq = window._paq = window._paq || [];
_paq.push(['disableCookies']);
_paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]);
_paq.push(['trackPageView']);
_paq.push(['enableLinkTracking']);
(function() {
var u="//analytics.apache.org/";
_paq.push(['setTrackerUrl', u+'matomo.php']);
_paq.push(['setSiteId', '1']);
var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0];
g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s);
})();
</script>
</head>
<body dir=ZgotmplZ>
<header>
<nav class="navbar navbar-expand-xl">
<div class="container-fluid">
<a class="navbar-brand" href="/">
<img src="/img/logo/png/100/flink_squirrel_100_color.png" alt="Apache Flink" height="47" width="47" class="d-inline-block align-text-middle">
<span>Apache Flink</span>
</a>
<button class="navbar-toggler" type="button" data-bs-toggle="collapse" data-bs-target="#navbarSupportedContent" aria-controls="navbarSupportedContent" aria-expanded="false" aria-label="Toggle navigation">
<i class="fa fa-bars navbar-toggler-icon"></i>
</button>
<div class="collapse navbar-collapse" id="navbarSupportedContent">
<ul class="navbar-nav">
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">About</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/what-is-flink/flink-architecture/">Architecture</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-applications/">Applications</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/flink-operations/">Operations</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/use-cases/">Use Cases</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/powered-by/">Powered By</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/roadmap/">Roadmap</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/community/">Community & Project Info</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/security/">Security</a>
</li>
<li>
<a class="dropdown-item" href="/what-is-flink/special-thanks/">Special Thanks</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Getting Started</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/">With Flink<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/">With Flink Kubernetes Operator<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable/docs/get-started/introduction/">With Flink CDC<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/">With Flink ML<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html">With Flink Stateful Functions<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/">Training Course<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">Documentation</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-stable/">Flink 1.19 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-docs-master/">Flink Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/">Kubernetes Operator 1.8 (latest)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main">Kubernetes Operator Main (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-stable">CDC 3.0 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-cdc-docs-master">CDC Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-stable/">ML 2.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-ml-docs-master">ML Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/">Stateful Functions 3.3 (stable)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
<li>
<a class="dropdown-item" href="https://nightlies.apache.org/flink/flink-statefun-docs-master">Stateful Functions Master (snapshot)<i class="link fa fa-external-link title" aria-hidden="true"></i>
</a>
</li>
</ul>
</li>
<li class="nav-item dropdown">
<a class="nav-link dropdown-toggle" href="#" role="button" data-bs-toggle="dropdown" aria-expanded="false">How to Contribute</a>
<ul class="dropdown-menu">
<li>
<a class="dropdown-item" href="/how-to-contribute/overview/">Overview</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-code/">Contribute Code</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/reviewing-prs/">Review Pull Requests</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/code-style-and-quality-preamble/">Code Style and Quality Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/contribute-documentation/">Contribute Documentation</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/documentation-style-guide/">Documentation Style Guide</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/improve-website/">Contribute to the Website</a>
</li>
<li>
<a class="dropdown-item" href="/how-to-contribute/getting-help/">Getting Help</a>
</li>
</ul>
</li>
<li class="nav-item">
<a class="nav-link" href="/posts/">Flink Blog</a>
</li>
<li class="nav-item">
<a class="nav-link" href="/downloads/">Downloads</a>
</li>
</ul>
<div class="book-search">
<div class="book-search-spinner hidden">
<i class="fa fa-refresh fa-spin"></i>
</div>
<form class="search-bar d-flex" onsubmit="return false;"su>
<input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/">
<i class="fa fa-search search"></i>
<i class="fa fa-circle-o-notch fa-spin spinner"></i>
</form>
<div class="book-search-spinner hidden"></div>
<ul id="book-search-results"></ul>
</div>
</div>
</div>
</nav>
<div class="navbar-clearfix"></div>
</header>
<main class="flex">
<section class="container book-page">
<article class="markdown">
<h1>
<a href="/2015/12/11/storm-compatibility-in-apache-flink-how-to-run-existing-storm-topologies-on-flink/">Storm Compatibility in Apache Flink: How to run existing Storm topologies on Flink</a>
</h1>
December 11, 2015 -
<p><p><a href="https://storm.apache.org">Apache Storm</a> was one of the first distributed and scalable stream processing systems available in the open source space offering (near) real-time tuple-by-tuple processing semantics.
Initially released by the developers at Backtype in 2011 under the Eclipse open-source license, it became popular very quickly.
Only shortly afterwards, Twitter acquired Backtype.
Since then, Storm has been growing in popularity, is used in production at many big companies, and is the de-facto industry standard for big data stream processing.
In 2013, Storm entered the Apache incubator program, followed by its graduation to top-level in 2014.</p>
<p>Apache Flink is a stream processing engine that improves upon older technologies like Storm in several dimensions,
including <a href="//nightlies.apache.org/flink/flink-docs-master/internals/stream_checkpointing.html">strong consistency guarantees</a> (&ldquo;exactly once&rdquo;),
a higher level <a href="//nightlies.apache.org/flink/flink-docs-master/apis/streaming_guide.html">DataStream API</a>,
support for <a href="http://flink.apache.org/news/2015/12/04/Introducing-windows.html">event time and a rich windowing system</a>,
as well as <a href="https://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/">superior throughput with competitive low latency</a>.</p>
<p>While Flink offers several technical benefits over Storm, an existing investment on a codebase of applications developed for Storm often makes it difficult to switch engines.
For these reasons, as part of the Flink 0.10 release, Flink ships with a Storm compatibility package that allows users to:</p>
<ul>
<li>Run <strong>unmodified</strong> Storm topologies using Apache Flink benefiting from superior performance.</li>
<li><strong>Embed</strong> Storm code (spouts and bolts) as operators inside Flink DataStream programs.</li>
</ul>
<p>Only minor code changes are required in order to submit the program to Flink instead of Storm.
This minimizes the work for developers to run existing Storm topologies while leveraging Apache Flink’s fast and robust execution engine.</p>
<p>We note that the Storm compatibility package is continuously improving and does not cover the full spectrum of Storm’s API.
However, it is powerful enough to cover many use cases.</p>
<h2 id="executing-storm-topologies-with-flink">
Executing Storm topologies with Flink
<a class="anchor" href="#executing-storm-topologies-with-flink">#</a>
</h2>
<center>
<img src="/img/blog/flink-storm.png" style="height:200px;margin:15px">
</center>
<p>The easiest way to use the Storm compatibility package is by executing a whole Storm topology in Flink.
For this, you only need to replace the dependency <code>storm-core</code> by <code>flink-storm</code> in your Storm project and <strong>change two lines of code</strong> in your original Storm program.</p>
<p>The following example shows a simple Storm-Word-Count-Program that can be executed in Flink.
First, the program is assembled the Storm way without any code change to Spouts, Bolts, or the topology itself.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="c1">// assemble topology, the Storm way</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">TopologyBuilder</span><span class="w"> </span><span class="n">builder</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">TopologyBuilder</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">builder</span><span class="p">.</span><span class="na">setSpout</span><span class="p">(</span><span class="s">&#34;source&#34;</span><span class="p">,</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">StormFileSpout</span><span class="p">(</span><span class="n">inputFilePath</span><span class="p">));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">builder</span><span class="p">.</span><span class="na">setBolt</span><span class="p">(</span><span class="s">&#34;tokenizer&#34;</span><span class="p">,</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">StormBoltTokenizer</span><span class="p">())</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">shuffleGrouping</span><span class="p">(</span><span class="s">&#34;source&#34;</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">builder</span><span class="p">.</span><span class="na">setBolt</span><span class="p">(</span><span class="s">&#34;counter&#34;</span><span class="p">,</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">StormBoltCounter</span><span class="p">())</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">fieldsGrouping</span><span class="p">(</span><span class="s">&#34;tokenizer&#34;</span><span class="p">,</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Fields</span><span class="p">(</span><span class="s">&#34;word&#34;</span><span class="p">));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">builder</span><span class="p">.</span><span class="na">setBolt</span><span class="p">(</span><span class="s">&#34;sink&#34;</span><span class="p">,</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">StormBoltFileSink</span><span class="p">(</span><span class="n">outputFilePath</span><span class="p">))</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="p">.</span><span class="na">shuffleGrouping</span><span class="p">(</span><span class="s">&#34;counter&#34;</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>In order to execute the topology, we need to translate it to a <code>FlinkTopology</code> and submit it to a local or remote Flink cluster, very similar to submitting the application to a Storm cluster.<sup><a href="#fn1" id="ref1">1</a></sup></p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="c1">// transform Storm topology to Flink program</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// replaces: StormTopology topology = builder.createTopology();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">FlinkTopology</span><span class="w"> </span><span class="n">topology</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">FlinkTopology</span><span class="p">.</span><span class="na">createTopology</span><span class="p">(</span><span class="n">builder</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">Config</span><span class="w"> </span><span class="n">conf</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">Config</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="k">if</span><span class="p">(</span><span class="n">runLocal</span><span class="p">)</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// use FlinkLocalCluster instead of LocalCluster</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">FlinkLocalCluster</span><span class="w"> </span><span class="n">cluster</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">FlinkLocalCluster</span><span class="p">.</span><span class="na">getLocalCluster</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">cluster</span><span class="p">.</span><span class="na">submitTopology</span><span class="p">(</span><span class="s">&#34;WordCount&#34;</span><span class="p">,</span><span class="w"> </span><span class="n">conf</span><span class="p">,</span><span class="w"> </span><span class="n">topology</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w"> </span><span class="k">else</span><span class="w"> </span><span class="p">{</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// use FlinkSubmitter instead of StormSubmitter</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">FlinkSubmitter</span><span class="p">.</span><span class="na">submitTopology</span><span class="p">(</span><span class="s">&#34;WordCount&#34;</span><span class="p">,</span><span class="w"> </span><span class="n">conf</span><span class="p">,</span><span class="w"> </span><span class="n">topology</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="p">}</span><span class="w">
</span></span></span></code></pre></div><p>As a shorter Flink-style alternative that replaces the Storm-style submission code, you can also use context-based job execution:</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="c1">// transform Storm topology to Flink program (as above)</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">FlinkTopology</span><span class="w"> </span><span class="n">topology</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">FlinkTopology</span><span class="p">.</span><span class="na">createTopology</span><span class="p">(</span><span class="n">builder</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// executes locally by default or remotely if submitted with Flink&#39;s command-line client</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">topology</span><span class="p">.</span><span class="na">execute</span><span class="p">()</span><span class="w">
</span></span></span></code></pre></div><p>After the code is packaged in a jar file (e.g., <code>StormWordCount.jar</code>), it can be easily submitted to Flink via</p>
<pre tabindex="0"><code>bin/flink run StormWordCount.jar
</code></pre><p>The used Spouts and Bolts as well as the topology assemble code is not changed at all!
Only the translation and submission step have to be changed to the Storm-API compatible Flink pendants.
This allows for minimal code changes and easy adaption to Flink.</p>
<h3 id="embedding-spouts-and-bolts-in-flink-programs">
Embedding Spouts and Bolts in Flink programs
<a class="anchor" href="#embedding-spouts-and-bolts-in-flink-programs">#</a>
</h3>
<p>It is also possible to use Spouts and Bolts within a regular Flink DataStream program.
The compatibility package provides wrapper classes for Spouts and Bolts which are implemented as a Flink <code>SourceFunction</code> and <code>StreamOperator</code> respectively.
Those wrappers automatically translate incoming Flink POJO and <code>TupleXX</code> records into Storm&rsquo;s <code>Tuple</code> type and emitted <code>Values</code> back into either POJOs or <code>TupleXX</code> types for further processing by Flink operators.
As Storm is type agnostic, it is required to specify the output type of embedded Spouts/Bolts manually to get a fully typed Flink streaming program.</p>
<div class="highlight"><pre tabindex="0" class="chroma"><code class="language-java" data-lang="java"><span class="line"><span class="cl"><span class="c1">// use regular Flink streaming environment</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">StreamExecutionEnvironment</span><span class="w"> </span><span class="n">env</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">StreamExecutionEnvironment</span><span class="p">.</span><span class="na">getExecutionEnvironment</span><span class="p">();</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// use Spout as source</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple1</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="n">source</span><span class="w"> </span><span class="o">=</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">env</span><span class="p">.</span><span class="na">addSource</span><span class="p">(</span><span class="c1">// Flink provided wrapper including original Spout</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">SpoutWrapper</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">FileSpout</span><span class="p">(</span><span class="n">localFilePath</span><span class="p">)),</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// specify output type manually</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">TypeExtractor</span><span class="p">.</span><span class="na">getForObject</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">Tuple1</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;</span><span class="p">(</span><span class="s">&#34;&#34;</span><span class="p">)));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// FileSpout cannot be parallelized</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple1</span><span class="o">&lt;</span><span class="n">String</span><span class="o">&gt;&gt;</span><span class="w"> </span><span class="n">text</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">source</span><span class="p">.</span><span class="na">setParallelism</span><span class="p">(</span><span class="n">1</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// further processing with Flink</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="n">Integer</span><span class="o">&gt;</span><span class="w"> </span><span class="n">tokens</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="n">text</span><span class="p">.</span><span class="na">flatMap</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">Tokenizer</span><span class="p">()).</span><span class="na">keyBy</span><span class="p">(</span><span class="n">0</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// use Bolt for counting</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">DataStream</span><span class="o">&lt;</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="n">Integer</span><span class="o">&gt;</span><span class="w"> </span><span class="n">counts</span><span class="w"> </span><span class="o">=</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">tokens</span><span class="p">.</span><span class="na">transform</span><span class="p">(</span><span class="s">&#34;Counter&#34;</span><span class="p">,</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// specify output type manually</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="n">TypeExtractor</span><span class="p">.</span><span class="na">getForObject</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="n">Integer</span><span class="o">&gt;</span><span class="p">(</span><span class="s">&#34;&#34;</span><span class="p">,</span><span class="n">0</span><span class="p">))</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="c1">// Flink provided wrapper including original Bolt</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"> </span><span class="k">new</span><span class="w"> </span><span class="n">BoltWrapper</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="n">Tuple2</span><span class="o">&lt;</span><span class="n">String</span><span class="p">,</span><span class="n">Integer</span><span class="o">&gt;&gt;</span><span class="p">(</span><span class="k">new</span><span class="w"> </span><span class="n">BoltCounter</span><span class="p">()));</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// write result to file via Flink sink</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">counts</span><span class="p">.</span><span class="na">writeAsText</span><span class="p">(</span><span class="n">outputPath</span><span class="p">);</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="c1">// start Flink job</span><span class="w">
</span></span></span><span class="line"><span class="cl"><span class="w"></span><span class="n">env</span><span class="p">.</span><span class="na">execute</span><span class="p">(</span><span class="s">&#34;WordCount with Spout source and Bolt counter&#34;</span><span class="p">);</span><span class="w">
</span></span></span></code></pre></div><p>Although some boilerplate code is needed (we plan to address this soon!), the actual embedded Spout and Bolt code can be used unmodified.
We also note that the resulting program is fully typed, and type errors will be found by Flink&rsquo;s type extractor even if the original Spouts and Bolts are not.</p>
<h2 id="outlook">
Outlook
<a class="anchor" href="#outlook">#</a>
</h2>
<p>The Storm compatibility package is currently in beta and undergoes continuous development.
We are currently working on providing consistency guarantees for stateful Bolts.
Furthermore, we want to provide a better API integration for embedded Spouts and Bolts by providing a &ldquo;StormExecutionEnvironment&rdquo; as a special extension of Flink&rsquo;s <code>StreamExecutionEnvironment</code>.
We are also investigating the integration of Storm&rsquo;s higher-level programming API Trident.</p>
<h2 id="summary">
Summary
<a class="anchor" href="#summary">#</a>
</h2>
<p>Flink&rsquo;s compatibility package for Storm allows using unmodified Spouts and Bolts within Flink.
This enables you to even embed third-party Spouts and Bolts where the source code is not available.
While you can embed Spouts/Bolts in a Flink program and mix-and-match them with Flink operators, running whole topologies is the easiest way to get started and can be achieved with almost no code changes.</p>
<p>If you want to try out Flink&rsquo;s Storm compatibility package checkout our <a href="//nightlies.apache.org/flink/flink-docs-master/apis/streaming/storm_compatibility.html">Documentation</a>.</p>
<hr />
<p><sup id="fn1">1. We confess, there are three lines changed compared to a Storm project <img class="emoji" style="width:16px;height:16px;align:absmiddle" src="/img/blog/smirk.png">&mdash;because the example covers local <em>and</em> remote execution. <a href="#ref1" title="Back to text."></a></sup></p>
</p>
</article>
<div class="edit-this-page">
<p>
<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a>
</p>
<p>
<a href="//github.com/apache/flink-web/edit/asf-site/docs/content/posts/2015-12-11-storm-compatibility.md">
Edit This Page<i class="fa fa-edit fa-fw"></i>
</a>
</p>
</div>
</section>
<aside class="book-toc">
<nav id="TableOfContents"><h3>On This Page <a href="javascript:void(0)" class="toc" onclick="collapseToc()"><i class="fa fa-times" aria-hidden="true"></i></a></h3>
<ul>
<li>
<ul>
<li><a href="#executing-storm-topologies-with-flink">Executing Storm topologies with Flink</a>
<ul>
<li><a href="#embedding-spouts-and-bolts-in-flink-programs">Embedding Spouts and Bolts in Flink programs</a></li>
</ul>
</li>
<li><a href="#outlook">Outlook</a></li>
<li><a href="#summary">Summary</a></li>
</ul>
</li>
</ul>
</nav>
</aside>
<aside class="expand-toc hidden">
<a class="toc" onclick="expandToc()" href="javascript:void(0)">
<i class="fa fa-bars" aria-hidden="true"></i>
</a>
</aside>
</main>
<footer>
<div class="separator"></div>
<div class="panels">
<div class="wrapper">
<div class="panel">
<ul>
<li>
<a href="https://flink-packages.org/">flink-packages.org</a>
</li>
<li>
<a href="https://www.apache.org/">Apache Software Foundation</a>
</li>
<li>
<a href="https://www.apache.org/licenses/">License</a>
</li>
<li>
<a href="/zh/">
<i class="fa fa-globe" aria-hidden="true"></i>&nbsp;中文版
</a>
</li>
</ul>
</div>
<div class="panel">
<ul>
<li>
<a href="/what-is-flink/security">Security</a-->
</li>
<li>
<a href="https://www.apache.org/foundation/sponsorship.html">Donate</a>
</li>
<li>
<a href="https://www.apache.org/foundation/thanks.html">Thanks</a>
</li>
</ul>
</div>
<div class="panel icons">
<div>
<a href="/posts">
<div class="icon flink-blog-icon"></div>
<span>Flink blog</span>
</a>
</div>
<div>
<a href="https://github.com/apache/flink">
<div class="icon flink-github-icon"></div>
<span>Github</span>
</a>
</div>
<div>
<a href="https://twitter.com/apacheflink">
<div class="icon flink-twitter-icon"></div>
<span>Twitter</span>
</a>
</div>
</div>
</div>
</div>
<hr/>
<div class="container disclaimer">
<p>The contents of this website are © 2024 Apache Software Foundation under the terms of the Apache License v2. Apache Flink, Flink, and the Flink logo are either registered trademarks or trademarks of The Apache Software Foundation in the United States and other countries.</p>
</div>
</footer>
</body>
</html>