<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
<link rel="icon" href="/favicon.ico" type="image/x-icon">
<title>Storm Kafka Integration (0.10.x+)</title>
<!-- Bootstrap core CSS -->
<link href="/assets/css/bootstrap.min.css" rel="stylesheet">
<!-- Bootstrap theme -->
<link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet">
<!-- Custom styles for this template -->
<link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css">
<link href="/css/style.css" rel="stylesheet">
<link href="/assets/css/owl.theme.css" rel="stylesheet">
<link href="/assets/css/owl.carousel.css" rel="stylesheet">
<script type="text/javascript" src="/assets/js/jquery.min.js"></script>
<script type="text/javascript" src="/assets/js/bootstrap.min.js"></script>
<script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script>
<script type="text/javascript" src="/assets/js/storm.js"></script>
<!-- Just for debugging purposes. Don't actually copy these 2 lines! -->
<!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]-->
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<header>
<div class="container-fluid">
<div class="row">
<div class="col-md-5">
<a href="/index.html"><img src="/images/logo.png" class="logo" /></a>
</div>
<div class="col-md-5">
<h1>Version: 2.3.0</h1>
</div>
<div class="col-md-2">
<a href="/downloads.html" class="btn-std btn-block btn-download">Download</a>
</div>
</div>
</div>
</header>
<!--Header End-->
<!--Navigation Begin-->
<div class="navbar" role="banner">
<div class="container-fluid">
<div class="navbar-header">
<button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
</div>
<nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation">
<ul class="nav navbar-nav">
<li><a href="/index.html" id="home">Home</a></li>
<li><a href="/getting-help.html" id="getting-help">Getting Help</a></li>
<li><a href="/about/integrates.html" id="project-info">Project Information</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/releases/2.3.0/index.html">2.3.0</a></li>
<li><a href="/releases/2.2.0/index.html">2.2.0</a></li>
<li><a href="/releases/2.1.0/index.html">2.1.0</a></li>
<li><a href="/releases/2.0.0/index.html">2.0.0</a></li>
<li><a href="/releases/1.2.3/index.html">1.2.3</a></li>
</ul>
</li>
<li><a href="/talksAndVideos.html">Talks and Slideshows</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li>
<li><a href="/contribute/People.html">People</a></li>
<li><a href="/contribute/BYLAWS.html">ByLaws</a></li>
</ul>
</li>
<li><a href="/2021/09/27/storm230-released.html" id="news">News</a></li>
</ul>
</nav>
</div>
</div>
<div class="container-fluid">
<h1 class="page-title">Storm Kafka Integration (0.10.x+)</h1>
<div class="row">
<div class="col-md-12">
<!-- Documentation -->
<p class="post-meta"></p>
<div class="documentation-content"><h1 id="storm-apache-kafka-integration-using-the-kafka-client-jar">Storm Apache Kafka integration using the kafka-client jar</h1>
<p>This includes the new Apache Kafka consumer API.</p>
<h2 id="compatibility">Compatibility</h2>
<p>Apache Kafka versions 0.10.1.0 onwards. Please be aware that <a href="https://issues.apache.org/jira/browse/KAFKA-7044">KAFKA-7044</a> can cause crashes in the spout, so you should upgrade Kafka if you are using an affected version (1.1.0, 1.1.1 or 2.0.0).</p>
<h2 id="writing-to-kafka-as-part-of-your-topology">Writing to Kafka as part of your topology</h2>
<p>You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology. If you
are using Trident, you can use org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory and
org.apache.storm.kafka.trident.TridentKafkaUpdater.</p>
<p>You need to provide implementations for the following two interfaces:</p>
<h3 id="tupletokafkamapper-and-tridenttupletokafkamapper">TupleToKafkaMapper and TridentTupleToKafkaMapper</h3>
<p>These interfaces have 2 methods defined:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">K</span> <span class="nf">getKeyFromTuple</span><span class="o">(</span><span class="n">Tuple</span><span class="o">/</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">);</span>
<span class="n">V</span> <span class="nf">getMessageFromTuple</span><span class="o">(</span><span class="n">Tuple</span><span class="o">/</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">);</span>
</code></pre></div>
<p>As the names suggest, these methods are called to map a tuple to a Kafka key and a Kafka message. If you just want one field
as the key and one field as the value, you can use the provided FieldNameBasedTupleToKafkaMapper.java
implementation. In the KafkaBolt, if you construct FieldNameBasedTupleToKafkaMapper with the default constructor, the mapper
looks for fields named &quot;key&quot; and &quot;message&quot;, for backward-compatibility reasons. Alternatively, you can specify
different key and message fields by using the non-default constructor.
In the TridentKafkaState there is no default constructor, so you must specify the key and message field names
when constructing the FieldNameBasedTupleToKafkaMapper instance.</p>
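<p>As a minimal sketch, a custom mapper might look like the following; the class name and the field names &quot;userId&quot; and &quot;event&quot; are purely illustrative:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">// A sketch of a custom TupleToKafkaMapper; field names are hypothetical.
public class UserEventMapper implements TupleToKafkaMapper&lt;String, String&gt; {
    @Override
    public String getKeyFromTuple(Tuple tuple) {
        // Use the "userId" field of the tuple as the Kafka record key.
        return tuple.getStringByField("userId");
    }

    @Override
    public String getMessageFromTuple(Tuple tuple) {
        // Use the "event" field of the tuple as the Kafka record value.
        return tuple.getStringByField("event");
    }
}
</code></pre></div>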
<h3 id="kafkatopicselector-and-trident-kafkatopicselector">KafkaTopicSelector and trident KafkaTopicSelector</h3>
<p>This interface has only one method:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">KafkaTopicSelector</span> <span class="o">{</span>
<span class="n">String</span> <span class="nf">getTopics</span><span class="o">(</span><span class="n">Tuple</span><span class="o">/</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">);</span>
<span class="o">}</span>
</code></pre></div>
<p>The implementation of this interface should return the topic to which the tuple&#39;s key/message pair should be published.
You can return null, in which case the message will be ignored. If you have one static topic name, you can use
DefaultTopicSelector.java and set the name of the topic in the constructor.
<code>FieldNameTopicSelector</code> and <code>FieldIndexTopicSelector</code> can be used to select the topic to publish a tuple to,
based on a field name or field index carried in the tuple itself.
When the topic name is not found, the <code>Field*TopicSelector</code> will write messages to the default topic.
Please make sure the default topic has been created.</p>
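<p>As an illustration, here is a minimal sketch wiring a <code>FieldNameTopicSelector</code> into a KafkaBolt. The field name &quot;topic&quot;, the fallback topic &quot;default-topic&quot;, and the two-argument constructor (topic field name, default topic) are assumptions to verify against the javadocs:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">// A sketch only: route each tuple to the topic named in its "topic" field,
// falling back to "default-topic" when the topic cannot be resolved.
KafkaBolt&lt;String, String&gt; bolt = new KafkaBolt&lt;String, String&gt;()
        .withTopicSelector(new FieldNameTopicSelector("topic", "default-topic"))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper&lt;&gt;());
</code></pre></div>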
<h3 id="specifying-kafka-producer-properties">Specifying Kafka producer properties</h3>
<p>You can provide all the producer properties in your Storm topology by calling <code>KafkaBolt.withProducerProperties()</code> and <code>TridentKafkaStateFactory.withProducerProperties()</code>. Please see the
&quot;Important configuration properties for the producer&quot; section of <a href="http://kafka.apache.org/documentation.html#newproducerconfigs">http://kafka.apache.org/documentation.html#newproducerconfigs</a> for more details.
These properties are also defined in <code>org.apache.kafka.clients.producer.ProducerConfig</code>.</p>
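<p>For example, a minimal sketch of building the producer properties with the <code>ProducerConfig</code> constants instead of raw strings (broker address and serializer classes are placeholders):</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">// A sketch only; uses org.apache.kafka.clients.producer.ProducerConfig and java.util.Properties.
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
producerProps.put(ProducerConfig.ACKS_CONFIG, "1");
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

KafkaBolt&lt;String, String&gt; bolt = new KafkaBolt&lt;String, String&gt;()
        .withProducerProperties(producerProps);
</code></pre></div>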
<h3 id="using-wildcard-kafka-topic-match">Using wildcard kafka topic match</h3>
<p>You can do a wildcard topic match by adding the following config</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Config</span> <span class="n">config</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Config</span><span class="o">();</span>
<span class="n">config</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"kafka.topic.wildcard.match"</span><span class="o">,</span><span class="kc">true</span><span class="o">);</span>
</code></pre></div>
<p>After this you can specify a wildcard topic pattern for matching, e.g. clickstream.*.log. This will match topics such as clickstream.my.log, clickstream.cart.log, etc.</p>
<h3 id="putting-it-all-together">Putting it all together</h3>
<p>For the bolt:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">TopologyBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TopologyBuilder</span><span class="o">();</span>
<span class="n">Fields</span> <span class="n">fields</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"key"</span><span class="o">,</span> <span class="s">"message"</span><span class="o">);</span>
<span class="n">FixedBatchSpout</span> <span class="n">spout</span> <span class="o">=</span> <span class="k">new</span> <span class="n">FixedBatchSpout</span><span class="o">(</span><span class="n">fields</span><span class="o">,</span> <span class="mi">4</span><span class="o">,</span>
<span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"storm"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">),</span>
<span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"trident"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">),</span>
<span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"needs"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">),</span>
<span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"javadoc"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">)</span>
<span class="o">);</span>
<span class="n">spout</span><span class="o">.</span><span class="na">setCycle</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
<span class="n">builder</span><span class="o">.</span><span class="na">setSpout</span><span class="o">(</span><span class="s">"spout"</span><span class="o">,</span> <span class="n">spout</span><span class="o">,</span> <span class="mi">5</span><span class="o">);</span>
<span class="c1">//set producer properties.</span>
<span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"bootstrap.servers"</span><span class="o">,</span> <span class="s">"localhost:9092"</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"acks"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"key.serializer"</span><span class="o">,</span> <span class="s">"org.apache.kafka.common.serialization.StringSerializer"</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"value.serializer"</span><span class="o">,</span> <span class="s">"org.apache.kafka.common.serialization.StringSerializer"</span><span class="o">);</span>
<span class="n">KafkaBolt</span> <span class="n">bolt</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KafkaBolt</span><span class="o">()</span>
<span class="o">.</span><span class="na">withProducerProperties</span><span class="o">(</span><span class="n">props</span><span class="o">)</span>
<span class="o">.</span><span class="na">withTopicSelector</span><span class="o">(</span><span class="k">new</span> <span class="n">DefaultTopicSelector</span><span class="o">(</span><span class="s">"test"</span><span class="o">))</span>
<span class="o">.</span><span class="na">withTupleToKafkaMapper</span><span class="o">(</span><span class="k">new</span> <span class="n">FieldNameBasedTupleToKafkaMapper</span><span class="o">());</span>
<span class="n">builder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"forwardToKafka"</span><span class="o">,</span> <span class="n">bolt</span><span class="o">,</span> <span class="mi">8</span><span class="o">).</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">"spout"</span><span class="o">);</span>
<span class="n">Config</span> <span class="n">conf</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Config</span><span class="o">();</span>
<span class="n">StormSubmitter</span><span class="o">.</span><span class="na">submitTopology</span><span class="o">(</span><span class="s">"kafkaboltTest"</span><span class="o">,</span> <span class="n">conf</span><span class="o">,</span> <span class="n">builder</span><span class="o">.</span><span class="na">createTopology</span><span class="o">());</span>
</code></pre></div>
<p>For Trident:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Fields</span> <span class="n">fields</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"word"</span><span class="o">,</span> <span class="s">"count"</span><span class="o">);</span>
<span class="n">FixedBatchSpout</span> <span class="n">spout</span> <span class="o">=</span> <span class="k">new</span> <span class="n">FixedBatchSpout</span><span class="o">(</span><span class="n">fields</span><span class="o">,</span> <span class="mi">4</span><span class="o">,</span>
<span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"storm"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">),</span>
<span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"trident"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">),</span>
<span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"needs"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">),</span>
<span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"javadoc"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">)</span>
<span class="o">);</span>
<span class="n">spout</span><span class="o">.</span><span class="na">setCycle</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
<span class="n">TridentTopology</span> <span class="n">topology</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TridentTopology</span><span class="o">();</span>
<span class="n">Stream</span> <span class="n">stream</span> <span class="o">=</span> <span class="n">topology</span><span class="o">.</span><span class="na">newStream</span><span class="o">(</span><span class="s">"spout1"</span><span class="o">,</span> <span class="n">spout</span><span class="o">);</span>
<span class="c1">//set producer properties.</span>
<span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"bootstrap.servers"</span><span class="o">,</span> <span class="s">"localhost:9092"</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"acks"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"key.serializer"</span><span class="o">,</span> <span class="s">"org.apache.kafka.common.serialization.StringSerializer"</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"value.serializer"</span><span class="o">,</span> <span class="s">"org.apache.kafka.common.serialization.StringSerializer"</span><span class="o">);</span>
<span class="n">TridentKafkaStateFactory</span> <span class="n">stateFactory</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TridentKafkaStateFactory</span><span class="o">()</span>
<span class="o">.</span><span class="na">withProducerProperties</span><span class="o">(</span><span class="n">props</span><span class="o">)</span>
<span class="o">.</span><span class="na">withKafkaTopicSelector</span><span class="o">(</span><span class="k">new</span> <span class="n">DefaultTopicSelector</span><span class="o">(</span><span class="s">"test"</span><span class="o">))</span>
<span class="o">.</span><span class="na">withTridentTupleToKafkaMapper</span><span class="o">(</span><span class="k">new</span> <span class="n">FieldNameBasedTupleToKafkaMapper</span><span class="o">(</span><span class="s">"word"</span><span class="o">,</span> <span class="s">"count"</span><span class="o">));</span>
<span class="n">stream</span><span class="o">.</span><span class="na">partitionPersist</span><span class="o">(</span><span class="n">stateFactory</span><span class="o">,</span> <span class="n">fields</span><span class="o">,</span> <span class="k">new</span> <span class="n">TridentKafkaStateUpdater</span><span class="o">(),</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">());</span>
<span class="n">Config</span> <span class="n">conf</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Config</span><span class="o">();</span>
<span class="n">StormSubmitter</span><span class="o">.</span><span class="na">submitTopology</span><span class="o">(</span><span class="s">"kafkaTridentTest"</span><span class="o">,</span> <span class="n">conf</span><span class="o">,</span> <span class="n">topology</span><span class="o">.</span><span class="na">build</span><span class="o">());</span>
</code></pre></div>
<h2 id="reading-from-kafka-spouts">Reading From kafka (Spouts)</h2>
<h3 id="configuration">Configuration</h3>
<p>The spout implementations are configured by use of the <code>KafkaSpoutConfig</code> class. This class uses a Builder pattern, which can be started either by calling one of
the Builder constructors or by calling the static <code>builder</code> method in the KafkaSpoutConfig class.</p>
<p>The constructor or static method used to create the builder requires a few key values (which can be changed later on), but these are the minimum config needed to start
a spout.</p>
<p><code>bootstrapServers</code> is the same as the Kafka consumer property &quot;bootstrap.servers&quot;.
<code>topics</code> defines what the spout will consume and can either be a <code>Collection</code> of specific topic names (one or more) or a regular expression <code>Pattern</code>, which specifies
that any topics matching that regular expression will be consumed.</p>
<p>If you are using the Builder constructors instead of one of the <code>builder</code> methods, you will also need to specify a key deserializer and a value deserializer. This helps guarantee type safety through the use
of Java generics. The deserializers can be specified via the consumer properties set with <code>setProp</code>. See the KafkaConsumer configuration documentation for details.</p>
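<p>As a minimal sketch of that case, the deserializers can be supplied as consumer properties on a Builder constructed directly; the broker address and topic name are placeholders:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">// A sketch only; uses org.apache.kafka.clients.consumer.ConsumerConfig
// and org.apache.kafka.common.serialization.StringDeserializer.
KafkaSpoutConfig&lt;String, String&gt; spoutConfig = new KafkaSpoutConfig.Builder&lt;String, String&gt;("127.0.0.1:9092", "topic")
    .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
    .build();
</code></pre></div>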
<p>There are a few key configs to pay attention to.</p>
<p><code>setFirstPollOffsetStrategy</code> allows you to set where to start consuming data from. This is used both in case of failure recovery and starting the spout
for the first time. The allowed values are listed in the <a href="javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html">FirstPollOffsetStrategy javadocs</a>.</p>
<p><code>setProcessingGuarantee</code> lets you configure what processing guarantees the spout will provide. This affects how soon consumed offsets can be committed, and the frequency of commits. See the <a href="javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.ProcessingGuarantee.html">ProcessingGuarantee javadoc</a> for details.</p>
<p><code>setRecordTranslator</code> allows you to modify how the spout converts a Kafka ConsumerRecord into a Tuple, and which stream that tuple will be published to.
By default the &quot;topic&quot;, &quot;partition&quot;, &quot;offset&quot;, &quot;key&quot;, and &quot;value&quot; will be emitted to the &quot;default&quot; stream. If you want to output entries to different
streams based on the topic, Storm provides <code>ByTopicRecordTranslator</code>. See below for more examples of how to use these.</p>
<p><code>setProp</code> and <code>setProps</code> can be used to set KafkaConsumer properties. The list of these properties can be found in the KafkaConsumer configuration documentation on the <a href="http://kafka.apache.org/documentation.html#consumerconfigs">Kafka website</a>. Note that KafkaConsumer autocommit is unsupported. The KafkaSpoutConfig constructor will throw an exception if the &quot;enable.auto.commit&quot; property is set, and the consumer used by the spout will always have that property set to false. You can configure similar behavior to autocommit through the <code>setProcessingGuarantee</code> method on the KafkaSpoutConfig builder.</p>
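<p>As an illustration, the configs above can be combined on a single builder. This is a sketch only: the broker address, topic name, and consumer group id are placeholders, and depending on your Storm version the <code>FirstPollOffsetStrategy</code> enum may be a top-level class or nested in <code>KafkaSpoutConfig</code>:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">// A sketch only; adjust names and values to your environment.
KafkaSpoutConfig&lt;String, String&gt; spoutConf = KafkaSpoutConfig.builder("127.0.0.1:9092", "topic")
    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
    .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_LEAST_ONCE)
    .setProp(ConsumerConfig.GROUP_ID_CONFIG, "my-topology-consumer-group")
    .build();
</code></pre></div>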
<h3 id="usage-examples">Usage Examples</h3>
<h4 id="create-a-simple-insecure-spout">Create a Simple Insecure Spout</h4>
<p>The following will consume all events published to &quot;topic&quot; and send them to MyBolt with the fields &quot;topic&quot;, &quot;partition&quot;, &quot;offset&quot;, &quot;key&quot;, &quot;value&quot;.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">
<span class="kd">final</span> <span class="n">TopologyBuilder</span> <span class="n">tp</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TopologyBuilder</span><span class="o">();</span>
<span class="n">tp</span><span class="o">.</span><span class="na">setSpout</span><span class="o">(</span><span class="s">"kafka_spout"</span><span class="o">,</span> <span class="k">new</span> <span class="n">KafkaSpout</span><span class="o">&lt;&gt;(</span><span class="n">KafkaSpoutConfig</span><span class="o">.</span><span class="na">builder</span><span class="o">(</span><span class="s">"127.0.0.1:"</span> <span class="o">+</span> <span class="n">port</span><span class="o">,</span> <span class="s">"topic"</span><span class="o">).</span><span class="na">build</span><span class="o">()),</span> <span class="mi">1</span><span class="o">);</span>
<span class="n">tp</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"bolt"</span><span class="o">,</span> <span class="k">new</span> <span class="n">myBolt</span><span class="o">()).</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">"kafka_spout"</span><span class="o">);</span>
<span class="o">...</span>
</code></pre></div>
<h4 id="wildcard-topics">Wildcard Topics</h4>
<p>Wildcard topics will consume from all topics that exist in the specified brokers list and match the pattern. So in the following example
&quot;topic&quot;, &quot;topic_foo&quot; and &quot;topic_bar&quot; will all match the pattern &quot;topic.*&quot;, but &quot;not_my_topic&quot; would not match. </p>
<div class="highlight"><pre><code class="language-java" data-lang="java">
<span class="kd">final</span> <span class="n">TopologyBuilder</span> <span class="n">tp</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TopologyBuilder</span><span class="o">();</span>
<span class="n">tp</span><span class="o">.</span><span class="na">setSpout</span><span class="o">(</span><span class="s">"kafka_spout"</span><span class="o">,</span> <span class="k">new</span> <span class="n">KafkaSpout</span><span class="o">&lt;&gt;(</span><span class="n">KafkaSpoutConfig</span><span class="o">.</span><span class="na">builder</span><span class="o">(</span><span class="s">"127.0.0.1:"</span> <span class="o">+</span> <span class="n">port</span><span class="o">,</span> <span class="n">Pattern</span><span class="o">.</span><span class="na">compile</span><span class="o">(</span><span class="s">"topic.*"</span><span class="o">)).</span><span class="na">build</span><span class="o">()),</span> <span class="mi">1</span><span class="o">);</span>
<span class="n">tp</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"bolt"</span><span class="o">,</span> <span class="k">new</span> <span class="n">myBolt</span><span class="o">()).</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">"kafka_spout"</span><span class="o">);</span>
<span class="o">...</span>
</code></pre></div>
<h4 id="multiple-streams">Multiple Streams</h4>
<div class="highlight"><pre><code class="language-java" data-lang="java">
<span class="kd">final</span> <span class="n">TopologyBuilder</span> <span class="n">tp</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TopologyBuilder</span><span class="o">();</span>
<span class="c1">//By default all topics not covered by another rule, but consumed by the spout will be emitted to "STREAM_1" as "topic", "key", and "value"</span>
<span class="n">ByTopicRecordTranslator</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">byTopic</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ByTopicRecordTranslator</span><span class="o">&lt;&gt;(</span>
<span class="o">(</span><span class="n">r</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="k">new</span> <span class="n">Values</span><span class="o">(</span><span class="n">r</span><span class="o">.</span><span class="na">topic</span><span class="o">(),</span> <span class="n">r</span><span class="o">.</span><span class="na">key</span><span class="o">(),</span> <span class="n">r</span><span class="o">.</span><span class="na">value</span><span class="o">()),</span>
<span class="k">new</span> <span class="nf">Fields</span><span class="o">(</span><span class="s">"topic"</span><span class="o">,</span> <span class="s">"key"</span><span class="o">,</span> <span class="s">"value"</span><span class="o">),</span> <span class="s">"STREAM_1"</span><span class="o">);</span>
<span class="c1">//For topic_2 all events will be emitted to "STREAM_2" as just "key" and "value"</span>
<span class="n">byTopic</span><span class="o">.</span><span class="na">forTopic</span><span class="o">(</span><span class="s">"topic_2"</span><span class="o">,</span> <span class="o">(</span><span class="n">r</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="k">new</span> <span class="n">Values</span><span class="o">(</span><span class="n">r</span><span class="o">.</span><span class="na">key</span><span class="o">(),</span> <span class="n">r</span><span class="o">.</span><span class="na">value</span><span class="o">()),</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"key"</span><span class="o">,</span> <span class="s">"value"</span><span class="o">),</span> <span class="s">"STREAM_2"</span><span class="o">);</span>
<span class="n">tp</span><span class="o">.</span><span class="na">setSpout</span><span class="o">(</span><span class="s">"kafka_spout"</span><span class="o">,</span> <span class="k">new</span> <span class="n">KafkaSpout</span><span class="o">&lt;&gt;(</span><span class="n">KafkaSpoutConfig</span><span class="o">.</span><span class="na">builder</span><span class="o">(</span><span class="s">"127.0.0.1:"</span> <span class="o">+</span> <span class="n">port</span><span class="o">,</span> <span class="s">"topic_1"</span><span class="o">,</span> <span class="s">"topic_2"</span><span class="o">,</span> <span class="s">"topic_3"</span><span class="o">).</span><span class="na">build</span><span class="o">()),</span> <span class="mi">1</span><span class="o">);</span>
<span class="n">tp</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"bolt"</span><span class="o">,</span> <span class="k">new</span> <span class="n">myBolt</span><span class="o">()).</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">"kafka_spout"</span><span class="o">,</span> <span class="s">"STREAM_1"</span><span class="o">);</span>
<span class="n">tp</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"another"</span><span class="o">,</span> <span class="k">new</span> <span class="n">myOtherBolt</span><span class="o">()).</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">"kafka_spout"</span><span class="o">,</span> <span class="s">"STREAM_2"</span><span class="o">);</span>
<span class="o">...</span>
</code></pre></div>
<h4 id="trident">Trident</h4>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">final</span> <span class="n">TridentTopology</span> <span class="n">tridentTopology</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TridentTopology</span><span class="o">();</span>
<span class="kd">final</span> <span class="n">Stream</span> <span class="n">spoutStream</span> <span class="o">=</span> <span class="n">tridentTopology</span><span class="o">.</span><span class="na">newStream</span><span class="o">(</span><span class="s">"kafkaSpout"</span><span class="o">,</span>
<span class="k">new</span> <span class="n">KafkaTridentSpoutOpaque</span><span class="o">&lt;&gt;(</span><span class="n">KafkaSpoutConfig</span><span class="o">.</span><span class="na">builder</span><span class="o">(</span><span class="s">"127.0.0.1:"</span> <span class="o">+</span> <span class="n">port</span><span class="o">,</span> <span class="n">Pattern</span><span class="o">.</span><span class="na">compile</span><span class="o">(</span><span class="s">"topic.*"</span><span class="o">)).</span><span class="na">build</span><span class="o">()))</span>
<span class="o">.</span><span class="na">parallelismHint</span><span class="o">(</span><span class="mi">1</span><span class="o">)</span>
<span class="o">...</span>
</code></pre></div>
<p>Trident does not support multiple streams and will ignore any streams set for output. If, however, the Fields are not identical for each
output topic, it will throw an exception rather than continue.</p>
<h4 id="example-topologies">Example topologies</h4>
<p>Example topologies using storm-kafka-client can be found in the examples/storm-kafka-client-examples directory included in the Storm source or binary distributions.</p>
<h3 id="custom-recordtranslators-advanced">Custom RecordTranslators (ADVANCED)</h3>
<p>In most cases the built-in SimpleRecordTranslator and ByTopicRecordTranslator should cover your use case. If you do run into a situation where you need a custom one,
this documentation describes how to do it properly, along with some of the less obvious classes involved.</p>
<p>The point of <code>apply</code> is to take a ConsumerRecord and turn it into a <code>List&lt;Object&gt;</code> that can be emitted. What is not obvious is how to tell the spout to emit it to a
specific stream. To do this you will need to return an instance of <code>org.apache.storm.kafka.spout.KafkaTuple</code>. This provides a method <code>routedTo</code> that will say which
specific stream the tuple should go to.</p>
<p>For example:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="k">return</span> <span class="k">new</span> <span class="n">KafkaTuple</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="mi">2</span><span class="o">,</span> <span class="mi">3</span><span class="o">,</span> <span class="mi">4</span><span class="o">).</span><span class="na">routedTo</span><span class="o">(</span><span class="s">"bar"</span><span class="o">);</span>
</code></pre></div>
<p>This will cause the tuple to be emitted on the &quot;bar&quot; stream.</p>
<p>Be careful when writing custom record translators, because just like a Storm spout they need to be self-consistent. The <code>streams</code> method should return
the full set of streams that this translator will ever try to emit on. Additionally, <code>getFieldsFor</code> should return a valid Fields object for each of those
streams. If you are doing this for Trident, a value must be present in the List returned by <code>apply</code> for every field in the Fields object for that stream,
otherwise Trident can throw exceptions.</p>
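<p>Putting those requirements together, here is a minimal sketch of a self-consistent custom translator; the stream names, field names, and topic name are purely illustrative:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">// A sketch only; assumes the usual storm-kafka-client and kafka-clients imports.
public class TopicRoutingTranslator implements RecordTranslator&lt;String, String&gt; {

    @Override
    public List&lt;Object&gt; apply(ConsumerRecord&lt;String, String&gt; record) {
        // Route records from the hypothetical "metrics" topic to their own stream.
        if ("metrics".equals(record.topic())) {
            return new KafkaTuple(record.key(), record.value()).routedTo("metrics_stream");
        }
        return new KafkaTuple(record.topic(), record.key(), record.value()).routedTo("default_stream");
    }

    @Override
    public Fields getFieldsFor(String stream) {
        // Every stream returned by streams() needs a matching Fields definition.
        return "metrics_stream".equals(stream)
                ? new Fields("key", "value")
                : new Fields("topic", "key", "value");
    }

    @Override
    public List&lt;String&gt; streams() {
        // The full set of streams this translator will ever emit on.
        return Arrays.asList("metrics_stream", "default_stream");
    }
}
</code></pre></div>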
<h3 id="manual-partition-assignment-advanced">Manual Partition Assignment (ADVANCED)</h3>
<p>By default the KafkaSpout instances will be assigned partitions using a round robin strategy. If you need to customize partition assignment, you must implement the <code>ManualPartitioner</code> interface. You can pass your implementation to the <code>KafkaSpoutConfig.Builder</code> constructor. Please take care when supplying a custom implementation, since an incorrect <code>ManualPartitioner</code> implementation could leave some partitions unread, or concurrently read by multiple spout instances. See the <code>RoundRobinManualPartitioner</code> for an example of how to implement this functionality.</p>
<h3 id="manual-partition-discovery">Manual partition discovery</h3>
<p>You can customize how the spout discovers existing partitions, by implementing the <code>TopicFilter</code> interface. Storm-kafka-client ships with a few implementations. Like <code>ManualPartitioner</code>, you can pass your implementation to the <code>KafkaSpoutConfig.Builder</code> constructor. Note that the <code>TopicFilter</code> is only responsible for discovering partitions, deciding which of the discovered partitions to subscribe to is the responsibility of <code>ManualPartitioner</code>.</p>
<h2 id="using-storm-kafka-client-with-different-versions-of-kafka">Using storm-kafka-client with different versions of kafka</h2>
<p>Storm-kafka-client&#39;s Kafka dependency is defined as <code>provided</code> scope in Maven, meaning it will not be pulled in
as a transitive dependency. This allows you to use a Kafka client version that is compatible with your Kafka cluster.</p>
<p>When building a project with storm-kafka-client, you must explicitly add the Kafka clients dependency. For example, to
use Kafka-clients 0.10.0.0, you would use the following dependency in your <code>pom.xml</code>:</p>
<div class="highlight"><pre><code class="language-xml" data-lang="xml"> <span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.kafka<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>kafka-clients<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.10.0.0<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;/dependency&gt;</span>
</code></pre></div>
<p>You can also override the Kafka client version while building from Maven, with the parameter <code>storm.kafka.client.version</code>,
e.g. <code>mvn clean install -Dstorm.kafka.client.version=0.10.0.0</code></p>
<p>When selecting a Kafka client version, you should ensure that:
1. The Kafka API is compatible. The storm-kafka-client module only supports Kafka <strong>0.10 or newer</strong>. For older versions,
you can use the storm-kafka module (<a href="https://github.com/apache/storm/tree/1.x-branch/external/storm-kafka">https://github.com/apache/storm/tree/1.x-branch/external/storm-kafka</a>).<br>
2. The Kafka client version you select should be wire-compatible with the broker. Please see the <a href="https://cwiki.apache.org/confluence/display/KAFKA/Compatibility+Matrix">Kafka compatibility matrix</a>.</p>
<h1 id="kafka-spout-performance-tuning">Kafka Spout Performance Tuning</h1>
<p>The Kafka spout provides two internal parameters to control its performance. The parameters can be set using the <a href="javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setOffsetCommitPeriodMs-long-">setOffsetCommitPeriodMs</a> and <a href="javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setMaxUncommittedOffsets-int-">setMaxUncommittedOffsets</a> methods. </p>
<ul>
<li>&quot;offset.commit.period.ms&quot; controls how often the spout commits to Kafka</li>
<li>&quot;max.uncommitted.offsets&quot; controls how many offsets can be pending commit before another poll can take place
<br/></li>
</ul>
<p>The <a href="http://kafka.apache.org/documentation.html#consumerconfigs">Kafka consumer config</a> parameters may also have an impact on the performance of the spout. The following Kafka parameters are likely the most influential in the spout performance: </p>
<ul>
<li>“fetch.min.bytes”</li>
<li>“fetch.max.wait.ms”</li>
<li><a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">Kafka Consumer</a> instance poll timeout, which is specified for each Kafka spout using the <a href="javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setPollTimeoutMs-long-">setPollTimeoutMs</a> method.
<br/></li>
</ul>
<p>Depending on the structure of your Kafka cluster, distribution of the data, and availability of data to poll, these parameters will have to be configured appropriately. Please refer to the Kafka documentation on Kafka parameter tuning.</p>
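<p>For example, these knobs can all be set on the spout config builder. This is a sketch only: the broker address and topic are placeholders, and the values simply mirror the spout defaults listed below (and the Kafka defaults for the fetch properties), not tuning recommendations:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">// A sketch only; uses org.apache.kafka.clients.consumer.ConsumerConfig.
KafkaSpoutConfig&lt;String, String&gt; tunedConf = KafkaSpoutConfig.builder("127.0.0.1:9092", "topic")
    .setOffsetCommitPeriodMs(30_000)       // offset.commit.period.ms
    .setMaxUncommittedOffsets(10_000_000)  // max.uncommitted.offsets
    .setPollTimeoutMs(200)                 // consumer poll timeout
    .setProp(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1)
    .setProp(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500)
    .build();
</code></pre></div>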
<h3 id="default-values">Default values</h3>
<p>Currently the Kafka spout has the following default values, which have been shown to give good performance in the test environment described in this <a href="https://hortonworks.com/blog/microbenchmarking-storm-1-0-performance/">blog post</a>:</p>
<ul>
<li>poll.timeout.ms = 200</li>
<li>offset.commit.period.ms = 30000 (30s)</li>
<li>max.uncommitted.offsets = 10000000
<br/></li>
</ul>
<h1 id="tuple-tracking">Tuple Tracking</h1>
<p>By default the spout only tracks emitted tuples when the processing guarantee is AT_LEAST_ONCE. It may be necessary to track
emitted tuples with other processing guarantees to benefit from Storm features such as showing complete latency in the UI,
or enabling backpressure with Config.TOPOLOGY_MAX_SPOUT_PENDING.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">KafkaSpoutConfig</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">kafkaConf</span> <span class="o">=</span> <span class="n">KafkaSpoutConfig</span>
<span class="o">.</span><span class="na">builder</span><span class="o">(</span><span class="n">String</span> <span class="n">bootstrapServers</span><span class="o">,</span> <span class="n">String</span> <span class="o">...</span> <span class="n">topics</span><span class="o">)</span>
<span class="o">.</span><span class="na">setProcessingGuarantee</span><span class="o">(</span><span class="n">ProcessingGuarantee</span><span class="o">.</span><span class="na">AT_MOST_ONCE</span><span class="o">)</span>
<span class="o">.</span><span class="na">setTupleTrackingEnforced</span><span class="o">(</span><span class="kc">true</span><span class="o">)</span>
</code></pre></div>
<p>Note: This setting has no effect with AT_LEAST_ONCE processing guarantee, where tuple tracking is required and therefore always enabled.</p>
<h1 id="mapping-from-storm-kafka-to-storm-kafka-client-spout-properties">Mapping from <code>storm-kafka</code> to <code>storm-kafka-client</code> spout properties</h1>
<p>This may not be an exhaustive list because the <code>storm-kafka</code> configs were taken from Storm 0.9.6
<a href="https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/SpoutConfig.html">SpoutConfig</a> and
<a href="https://svn.apache.org/repos/asf/storm/site/releases/0.9.6/javadocs/storm/kafka/KafkaConfig.html">KafkaConfig</a>.
<code>storm-kafka-client</code> spout configurations were taken from Storm 1.0.6
<a href="https://storm.apache.org/releases/1.0.6/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.html">KafkaSpoutConfig</a>
and Kafka 0.10.1.0 <a href="https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/ConsumerConfig.html">ConsumerConfig</a>.</p>
<table><thead>
<tr>
<th>SpoutConfig</th>
<th>KafkaSpoutConfig/ConsumerConfig</th>
<th>KafkaSpoutConfig Usage</th>
</tr>
</thead><tbody>
<tr>
<td><strong>Setting:</strong> <code>startOffsetTime</code><br><br> <strong>Default:</strong> <code>EarliestTime</code><br>________________________________________________ <br> <strong>Setting:</strong> <code>forceFromStart</code> <br><br> <strong>Default:</strong> <code>false</code> <br><br> <code>startOffsetTime</code> &amp; <code>forceFromStart</code> together determine the starting offset. <code>forceFromStart</code> determines whether the Zookeeper offset is ignored. <code>startOffsetTime</code> sets the timestamp that determines the beginning offset, in case there is no offset in Zookeeper, or the Zookeeper offset is ignored</td>
<td><strong>Setting:</strong> <a href="javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.FirstPollOffsetStrategy.html"><code>FirstPollOffsetStrategy</code></a><br><br> <strong>Default:</strong> <code>UNCOMMITTED_EARLIEST</code> <br><br> <a href="#helper-table-for-setting-firstpolloffsetstrategy">Refer to the helper table</a> for picking <code>FirstPollOffsetStrategy</code> based on your <code>startOffsetTime</code> &amp; <code>forceFromStart</code> settings</td>
<td><a href="javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setFirstPollOffsetStrategy-org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy-"><code>&lt;KafkaSpoutConfig-Builder&gt;.setFirstPollOffsetStrategy(&lt;strategy-name&gt;)</code></a></td>
</tr>
<tr>
<td><strong>Setting:</strong> <code>scheme</code><br><br> The interface that specifies how a <code>ByteBuffer</code> from a Kafka topic is transformed into Storm tuple <br><strong>Default:</strong> <code>RawMultiScheme</code></td>
<td><strong>Setting:</strong> <a href="https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Deserializer.html"><code>Deserializers</code></a></td>
<td><a href="javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-"><code>&lt;KafkaSpoutConfig-Builder&gt;.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, &lt;deserializer-class&gt;)</code></a><br><br> <a href="javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-"><code>&lt;KafkaSpoutConfig-Builder&gt;.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, &lt;deserializer-class&gt;)</code></a></td>
</tr>
<tr>
<td><strong>Setting:</strong> <code>fetchSizeBytes</code><br><br> Message fetch size -- the number of bytes to attempt to fetch in one request to a Kafka server <br> <strong>Default:</strong> <code>1MB</code></td>
<td><strong>Setting:</strong> <a href="http://kafka.apache.org/10/documentation.html#newconsumerconfigs"><code>max.partition.fetch.bytes</code></a></td>
<td><a href="javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-"><code>&lt;KafkaSpoutConfig-Builder&gt;.setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, &lt;int-value&gt;)</code></a></td>
</tr>
<tr>
<td><strong>Setting:</strong> <code>bufferSizeBytes</code><br><br> Buffer size (in bytes) for network requests. The size of the TCP receive buffer the consumer uses when fetching data <br> <strong>Default:</strong> <code>1MB</code></td>
<td><strong>Setting:</strong> <a href="http://kafka.apache.org/10/documentation.html#newconsumerconfigs"><code>receive.buffer.bytes</code></a></td>
<td><a href="javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-"><code>&lt;KafkaSpoutConfig-Builder&gt;.setProp(ConsumerConfig.RECEIVE_BUFFER_CONFIG, &lt;int-value&gt;)</code></a></td>
</tr>
<tr>
<td><strong>Setting:</strong> <code>socketTimeoutMs</code><br><br> <strong>Default:</strong> <code>10000</code></td>
<td><strong>N/A</strong></td>
<td></td>
</tr>
<tr>
<td><strong>Setting:</strong> <code>useStartOffsetTimeIfOffsetOutOfRange</code><br><br> <strong>Default:</strong> <code>true</code></td>
<td><strong>Setting:</strong> <a href="http://kafka.apache.org/10/documentation.html#newconsumerconfigs"><code>auto.offset.reset</code></a> <br><br> <strong>Default:</strong> Note that the default value for <code>auto.offset.reset</code> is <code>earliest</code> if you have <a href="javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.ProcessingGuarantee.html"><code>ProcessingGuarantee</code></a> set to <code>AT_LEAST_ONCE</code>, but the default value is <code>latest</code> otherwise.</td>
<td><a href="javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-"><code>&lt;KafkaSpoutConfig-Builder&gt;.setProp(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, &lt;String&gt;)</code></a></td>
</tr>
<tr>
<td><strong>Setting:</strong> <code>fetchMaxWait</code><br><br> Maximum time in ms to wait for the response <br> <strong>Default:</strong> <code>10000</code></td>
<td><strong>Setting:</strong> <a href="http://kafka.apache.org/10/documentation.html#newconsumerconfigs"><code>fetch.max.wait.ms</code></a></td>
<td><a href="javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-"><code>&lt;KafkaSpoutConfig-Builder&gt;.setProp(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, &lt;value&gt;)</code></a></td>
</tr>
<tr>
<td><strong>Setting:</strong> <code>maxOffsetBehind</code><br><br> Specifies how long a spout attempts to retry the processing of a failed tuple. For example, if a failing tuple&#39;s offset is more than <code>maxOffsetBehind</code> behind the acked offset, the spout stops retrying the tuple.<br><strong>Default:</strong> <code>LONG.MAX_VALUE</code></td>
<td><strong>N/A</strong></td>
<td></td>
</tr>
<tr>
<td><strong>Setting:</strong> <code>clientId</code></td>
<td><strong>Setting:</strong> <a href="http://kafka.apache.org/10/documentation.html#newconsumerconfigs"><code>client.id</code></a></td>
<td><a href="javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html#setProp-java.lang.String-java.lang.Object-"><code>&lt;KafkaSpoutConfig-Builder&gt;.setProp(ConsumerConfig.CLIENT_ID_CONFIG, &lt;String&gt;)</code></a></td>
</tr>
</tbody></table>
<p>If you are using this table to upgrade your topology to use <code>storm-kafka-client</code> instead of <code>storm-kafka</code>, then you will also need to migrate the consumer offsets from ZooKeeper to the Kafka broker. Use the <a href="https://github.com/apache/storm/tree/master/external/storm-kafka-migration"><code>storm-kafka-migration</code></a> tool to migrate the Kafka consumer offsets.</p>
<h4 id="helper-table-for-setting-firstpolloffsetstrategy">Helper table for setting <code>FirstPollOffsetStrategy</code></h4>
<p>Pick and set <code>FirstPollOffsetStrategy</code> based on <code>startOffsetTime</code> &amp; <code>forceFromStart</code> settings:</p>
<table><thead>
<tr>
<th><code>startOffsetTime</code></th>
<th><code>forceFromStart</code></th>
<th><code>FirstPollOffsetStrategy</code></th>
</tr>
</thead><tbody>
<tr>
<td><code>EarliestTime</code></td>
<td><code>true</code></td>
<td><code>EARLIEST</code></td>
</tr>
<tr>
<td><code>EarliestTime</code></td>
<td><code>false</code></td>
<td><code>UNCOMMITTED_EARLIEST</code></td>
</tr>
<tr>
<td><code>LatestTime</code></td>
<td><code>true</code></td>
<td><code>LATEST</code></td>
</tr>
<tr>
<td><code>LatestTime</code></td>
<td><code>false</code></td>
<td><code>UNCOMMITTED_LATEST</code></td>
</tr>
</tbody></table>
</div>
</div>
</div>
</div>
<footer>
<div class="container-fluid">
<div class="row">
<div class="col-md-3">
<div class="footer-widget">
<h5>Meetups</h5>
<ul class="latest-news">
<li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm & Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li>
<li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm & Kafka Users</a> <span class="small">(Seattle, WA)</span></li>
<li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li>
<li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li>
<li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li>
<li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li>
<!-- <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Seatle, WA</a> <span class="small">(27 Jun 2015)</span></li> -->
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>About Apache Storm</h5>
<p>Apache Storm integrates with any queueing system and any database system. Apache Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Apache Storm with database systems is easy.</p>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>First Look</h5>
<ul class="footer-list">
<li><a href="/releases/current/Rationale.html">Rationale</a></li>
<li><a href="/releases/current/Tutorial.html">Tutorial</a></li>
<li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li>
<li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Apache Storm project</a></li>
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>Documentation</h5>
<ul class="footer-list">
<li><a href="/releases/current/index.html">Index</a></li>
<li><a href="/releases/current/javadocs/index.html">Javadoc</a></li>
<li><a href="/releases/current/FAQ.html">FAQ</a></li>
</ul>
</div>
</div>
</div>
<hr/>
<div class="row">
<div class="col-md-12">
<p align="center">Copyright © 2019 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved.
<br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation.
<br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p>
</div>
</div>
</div>
</footer>
<!--Footer End-->
<!-- Scroll to top -->
<span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span>
</body>
</html>