<!DOCTYPE html>
<html>
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="shortcut icon" href="/favicon.ico" type="image/x-icon">
<link rel="icon" href="/favicon.ico" type="image/x-icon">
<title>Storm Kafka Integration (0.8.x)</title>
<!-- Bootstrap core CSS -->
<link href="/assets/css/bootstrap.min.css" rel="stylesheet">
<!-- Bootstrap theme -->
<link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet">
<!-- Custom styles for this template -->
<link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css">
<link href="/css/style.css" rel="stylesheet">
<link href="/assets/css/owl.theme.css" rel="stylesheet">
<link href="/assets/css/owl.carousel.css" rel="stylesheet">
<script type="text/javascript" src="/assets/js/jquery.min.js"></script>
<script type="text/javascript" src="/assets/js/bootstrap.min.js"></script>
<script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script>
<script type="text/javascript" src="/assets/js/storm.js"></script>
<!-- Just for debugging purposes. Don't actually copy these 2 lines! -->
<!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]-->
<!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries -->
<!--[if lt IE 9]>
<script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script>
<script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script>
<![endif]-->
</head>
<body>
<header>
<div class="container-fluid">
<div class="row">
<div class="col-md-5">
<a href="/index.html"><img src="/images/logo.png" class="logo" /></a>
</div>
<div class="col-md-5">
<h1>Version: 1.2.3</h1>
</div>
<div class="col-md-2">
<a href="/downloads.html" class="btn-std btn-block btn-download">Download</a>
</div>
</div>
</div>
</header>
<!--Header End-->
<!--Navigation Begin-->
<div class="navbar" role="banner">
<div class="container-fluid">
<div class="navbar-header">
<button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse">
<span class="icon-bar"></span>
<span class="icon-bar"></span>
<span class="icon-bar"></span>
</button>
</div>
<nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation">
<ul class="nav navbar-nav">
<li><a href="/index.html" id="home">Home</a></li>
<li><a href="/getting-help.html" id="getting-help">Getting Help</a></li>
<li><a href="/about/integrates.html" id="project-info">Project Information</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/releases/2.3.0/index.html">2.3.0</a></li>
<li><a href="/releases/2.2.0/index.html">2.2.0</a></li>
<li><a href="/releases/2.1.0/index.html">2.1.0</a></li>
<li><a href="/releases/2.0.0/index.html">2.0.0</a></li>
<li><a href="/releases/1.2.3/index.html">1.2.3</a></li>
</ul>
</li>
<li><a href="/talksAndVideos.html">Talks and Slideshows</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li>
<li><a href="/contribute/People.html">People</a></li>
<li><a href="/contribute/BYLAWS.html">ByLaws</a></li>
</ul>
</li>
<li><a href="/2021/09/27/storm230-released.html" id="news">News</a></li>
</ul>
</nav>
</div>
</div>
<div class="container-fluid">
<h1 class="page-title">Storm Kafka Integration (0.8.x)</h1>
<div class="row">
<div class="col-md-12">
<!-- Documentation -->
<p class="post-meta"></p>
<div class="documentation-content"><h2 id="deprecated">Deprecated</h2>
<p>Storm-kafka has been deprecated and will be removed in a future Storm release. Please upgrade to storm-kafka-client.
If you need to migrate the committed offsets to the new spout, consider using the storm-kafka-migration tool.</p>
<p>Provides core Storm and Trident spout implementations for consuming data from Apache Kafka 0.8.x.</p>
<h2 id="spouts">Spouts</h2>
<p>We support both Trident and core Storm spouts. Both spout implementations use a BrokerHosts interface that
tracks the Kafka broker host to partition mapping and a KafkaConfig that controls some Kafka-related parameters.</p>
<h3 id="brokerhosts">BrokerHosts</h3>
<p>In order to initialize your Kafka spout/emitter you need to construct an instance of the marker interface BrokerHosts.
Currently, we support the following two implementations:</p>
<h4 id="zkhosts">ZkHosts</h4>
<p>ZkHosts is what you should use if you want to dynamically track Kafka broker to partition mapping. This class uses
Kafka&#39;s ZooKeeper entries to track brokerHost -&gt; partition mapping. You can instantiate an object by calling</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="nf">ZkHosts</span><span class="o">(</span><span class="n">String</span> <span class="n">brokerZkStr</span><span class="o">,</span> <span class="n">String</span> <span class="n">brokerZkPath</span><span class="o">)</span>
<span class="kd">public</span> <span class="nf">ZkHosts</span><span class="o">(</span><span class="n">String</span> <span class="n">brokerZkStr</span><span class="o">)</span>
</code></pre></div>
<p>Where brokerZkStr is the ZooKeeper connection string in host:port form (e.g. localhost:2181). brokerZkPath is the root path under which all the topic and
partition information is stored. By default this is /brokers, which is what the default Kafka implementation uses.</p>
<p>By default, the broker-partition mapping is refreshed every 60 seconds from ZooKeeper. If you want to change it, you
should set host.refreshFreqSecs to your chosen value.</p>
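<p>For example, a minimal sketch of constructing a ZkHosts instance with an explicit broker path and a slower refresh interval (the ZooKeeper address is a placeholder, and this assumes refreshFreqSecs is the public field referred to above):</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">// Point at the ZooKeeper ensemble used by the Kafka cluster.
ZkHosts zkHosts = new ZkHosts("zk1.example.com:2181", "/brokers");
// Refresh the broker -&gt; partition mapping every 2 minutes instead of the default 60 seconds.
zkHosts.refreshFreqSecs = 120;
</code></pre></div>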
<h4 id="statichosts">StaticHosts</h4>
<p>This is an alternative implementation where broker -&gt; partition information is static. In order to construct an instance
of this class, you need to first construct an instance of GlobalPartitionInformation.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Broker</span> <span class="n">brokerForPartition0</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Broker</span><span class="o">(</span><span class="s">"localhost"</span><span class="o">);</span><span class="c1">//localhost:9092</span>
<span class="n">Broker</span> <span class="n">brokerForPartition1</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Broker</span><span class="o">(</span><span class="s">"localhost"</span><span class="o">,</span> <span class="mi">9092</span><span class="o">);</span><span class="c1">//localhost:9092 but we specified the port explicitly</span>
<span class="n">Broker</span> <span class="n">brokerForPartition2</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Broker</span><span class="o">(</span><span class="s">"localhost:9092"</span><span class="o">);</span><span class="c1">//localhost:9092 specified as one string.</span>
<span class="n">GlobalPartitionInformation</span> <span class="n">partitionInfo</span> <span class="o">=</span> <span class="k">new</span> <span class="n">GlobalPartitionInformation</span><span class="o">();</span>
<span class="n">partitionInfo</span><span class="o">.</span><span class="na">addPartition</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="n">brokerForPartition0</span><span class="o">);</span><span class="c1">//mapping from partition 0 to brokerForPartition0</span>
<span class="n">partitionInfo</span><span class="o">.</span><span class="na">addPartition</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">brokerForPartition1</span><span class="o">);</span><span class="c1">//mapping from partition 1 to brokerForPartition1</span>
<span class="n">partitionInfo</span><span class="o">.</span><span class="na">addPartition</span><span class="o">(</span><span class="mi">2</span><span class="o">,</span> <span class="n">brokerForPartition2</span><span class="o">);</span><span class="c1">//mapping from partition 2 to brokerForPartition2</span>
<span class="n">StaticHosts</span> <span class="n">hosts</span> <span class="o">=</span> <span class="k">new</span> <span class="n">StaticHosts</span><span class="o">(</span><span class="n">partitionInfo</span><span class="o">);</span>
</code></pre></div>
<h3 id="kafkaconfig">KafkaConfig</h3>
<p>The second thing needed to construct a KafkaSpout is an instance of KafkaConfig.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="nf">KafkaConfig</span><span class="o">(</span><span class="n">BrokerHosts</span> <span class="n">hosts</span><span class="o">,</span> <span class="n">String</span> <span class="n">topic</span><span class="o">)</span>
<span class="kd">public</span> <span class="nf">KafkaConfig</span><span class="o">(</span><span class="n">BrokerHosts</span> <span class="n">hosts</span><span class="o">,</span> <span class="n">String</span> <span class="n">topic</span><span class="o">,</span> <span class="n">String</span> <span class="n">clientId</span><span class="o">)</span>
</code></pre></div>
<p>The BrokerHosts can be any implementation of the BrokerHosts interface described above. The topic is the name of the Kafka topic.
The optional clientId is used as part of the ZooKeeper path where the spout&#39;s current consumption offset is stored.</p>
<p>There are 2 extensions of KafkaConfig currently in use.</p>
<p>SpoutConfig is an extension of KafkaConfig that adds fields for ZooKeeper connection info and for controlling
behavior specific to KafkaSpout.
The clientId is used to identify requests made using the Kafka protocol.
The zkRoot is used as the root ZooKeeper path under which your consumer&#39;s offsets are stored.
The id should uniquely identify your spout.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="nf">SpoutConfig</span><span class="o">(</span><span class="n">BrokerHosts</span> <span class="n">hosts</span><span class="o">,</span> <span class="n">String</span> <span class="n">topic</span><span class="o">,</span> <span class="n">String</span> <span class="n">clientId</span><span class="o">,</span> <span class="n">String</span> <span class="n">zkRoot</span><span class="o">,</span> <span class="n">String</span> <span class="n">id</span><span class="o">);</span>
<span class="kd">public</span> <span class="nf">SpoutConfig</span><span class="o">(</span><span class="n">BrokerHosts</span> <span class="n">hosts</span><span class="o">,</span> <span class="n">String</span> <span class="n">topic</span><span class="o">,</span> <span class="n">String</span> <span class="n">zkRoot</span><span class="o">,</span> <span class="n">String</span> <span class="n">id</span><span class="o">);</span>
<span class="kd">public</span> <span class="nf">SpoutConfig</span><span class="o">(</span><span class="n">BrokerHosts</span> <span class="n">hosts</span><span class="o">,</span> <span class="n">String</span> <span class="n">topic</span><span class="o">,</span> <span class="n">String</span> <span class="n">id</span><span class="o">);</span>
</code></pre></div>
<p>In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// setting for how often to save the current Kafka offset to ZooKeeper</span>
<span class="kd">public</span> <span class="kt">long</span> <span class="n">stateUpdateIntervalMs</span> <span class="o">=</span> <span class="mi">2000</span><span class="o">;</span>
<span class="c1">// Retry strategy for failed messages</span>
<span class="kd">public</span> <span class="n">String</span> <span class="n">failedMsgRetryManagerClass</span> <span class="o">=</span> <span class="n">ExponentialBackoffMsgRetryManager</span><span class="o">.</span><span class="na">class</span><span class="o">.</span><span class="na">getName</span><span class="o">();</span>
<span class="c1">// Exponential back-off retry settings. These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt</span>
<span class="c1">// calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.</span>
<span class="c1">// Initial delay between successive retries</span>
<span class="kd">public</span> <span class="kt">long</span> <span class="n">retryInitialDelayMs</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span>
<span class="kd">public</span> <span class="kt">double</span> <span class="n">retryDelayMultiplier</span> <span class="o">=</span> <span class="mf">1.0</span><span class="o">;</span>
<span class="c1">// Maximum delay between successive retries </span>
<span class="kd">public</span> <span class="kt">long</span> <span class="n">retryDelayMaxMs</span> <span class="o">=</span> <span class="mi">60</span> <span class="o">*</span> <span class="mi">1000</span><span class="o">;</span>
<span class="c1">// Failed message will be retried infinitely if retryLimit is less than zero. </span>
<span class="kd">public</span> <span class="kt">int</span> <span class="n">retryLimit</span> <span class="o">=</span> <span class="o">-</span><span class="mi">1</span><span class="o">;</span>
</code></pre></div>
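<p>For instance, a short sketch of tuning these retry settings on a SpoutConfig instance (the values are illustrative only):</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">SpoutConfig spoutConfig = new SpoutConfig(hosts, "test-topic", "/test-topic", "kafka-spout-id");
// Wait 1 second before the first retry, then double the delay up to a cap of 30 seconds.
spoutConfig.retryInitialDelayMs = 1000;
spoutConfig.retryDelayMultiplier = 2.0;
spoutConfig.retryDelayMaxMs = 30 * 1000;
// Give up on a message after 5 failed attempts instead of retrying forever.
spoutConfig.retryLimit = 5;
</code></pre></div>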
<p>Core KafkaSpout only accepts an instance of SpoutConfig.</p>
<p>TridentKafkaConfig is another extension of KafkaConfig.
TridentKafkaEmitter only accepts TridentKafkaConfig.</p>
<p>The KafkaConfig class also has a bunch of public variables that control your application&#39;s behavior. Here are the defaults:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kt">int</span> <span class="n">fetchSizeBytes</span> <span class="o">=</span> <span class="mi">1024</span> <span class="o">*</span> <span class="mi">1024</span><span class="o">;</span>
<span class="kd">public</span> <span class="kt">int</span> <span class="n">socketTimeoutMs</span> <span class="o">=</span> <span class="mi">10000</span><span class="o">;</span>
<span class="kd">public</span> <span class="kt">int</span> <span class="n">fetchMaxWait</span> <span class="o">=</span> <span class="mi">10000</span><span class="o">;</span>
<span class="kd">public</span> <span class="kt">int</span> <span class="n">bufferSizeBytes</span> <span class="o">=</span> <span class="mi">1024</span> <span class="o">*</span> <span class="mi">1024</span><span class="o">;</span>
<span class="kd">public</span> <span class="n">MultiScheme</span> <span class="n">scheme</span> <span class="o">=</span> <span class="k">new</span> <span class="n">RawMultiScheme</span><span class="o">();</span>
<span class="kd">public</span> <span class="kt">boolean</span> <span class="n">ignoreZkOffsets</span> <span class="o">=</span> <span class="kc">false</span><span class="o">;</span>
<span class="kd">public</span> <span class="kt">long</span> <span class="n">startOffsetTime</span> <span class="o">=</span> <span class="n">kafka</span><span class="o">.</span><span class="na">api</span><span class="o">.</span><span class="na">OffsetRequest</span><span class="o">.</span><span class="na">EarliestTime</span><span class="o">();</span>
<span class="kd">public</span> <span class="kt">long</span> <span class="n">maxOffsetBehind</span> <span class="o">=</span> <span class="n">Long</span><span class="o">.</span><span class="na">MAX_VALUE</span><span class="o">;</span>
<span class="kd">public</span> <span class="kt">boolean</span> <span class="n">useStartOffsetTimeIfOffsetOutOfRange</span> <span class="o">=</span> <span class="kc">true</span><span class="o">;</span>
<span class="kd">public</span> <span class="kt">int</span> <span class="n">metricsTimeBucketSizeInSecs</span> <span class="o">=</span> <span class="mi">60</span><span class="o">;</span>
</code></pre></div>
<p>Most of them are self-explanatory except MultiScheme.</p>
<h3 id="multischeme">MultiScheme</h3>
<p>MultiScheme is an interface that dictates how the ByteBuffer consumed from Kafka gets transformed into a Storm tuple. It
also controls the naming of your output fields.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="n">Iterable</span><span class="o">&lt;</span><span class="n">List</span><span class="o">&lt;</span><span class="n">Object</span><span class="o">&gt;&gt;</span> <span class="nf">deserialize</span><span class="o">(</span><span class="n">ByteBuffer</span> <span class="n">ser</span><span class="o">);</span>
<span class="kd">public</span> <span class="n">Fields</span> <span class="nf">getOutputFields</span><span class="o">();</span>
</code></pre></div>
<p>The default <code>RawMultiScheme</code> just takes the <code>ByteBuffer</code> and returns a tuple with the ByteBuffer converted to a <code>byte[]</code>. The name of the outputField is &quot;bytes&quot;. There are alternative implementations like <code>SchemeAsMultiScheme</code> and <code>KeyValueSchemeAsMultiScheme</code> which can convert the <code>ByteBuffer</code> to <code>String</code>.</p>
<p>There is also an extension of <code>SchemeAsMultiScheme</code>, <code>MessageMetadataSchemeAsMultiScheme</code>,
which has an additional deserialize method that accepts the message <code>ByteBuffer</code> in addition to the <code>Partition</code> and <code>offset</code> associated with the message.</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="n">Iterable</span><span class="o">&lt;</span><span class="n">List</span><span class="o">&lt;</span><span class="n">Object</span><span class="o">&gt;&gt;</span> <span class="nf">deserializeMessageWithMetadata</span><span class="o">(</span><span class="n">ByteBuffer</span> <span class="n">message</span><span class="o">,</span> <span class="n">Partition</span> <span class="n">partition</span><span class="o">,</span> <span class="kt">long</span> <span class="n">offset</span><span class="o">)</span>
</code></pre></div>
<p>This is useful for auditing/replaying messages from arbitrary points on a Kafka topic, saving the partition and offset of each message of a discrete stream instead of persisting the entire message.</p>
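<p>As a concrete illustration, here is a minimal sketch of a custom scheme (assuming the Storm 1.x org.apache.storm.spout.Scheme interface, which mirrors the two MultiScheme methods above) wrapped with SchemeAsMultiScheme so the spout can use it; the class name and output field name are made up for this example:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

// Decodes each Kafka message as a UTF-8 string and upper-cases it.
public class UpperCaseScheme implements Scheme {
    @Override
    public List&lt;Object&gt; deserialize(ByteBuffer ser) {
        return new Values(StandardCharsets.UTF_8.decode(ser).toString().toUpperCase());
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("str");
    }
}
</code></pre></div>
<div class="highlight"><pre><code class="language-java" data-lang="java">// Wrap the single-message scheme so the spout can use it.
spoutConfig.scheme = new SchemeAsMultiScheme(new UpperCaseScheme());
</code></pre></div>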
<h3 id="failed-message-retry">Failed message retry</h3>
<p>FailedMsgRetryManager is an interface which defines the retry strategy for a failed message. The default implementation is ExponentialBackoffMsgRetryManager, which retries with exponential delays
between consecutive retries. To use a custom implementation, set SpoutConfig.failedMsgRetryManagerClass to the fully qualified class name
of your implementation. Here is the interface:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="c1">// Spout initialization can go here. This can be called multiple times during lifecycle of a worker. </span>
<span class="kt">void</span> <span class="nf">prepare</span><span class="o">(</span><span class="n">SpoutConfig</span> <span class="n">spoutConfig</span><span class="o">,</span> <span class="n">Map</span> <span class="n">stormConf</span><span class="o">);</span>
<span class="c1">// Message corresponding to offset has failed. This method is called only if retryFurther returns true for offset.</span>
<span class="kt">void</span> <span class="nf">failed</span><span class="o">(</span><span class="n">Long</span> <span class="n">offset</span><span class="o">);</span>
<span class="c1">// Message corresponding to offset has been acked. </span>
<span class="kt">void</span> <span class="nf">acked</span><span class="o">(</span><span class="n">Long</span> <span class="n">offset</span><span class="o">);</span>
<span class="c1">// Message corresponding to the offset, has been re-emitted and under transit.</span>
<span class="kt">void</span> <span class="nf">retryStarted</span><span class="o">(</span><span class="n">Long</span> <span class="n">offset</span><span class="o">);</span>
<span class="cm">/**
* The offset of message, which is to be re-emitted. Spout will fetch messages starting from this offset
* and resend them, except completed messages.
*/</span>
<span class="n">Long</span> <span class="nf">nextFailedMessageToRetry</span><span class="o">();</span>
<span class="cm">/**
* @return True if the message corresponding to the offset should be emitted NOW. False otherwise.
*/</span>
<span class="kt">boolean</span> <span class="nf">shouldReEmitMsg</span><span class="o">(</span><span class="n">Long</span> <span class="n">offset</span><span class="o">);</span>
<span class="cm">/**
* Spout will clean up the state for this offset if false is returned. If retryFurther is set to true,
* spout will called failed(offset) in next call and acked(offset) otherwise
*/</span>
<span class="kt">boolean</span> <span class="nf">retryFurther</span><span class="o">(</span><span class="n">Long</span> <span class="n">offset</span><span class="o">);</span>
<span class="cm">/**
* Clear any offsets before kafkaOffset. These offsets are no longer available in kafka.
*/</span>
<span class="n">Set</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">&gt;</span> <span class="nf">clearOffsetsBefore</span><span class="o">(</span><span class="n">Long</span> <span class="n">kafkaOffset</span><span class="o">);</span>
</code></pre></div>
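<p>To plug in a custom strategy, point the spout config at its fully qualified class name (the class name below is hypothetical; the built-in ExponentialBackoffMsgRetryManager is used if you leave the field alone):</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">// com.example.MyRetryManager is a placeholder for your own FailedMsgRetryManager implementation.
spoutConfig.failedMsgRetryManagerClass = "com.example.MyRetryManager";
</code></pre></div>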
<h4 id="version-incompatibility">Version incompatibility</h4>
<p>In Storm versions prior to 1.0, the MultiScheme methods accepted a <code>byte[]</code> instead of <code>ByteBuffer</code>. The <code>MultiScheme</code> and the related
Scheme APIs were changed in version 1.0 to accept a <code>ByteBuffer</code> instead of a <code>byte[]</code>.</p>
<p>This means that pre-1.0 Kafka spouts will not work with Storm versions 1.0 and higher. When running topologies on Storm version 1.0
and higher, you must ensure that the storm-kafka version is at least 1.0. Pre-1.0 shaded topology jars that bundle
storm-kafka classes must be rebuilt with storm-kafka version 1.0 in order to run on clusters with Storm 1.0 and higher.</p>
<h3 id="examples">Examples</h3>
<h4 id="core-spout">Core Spout</h4>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">BrokerHosts</span> <span class="n">hosts</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ZkHosts</span><span class="o">(</span><span class="n">zkConnString</span><span class="o">);</span>
<span class="n">SpoutConfig</span> <span class="n">spoutConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="n">SpoutConfig</span><span class="o">(</span><span class="n">hosts</span><span class="o">,</span> <span class="n">topicName</span><span class="o">,</span> <span class="s">"/"</span> <span class="o">+</span> <span class="n">topicName</span><span class="o">,</span> <span class="n">UUID</span><span class="o">.</span><span class="na">randomUUID</span><span class="o">().</span><span class="na">toString</span><span class="o">());</span>
<span class="n">spoutConfig</span><span class="o">.</span><span class="na">scheme</span> <span class="o">=</span> <span class="k">new</span> <span class="n">SchemeAsMultiScheme</span><span class="o">(</span><span class="k">new</span> <span class="n">StringScheme</span><span class="o">());</span>
<span class="n">KafkaSpout</span> <span class="n">kafkaSpout</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KafkaSpout</span><span class="o">(</span><span class="n">spoutConfig</span><span class="o">);</span>
</code></pre></div>
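<p>To actually run the spout, attach it to a topology; a sketch (the component ids, the bolt and the parallelism are placeholders):</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("kafka-spout", kafkaSpout, 4);
// Downstream bolts then subscribe to the spout's stream, e.g.
// builder.setBolt("my-bolt", new MyBolt()).shuffleGrouping("kafka-spout");
</code></pre></div>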
<h4 id="trident-spout">Trident Spout</h4>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">TridentTopology</span> <span class="n">topology</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TridentTopology</span><span class="o">();</span>
<span class="n">BrokerHosts</span> <span class="n">zk</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ZkHosts</span><span class="o">(</span><span class="s">"localhost"</span><span class="o">);</span>
<span class="n">TridentKafkaConfig</span> <span class="n">spoutConf</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TridentKafkaConfig</span><span class="o">(</span><span class="n">zk</span><span class="o">,</span> <span class="s">"test-topic"</span><span class="o">);</span>
<span class="n">spoutConf</span><span class="o">.</span><span class="na">scheme</span> <span class="o">=</span> <span class="k">new</span> <span class="n">SchemeAsMultiScheme</span><span class="o">(</span><span class="k">new</span> <span class="n">StringScheme</span><span class="o">());</span>
<span class="n">OpaqueTridentKafkaSpout</span> <span class="n">spout</span> <span class="o">=</span> <span class="k">new</span> <span class="n">OpaqueTridentKafkaSpout</span><span class="o">(</span><span class="n">spoutConf</span><span class="o">);</span>
</code></pre></div>
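<p>The Trident spout is then wired into the topology via newStream; a sketch (the stream name is arbitrary):</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">Stream stream = topology.newStream("kafka-stream", spout);
// Trident operations (each, groupBy, persistentAggregate, ...) can then be applied to the stream as usual.
</code></pre></div>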
<h3 id="how-kafkaspout-stores-offsets-of-a-kafka-topic-and-recovers-in-case-of-failures">How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures</h3>
<p>As shown in the above KafkaConfig properties, you can control from where in the Kafka topic the spout begins to read by
setting <code>KafkaConfig.startOffsetTime</code> as follows:</p>
<ol>
<li><code>kafka.api.OffsetRequest.EarliestTime()</code>: read from the beginning of the topic (i.e. from the oldest messages onwards)</li>
<li><code>kafka.api.OffsetRequest.LatestTime()</code>: read from the end of the topic (i.e. any new messages that are being written to the topic)</li>
<li>A Unix timestamp in milliseconds since the epoch (e.g. via <code>System.currentTimeMillis()</code>):
see <a href="https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest?">How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?</a> in the Kafka FAQ</li>
</ol>
<p>As the topology runs, the Kafka spout keeps track of the offsets it has read and emitted by storing state information
under the ZooKeeper path <code>SpoutConfig.zkRoot + &quot;/&quot; + SpoutConfig.id</code>. In the case of failures it recovers from the last
written offset in ZooKeeper.</p>
<blockquote>
<p><strong>Important:</strong> When re-deploying a topology make sure that the settings for <code>SpoutConfig.zkRoot</code> and <code>SpoutConfig.id</code>
were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the
offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to data loss, depending on your use case.</p>
</blockquote>
<p>This means that once a topology has run, the setting <code>KafkaConfig.startOffsetTime</code> will not have an effect for
subsequent runs of the topology, because the topology will now rely on the consumer state information (offsets) in
ZooKeeper to determine from where it should begin (more precisely: resume) reading.
If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should
set the parameter <code>KafkaConfig.ignoreZkOffsets</code> to <code>true</code>. If <code>true</code>, the spout will always begin reading from the
offset defined by <code>KafkaConfig.startOffsetTime</code> as described above.</p>
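<p>For example, to force a re-deployed topology to ignore the offsets stored in ZooKeeper and start reading from the latest messages instead (a sketch using the fields listed above):</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">spoutConfig.ignoreZkOffsets = true;
spoutConfig.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
</code></pre></div>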
<h2 id="using-storm-kafka-with-different-versions-of-kafka">Using storm-kafka with different versions of Kafka</h2>
<p>Storm-kafka&#39;s Kafka dependency is defined with <code>provided</code> scope in Maven, meaning it will not be pulled in
as a transitive dependency. This allows you to use a Kafka dependency version that is compatible with your Kafka cluster.</p>
<p>When building a project with storm-kafka, you must explicitly add the Kafka dependency. For example, to
use Kafka 0.8.1.1 built against Scala 2.10, you would use the following dependency in your <code>pom.xml</code>:</p>
<div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt">&lt;dependency&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.kafka<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>kafka_2.10<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;version&gt;</span>0.8.1.1<span class="nt">&lt;/version&gt;</span>
<span class="nt">&lt;exclusions&gt;</span>
<span class="nt">&lt;exclusion&gt;</span>
<span class="nt">&lt;groupId&gt;</span>org.apache.zookeeper<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>zookeeper<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;/exclusion&gt;</span>
<span class="nt">&lt;exclusion&gt;</span>
<span class="nt">&lt;groupId&gt;</span>log4j<span class="nt">&lt;/groupId&gt;</span>
<span class="nt">&lt;artifactId&gt;</span>log4j<span class="nt">&lt;/artifactId&gt;</span>
<span class="nt">&lt;/exclusion&gt;</span>
<span class="nt">&lt;/exclusions&gt;</span>
<span class="nt">&lt;/dependency&gt;</span>
</code></pre></div>
<p>Note that the ZooKeeper and log4j dependencies are excluded to prevent version conflicts with Storm&#39;s dependencies.</p>
<p>You can also override the Kafka dependency version when building with Maven, using the parameters <code>storm.kafka.version</code> and <code>storm.kafka.artifact.id</code>,
e.g. <code>mvn clean install -Dstorm.kafka.artifact.id=kafka_2.11 -Dstorm.kafka.version=0.9.0.1</code></p>
<p>When selecting a Kafka dependency version, you should ensure that:</p>
<ol>
<li>The Kafka API is compatible with storm-kafka. Currently, only the 0.8.x and 0.9.x client APIs are supported by the storm-kafka
module. If you want to use a higher version, the storm-kafka-client module should be used instead.</li>
<li>The Kafka client you select is wire-compatible with the broker, e.g. a 0.9.x client will not work with
a 0.8.x broker.</li>
</ol>
<h2 id="writing-to-kafka-as-part-of-your-topology">Writing to Kafka as part of your topology</h2>
<p>You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology. If you
are using Trident, you can use org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory and
org.apache.storm.kafka.trident.TridentKafkaUpdater.</p>
<p>You need to provide implementations of the following two interfaces:</p>
<h3 id="tupletokafkamapper-and-tridenttupletokafkamapper">TupleToKafkaMapper and TridentTupleToKafkaMapper</h3>
<p>These interfaces have 2 methods defined:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">K</span> <span class="nf">getKeyFromTuple</span><span class="o">(</span><span class="n">Tuple</span><span class="o">/</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">);</span>
<span class="n">V</span> <span class="nf">getMessageFromTuple</span><span class="o">(</span><span class="n">Tuple</span><span class="o">/</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">);</span>
</code></pre></div>
<p>As the names suggest, these methods are called to map a tuple to a Kafka key and a Kafka message. If you just want one field
as the key and one field as the value, you can use the provided FieldNameBasedTupleToKafkaMapper.java
implementation. In the KafkaBolt, if you use the default constructor of FieldNameBasedTupleToKafkaMapper, the implementation
always looks for fields named &quot;key&quot; and &quot;message&quot; for backward-compatibility
reasons. Alternatively, you can specify different key and message fields by using the non-default constructor.
In the TridentKafkaState you must specify the field names for the key and the message, as there is no default constructor.
These should be specified when constructing an instance of FieldNameBasedTupleToKafkaMapper.</p>
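<p>For example, if your tuples carry the key in a field named &quot;word&quot; and the message in a field named &quot;count&quot;, a sketch of configuring the mapper on the bolt looks like this:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">KafkaBolt bolt = new KafkaBolt()
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
</code></pre></div>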
<h3 id="kafkatopicselector-and-trident-kafkatopicselector">KafkaTopicSelector and trident KafkaTopicSelector</h3>
<p>This interface has only one method:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">KafkaTopicSelector</span> <span class="o">{</span>
<span class="n">String</span> <span class="nf">getTopics</span><span class="o">(</span><span class="n">Tuple</span><span class="o">/</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">);</span>
<span class="o">}</span>
</code></pre></div>
<p>The implementation of this interface should return the topic to which the tuple&#39;s key/message mapping needs to be published.
You can return null, in which case the message will be ignored. If you have a single static topic name, you can use
DefaultTopicSelector.java and set the name of the topic in the constructor.
<code>FieldNameTopicSelector</code> and <code>FieldIndexTopicSelector</code> can be used to decide which topic a tuple&#39;s message should be pushed to.
You can specify a field name or a field index in the tuple, and the selector will use that value as the name of the topic to publish the message to.
If the topic name is not found, <code>KafkaBolt</code> will write messages to a default topic.
Please make sure the default topic has been created.</p>
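<p>For example, a sketch of choosing the topic per tuple, assuming FieldNameTopicSelector takes the name of the tuple field that holds the topic and a fallback topic to use when that field cannot be resolved:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">// Publish each message to the topic named in the "topic" field of the tuple;
// fall back to "default-topic" (which must already exist) if no topic can be resolved.
KafkaBolt bolt = new KafkaBolt()
        .withTopicSelector(new FieldNameTopicSelector("topic", "default-topic"))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
</code></pre></div>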
<h3 id="specifying-kafka-producer-properties">Specifying Kafka producer properties</h3>
<p>You can provide all the producer properties in your Storm topology by calling <code>KafkaBolt.withProducerProperties()</code> and <code>TridentKafkaStateFactory.withProducerProperties()</code>. Please see the
section &quot;Important configuration properties for the producer&quot; at <a href="http://kafka.apache.org/documentation.html#newproducerconfigs">http://kafka.apache.org/documentation.html#newproducerconfigs</a> for more details.</p>
<h3 id="using-wildcard-kafka-topic-match">Using wildcard kafka topic match</h3>
<p>You can do a wildcard topic match by adding the following config</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Config</span> <span class="n">config</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Config</span><span class="o">();</span>
<span class="n">config</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"kafka.topic.wildcard.match"</span><span class="o">,</span><span class="kc">true</span><span class="o">);</span>
</code></pre></div>
<p>After this you can specify a wildcard topic for matching, e.g. clickstream.*.log. This will match all topics such as clickstream.my.log, clickstream.cart.log, etc.</p>
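<p>A sketch of putting this together (this assumes the wildcard flag is read from the topology config and that the topic passed to SpoutConfig is treated as the pattern; the ids are placeholders):</p>
<div class="highlight"><pre><code class="language-java" data-lang="java">Config config = new Config();
config.put("kafka.topic.wildcard.match", true);
// "clickstream.*.log" matches e.g. clickstream.my.log and clickstream.cart.log
SpoutConfig spoutConfig = new SpoutConfig(hosts, "clickstream.*.log", "/clickstream", "clickstream-spout");
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
// ... build the topology, then submit it together with the config that carries the wildcard flag:
StormSubmitter.submitTopology("clickstreamTopology", config, builder.createTopology());
</code></pre></div>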
<h3 id="putting-it-all-together">Putting it all together</h3>
<p>For the bolt:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">TopologyBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TopologyBuilder</span><span class="o">();</span>
<span class="n">Fields</span> <span class="n">fields</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"key"</span><span class="o">,</span> <span class="s">"message"</span><span class="o">);</span>
<span class="n">FixedBatchSpout</span> <span class="n">spout</span> <span class="o">=</span> <span class="k">new</span> <span class="n">FixedBatchSpout</span><span class="o">(</span><span class="n">fields</span><span class="o">,</span> <span class="mi">4</span><span class="o">,</span>
<span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"storm"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">),</span>
<span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"trident"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">),</span>
<span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"needs"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">),</span>
<span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"javadoc"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">)</span>
<span class="o">);</span>
<span class="n">spout</span><span class="o">.</span><span class="na">setCycle</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
<span class="n">builder</span><span class="o">.</span><span class="na">setSpout</span><span class="o">(</span><span class="s">"spout"</span><span class="o">,</span> <span class="n">spout</span><span class="o">,</span> <span class="mi">5</span><span class="o">);</span>
<span class="c1">//set producer properties.</span>
<span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"bootstrap.servers"</span><span class="o">,</span> <span class="s">"localhost:9092"</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"acks"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"key.serializer"</span><span class="o">,</span> <span class="s">"org.apache.kafka.common.serialization.StringSerializer"</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"value.serializer"</span><span class="o">,</span> <span class="s">"org.apache.kafka.common.serialization.StringSerializer"</span><span class="o">);</span>
<span class="n">KafkaBolt</span> <span class="n">bolt</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KafkaBolt</span><span class="o">()</span>
<span class="o">.</span><span class="na">withProducerProperties</span><span class="o">(</span><span class="n">props</span><span class="o">)</span>
<span class="o">.</span><span class="na">withTopicSelector</span><span class="o">(</span><span class="k">new</span> <span class="n">DefaultTopicSelector</span><span class="o">(</span><span class="s">"test"</span><span class="o">))</span>
<span class="o">.</span><span class="na">withTupleToKafkaMapper</span><span class="o">(</span><span class="k">new</span> <span class="n">FieldNameBasedTupleToKafkaMapper</span><span class="o">());</span>
<span class="n">builder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"forwardToKafka"</span><span class="o">,</span> <span class="n">bolt</span><span class="o">,</span> <span class="mi">8</span><span class="o">).</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">"spout"</span><span class="o">);</span>
<span class="n">Config</span> <span class="n">conf</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Config</span><span class="o">();</span>
<span class="n">StormSubmitter</span><span class="o">.</span><span class="na">submitTopology</span><span class="o">(</span><span class="s">"kafkaboltTest"</span><span class="o">,</span> <span class="n">conf</span><span class="o">,</span> <span class="n">builder</span><span class="o">.</span><span class="na">createTopology</span><span class="o">());</span>
</code></pre></div>
<p>For Trident:</p>
<div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Fields</span> <span class="n">fields</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"word"</span><span class="o">,</span> <span class="s">"count"</span><span class="o">);</span>
<span class="n">FixedBatchSpout</span> <span class="n">spout</span> <span class="o">=</span> <span class="k">new</span> <span class="n">FixedBatchSpout</span><span class="o">(</span><span class="n">fields</span><span class="o">,</span> <span class="mi">4</span><span class="o">,</span>
<span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"storm"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">),</span>
<span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"trident"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">),</span>
<span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"needs"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">),</span>
<span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"javadoc"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">)</span>
<span class="o">);</span>
<span class="n">spout</span><span class="o">.</span><span class="na">setCycle</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span>
<span class="n">TridentTopology</span> <span class="n">topology</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TridentTopology</span><span class="o">();</span>
<span class="n">Stream</span> <span class="n">stream</span> <span class="o">=</span> <span class="n">topology</span><span class="o">.</span><span class="na">newStream</span><span class="o">(</span><span class="s">"spout1"</span><span class="o">,</span> <span class="n">spout</span><span class="o">);</span>
<span class="c1">//set producer properties.</span>
<span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"bootstrap.servers"</span><span class="o">,</span> <span class="s">"localhost:9092"</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"acks"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"key.serializer"</span><span class="o">,</span> <span class="s">"org.apache.kafka.common.serialization.StringSerializer"</span><span class="o">);</span>
<span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"value.serializer"</span><span class="o">,</span> <span class="s">"org.apache.kafka.common.serialization.StringSerializer"</span><span class="o">);</span>
<span class="n">TridentKafkaStateFactory</span> <span class="n">stateFactory</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TridentKafkaStateFactory</span><span class="o">()</span>
<span class="o">.</span><span class="na">withProducerProperties</span><span class="o">(</span><span class="n">props</span><span class="o">)</span>
<span class="o">.</span><span class="na">withKafkaTopicSelector</span><span class="o">(</span><span class="k">new</span> <span class="n">DefaultTopicSelector</span><span class="o">(</span><span class="s">"test"</span><span class="o">))</span>
<span class="o">.</span><span class="na">withTridentTupleToKafkaMapper</span><span class="o">(</span><span class="k">new</span> <span class="n">FieldNameBasedTupleToKafkaMapper</span><span class="o">(</span><span class="s">"word"</span><span class="o">,</span> <span class="s">"count"</span><span class="o">));</span>
<span class="n">stream</span><span class="o">.</span><span class="na">partitionPersist</span><span class="o">(</span><span class="n">stateFactory</span><span class="o">,</span> <span class="n">fields</span><span class="o">,</span> <span class="k">new</span> <span class="n">TridentKafkaUpdater</span><span class="o">(),</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">());</span>
<span class="n">Config</span> <span class="n">conf</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Config</span><span class="o">();</span>
<span class="n">StormSubmitter</span><span class="o">.</span><span class="na">submitTopology</span><span class="o">(</span><span class="s">"kafkaTridentTest"</span><span class="o">,</span> <span class="n">conf</span><span class="o">,</span> <span class="n">topology</span><span class="o">.</span><span class="na">build</span><span class="o">());</span>
</code></pre></div>
<h2 id="committer-sponsors">Committer Sponsors</h2>
<ul>
<li>P. Taylor Goetz (<a href="mailto:ptgoetz@apache.org">ptgoetz@apache.org</a>)</li>
</ul>
</div>
</div>
</div>
</div>
<footer>
<div class="container-fluid">
<div class="row">
<div class="col-md-3">
<div class="footer-widget">
<h5>Meetups</h5>
<ul class="latest-news">
<li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm & Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li>
<li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm & Kafka Users</a> <span class="small">(Seattle, WA)</span></li>
<li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li>
<li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li>
<li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li>
<li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li>
<!-- <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Seatle, WA</a> <span class="small">(27 Jun 2015)</span></li> -->
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>About Apache Storm</h5>
<p>Apache Storm integrates with any queueing system and any database system. Apache Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Apache Storm with database systems is easy.</p>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>First Look</h5>
<ul class="footer-list">
<li><a href="/releases/current/Rationale.html">Rationale</a></li>
<li><a href="/releases/current/Tutorial.html">Tutorial</a></li>
<li><a href="/releases/current/Setting-up-development-environment.html">Setting up development environment</a></li>
<li><a href="/releases/current/Creating-a-new-Storm-project.html">Creating a new Apache Storm project</a></li>
</ul>
</div>
</div>
<div class="col-md-3">
<div class="footer-widget">
<h5>Documentation</h5>
<ul class="footer-list">
<li><a href="/releases/current/index.html">Index</a></li>
<li><a href="/releases/current/javadocs/index.html">Javadoc</a></li>
<li><a href="/releases/current/FAQ.html">FAQ</a></li>
</ul>
</div>
</div>
</div>
<hr/>
<div class="row">
<div class="col-md-12">
<p align="center">Copyright © 2019 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved.
<br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation.
<br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p>
</div>
</div>
</div>
</footer>
<!--Footer End-->
<!-- Scroll to top -->
<span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span>
</body>
</html>