| <!DOCTYPE html> |
| <html> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| |
| <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon"> |
| <link rel="icon" href="/favicon.ico" type="image/x-icon"> |
| |
| <title>Storm Kafka Integration (0.8.x)</title> |
| |
| <!-- Bootstrap core CSS --> |
| <link href="/assets/css/bootstrap.min.css" rel="stylesheet"> |
| <!-- Bootstrap theme --> |
| <link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet"> |
| |
| <!-- Custom styles for this template --> |
| <link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css"> |
| <link href="/css/style.css" rel="stylesheet"> |
| <link href="/assets/css/owl.theme.css" rel="stylesheet"> |
| <link href="/assets/css/owl.carousel.css" rel="stylesheet"> |
| <script type="text/javascript" src="/assets/js/jquery.min.js"></script> |
| <script type="text/javascript" src="/assets/js/bootstrap.min.js"></script> |
| <script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script> |
| <script type="text/javascript" src="/assets/js/storm.js"></script> |
| <!-- Just for debugging purposes. Don't actually copy these 2 lines! --> |
| <!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]--> |
| |
| <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> |
| <!--[if lt IE 9]> |
| <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script> |
| <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> |
| <![endif]--> |
| </head> |
| |
| |
| <body> |
| <header> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-5"> |
| <a href="/index.html"><img src="/images/logo.png" class="logo" /></a> |
| </div> |
| <div class="col-md-5"> |
| |
| <h1>Version: 1.2.1</h1> |
| |
| </div> |
| <div class="col-md-2"> |
| <a href="/downloads.html" class="btn-std btn-block btn-download">Download</a> |
| </div> |
| </div> |
| </div> |
| </header> |
| <!--Header End--> |
| <!--Navigation Begin--> |
| <div class="navbar" role="banner"> |
| <div class="container-fluid"> |
| <div class="navbar-header"> |
| <button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse"> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| </div> |
| <nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation"> |
| <ul class="nav navbar-nav"> |
| <li><a href="/index.html" id="home">Home</a></li> |
| <li><a href="/getting-help.html" id="getting-help">Getting Help</a></li> |
| <li><a href="/about/integrates.html" id="project-info">Project Information</a></li> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="documentation">Documentation <b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| |
| |
| <li><a href="/releases/2.0.0-SNAPSHOT/index.html">2.0.0-SNAPSHOT</a></li> |
| |
| |
| |
| <li><a href="/releases/1.2.1/index.html">1.2.1</a></li> |
| |
| |
| |
| <li><a href="/releases/1.1.2/index.html">1.1.2</a></li> |
| |
| |
| |
| |
| |
| <li><a href="/releases/1.0.6/index.html">1.0.6</a></li> |
| </ul> |
| </li> |
| <li><a href="/talksAndVideos.html">Talks and Slideshows</a></li> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li> |
| <li><a href="/contribute/People.html">People</a></li> |
| <li><a href="/contribute/BYLAWS.html">ByLaws</a></li> |
| </ul> |
| </li> |
| <li><a href="/2018/06/04/storm122-released.html" id="news">News</a></li> |
| </ul> |
| </nav> |
| </div> |
| </div> |
| |
| |
| |
| <div class="container-fluid"> |
| <h1 class="page-title">Storm Kafka Integration (0.8.x)</h1> |
| <div class="row"> |
| <div class="col-md-12"> |
| <!-- Documentation --> |
| |
| <p class="post-meta"></p> |
| |
| <div class="documentation-content"><p>Provides core Storm and Trident spout implementations for consuming data from Apache Kafka 0.8.x.</p> |
| |
| <h2 id="spouts">Spouts</h2> |
| |
| <p>We support both Trident and core Storm spouts. Both spout implementations use a BrokerHosts interface that |
| tracks the Kafka broker host to partition mapping and a KafkaConfig that controls some Kafka-related parameters.</p> |
| |
| <h3 id="brokerhosts">BrokerHosts</h3> |
| |
| <p>In order to initialize your Kafka spout/emitter you need to construct an instance of the marker interface BrokerHosts. |
| Currently, we support the following two implementations:</p> |
| |
| <h4 id="zkhosts">ZkHosts</h4> |
| |
| <p>ZkHosts is what you should use if you want to dynamically track the Kafka broker to partition mapping. This class uses |
| Kafka's ZooKeeper entries to track the brokerHost -> partition mapping. You can instantiate an object by calling one of these constructors:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="nf">ZkHosts</span><span class="o">(</span><span class="n">String</span> <span class="n">brokerZkStr</span><span class="o">,</span> <span class="n">String</span> <span class="n">brokerZkPath</span><span class="o">)</span> |
| <span class="kd">public</span> <span class="nf">ZkHosts</span><span class="o">(</span><span class="n">String</span> <span class="n">brokerZkStr</span><span class="o">)</span> |
| </code></pre></div> |
| <p>Here brokerZkStr is the ZooKeeper address as ip:port (e.g. localhost:2181), and brokerZkPath is the root directory under which all the topic and |
| partition information is stored. By default this is /brokers, which is what the default Kafka implementation uses.</p> |
| |
| <p>By default, the broker-partition mapping is refreshed every 60 seconds from ZooKeeper. If you want to change it, you |
| should set host.refreshFreqSecs to your chosen value.</p> |
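| <p>As a rough illustration of how the two constructor arguments fit together, the sketch below builds the ZooKeeper paths that a ZkHosts-style lookup would read. It assumes Kafka's usual 0.8.x layout below the broker root (<code>/ids</code> for broker registrations, <code>/topics/TOPIC/partitions</code> for partition state). <code>BrokerZkPaths</code> and its methods are hypothetical names used for illustration and are not part of storm-kafka.</p> |

```java
// Hypothetical helper, not part of storm-kafka: shows how brokerZkPath
// combines with the assumed Kafka 0.8.x ZooKeeper layout.
final class BrokerZkPaths {
    static final String DEFAULT_BROKER_ZK_PATH = "/brokers";

    private final String brokerZkPath;

    BrokerZkPaths(String brokerZkPath) {
        // Drop a trailing slash so joined paths never contain "//".
        this.brokerZkPath = brokerZkPath.endsWith("/") && brokerZkPath.length() > 1
                ? brokerZkPath.substring(0, brokerZkPath.length() - 1)
                : brokerZkPath;
    }

    // Node under which each live broker registers itself.
    String brokerIdsPath() {
        return brokerZkPath + "/ids";
    }

    // Node under which the partitions of one topic are listed.
    String partitionsPath(String topic) {
        return brokerZkPath + "/topics/" + topic + "/partitions";
    }
}
```

| <p>With the default root, <code>new BrokerZkPaths("/brokers").partitionsPath("test-topic")</code> yields <code>/brokers/topics/test-topic/partitions</code>.</p> |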
| |
| <h4 id="statichosts">StaticHosts</h4> |
| |
| <p>This is an alternative implementation where broker -> partition information is static. In order to construct an instance |
| of this class, you need to first construct an instance of GlobalPartitionInformation.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Broker</span> <span class="n">brokerForPartition0</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Broker</span><span class="o">(</span><span class="s">"localhost"</span><span class="o">);</span><span class="c1">//localhost:9092</span> |
| <span class="n">Broker</span> <span class="n">brokerForPartition1</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Broker</span><span class="o">(</span><span class="s">"localhost"</span><span class="o">,</span> <span class="mi">9092</span><span class="o">);</span><span class="c1">//localhost:9092 but we specified the port explicitly</span> |
| <span class="n">Broker</span> <span class="n">brokerForPartition2</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Broker</span><span class="o">(</span><span class="s">"localhost:9092"</span><span class="o">);</span><span class="c1">//localhost:9092 specified as one string.</span> |
| <span class="n">GlobalPartitionInformation</span> <span class="n">partitionInfo</span> <span class="o">=</span> <span class="k">new</span> <span class="n">GlobalPartitionInformation</span><span class="o">();</span> |
| <span class="n">partitionInfo</span><span class="o">.</span><span class="na">addPartition</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="n">brokerForPartition0</span><span class="o">);</span><span class="c1">//mapping from partition 0 to brokerForPartition0</span> |
| <span class="n">partitionInfo</span><span class="o">.</span><span class="na">addPartition</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">brokerForPartition1</span><span class="o">);</span><span class="c1">//mapping from partition 1 to brokerForPartition1</span> |
| <span class="n">partitionInfo</span><span class="o">.</span><span class="na">addPartition</span><span class="o">(</span><span class="mi">2</span><span class="o">,</span> <span class="n">brokerForPartition2</span><span class="o">);</span><span class="c1">//mapping from partition 2 to brokerForPartition2</span> |
| <span class="n">StaticHosts</span> <span class="n">hosts</span> <span class="o">=</span> <span class="k">new</span> <span class="n">StaticHosts</span><span class="o">(</span><span class="n">partitionInfo</span><span class="o">);</span> |
| </code></pre></div> |
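| <p>According to the comments in the example above, all three <code>Broker</code> constructor calls resolve to localhost:9092. The following minimal sketch shows one way such host/port handling could work, assuming 9092 as the default port. <code>BrokerAddress</code> is a hypothetical stand-in for illustration, not the storm-kafka <code>Broker</code> class.</p> |

```java
// Hypothetical stand-in for illustration; the real class is storm-kafka's Broker.
final class BrokerAddress {
    static final int DEFAULT_PORT = 9092; // port assumed when none is given

    final String host;
    final int port;

    BrokerAddress(String host, int port) {
        this.host = host;
        this.port = port;
    }

    // Accepts "host" or "host:port", mirroring the constructor calls above.
    static BrokerAddress parse(String hostPort) {
        int colon = hostPort.lastIndexOf(':');
        if (colon < 0) {
            return new BrokerAddress(hostPort, DEFAULT_PORT);
        }
        String host = hostPort.substring(0, colon);
        int port = Integer.parseInt(hostPort.substring(colon + 1));
        return new BrokerAddress(host, port);
    }

    @Override
    public String toString() {
        return host + ":" + port;
    }
}
```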
| <h3 id="kafkaconfig">KafkaConfig</h3> |
| |
| <p>The second thing needed for constructing a KafkaSpout is an instance of KafkaConfig.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="nf">KafkaConfig</span><span class="o">(</span><span class="n">BrokerHosts</span> <span class="n">hosts</span><span class="o">,</span> <span class="n">String</span> <span class="n">topic</span><span class="o">)</span> |
| <span class="kd">public</span> <span class="nf">KafkaConfig</span><span class="o">(</span><span class="n">BrokerHosts</span> <span class="n">hosts</span><span class="o">,</span> <span class="n">String</span> <span class="n">topic</span><span class="o">,</span> <span class="n">String</span> <span class="n">clientId</span><span class="o">)</span> |
| </code></pre></div> |
| <p>The hosts argument can be any implementation of the BrokerHosts interface described above. The topic is the name of the Kafka topic. |
| The optional clientId identifies the requests that the spout makes using the Kafka protocol.</p> |
| |
| <p>There are 2 extensions of KafkaConfig currently in use.</p> |
| |
| <p>SpoutConfig is an extension of KafkaConfig that adds fields for ZooKeeper connection info and for controlling |
| behavior specific to KafkaSpout. |
| The clientId is used to identify requests made using the Kafka protocol. |
| The zkRoot is used as the root path under which your consumer's offsets are stored. |
| The id should uniquely identify your spout.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="nf">SpoutConfig</span><span class="o">(</span><span class="n">BrokerHosts</span> <span class="n">hosts</span><span class="o">,</span> <span class="n">String</span> <span class="n">topic</span><span class="o">,</span> <span class="n">String</span> <span class="n">clientId</span><span class="o">,</span> <span class="n">String</span> <span class="n">zkRoot</span><span class="o">,</span> <span class="n">String</span> <span class="n">id</span><span class="o">);</span> |
| <span class="kd">public</span> <span class="nf">SpoutConfig</span><span class="o">(</span><span class="n">BrokerHosts</span> <span class="n">hosts</span><span class="o">,</span> <span class="n">String</span> <span class="n">topic</span><span class="o">,</span> <span class="n">String</span> <span class="n">zkRoot</span><span class="o">,</span> <span class="n">String</span> <span class="n">id</span><span class="o">);</span> |
| <span class="kd">public</span> <span class="nf">SpoutConfig</span><span class="o">(</span><span class="n">BrokerHosts</span> <span class="n">hosts</span><span class="o">,</span> <span class="n">String</span> <span class="n">topic</span><span class="o">,</span> <span class="n">String</span> <span class="n">id</span><span class="o">);</span> |
| </code></pre></div> |
| <p>In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="c1">// setting for how often to save the current Kafka offset to ZooKeeper</span> |
| <span class="kd">public</span> <span class="kt">long</span> <span class="n">stateUpdateIntervalMs</span> <span class="o">=</span> <span class="mi">2000</span><span class="o">;</span> |
| |
| <span class="c1">// Retry strategy for failed messages</span> |
| <span class="kd">public</span> <span class="n">String</span> <span class="n">failedMsgRetryManagerClass</span> <span class="o">=</span> <span class="n">ExponentialBackoffMsgRetryManager</span><span class="o">.</span><span class="na">class</span><span class="o">.</span><span class="na">getName</span><span class="o">();</span> |
| |
| <span class="c1">// Exponential back-off retry settings. These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt</span> |
| <span class="c1">// calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.</span> |
| <span class="c1">// Initial delay between successive retries</span> |
| <span class="kd">public</span> <span class="kt">long</span> <span class="n">retryInitialDelayMs</span> <span class="o">=</span> <span class="mi">0</span><span class="o">;</span> |
| <span class="kd">public</span> <span class="kt">double</span> <span class="n">retryDelayMultiplier</span> <span class="o">=</span> <span class="mf">1.0</span><span class="o">;</span> |
| |
| <span class="c1">// Maximum delay between successive retries </span> |
| <span class="kd">public</span> <span class="kt">long</span> <span class="n">retryDelayMaxMs</span> <span class="o">=</span> <span class="mi">60</span> <span class="o">*</span> <span class="mi">1000</span><span class="o">;</span> |
| <span class="c1">// Failed message will be retried infinitely if retryLimit is less than zero. </span> |
| <span class="kd">public</span> <span class="kt">int</span> <span class="n">retryLimit</span> <span class="o">=</span> <span class="o">-</span><span class="mi">1</span><span class="o">;</span> |
| </code></pre></div> |
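| <p>To show how the retry fields could interact, here is a minimal sketch of a capped exponential back-off. It assumes the delay before the n-th retry is <code>retryInitialDelayMs * retryDelayMultiplier^(n-1)</code>, limited by <code>retryDelayMaxMs</code>. This is a simplified model and not necessarily identical to what ExponentialBackoffMsgRetryManager does internally; <code>BackoffSketch</code> and its method names are hypothetical.</p> |

```java
// Simplified model of capped exponential back-off; names are hypothetical.
final class BackoffSketch {
    // Delay before the given retry attempt (1-based), capped at maxDelayMs.
    static long delayMs(long initialDelayMs, double multiplier, long maxDelayMs, int attempt) {
        double raw = initialDelayMs * Math.pow(multiplier, attempt - 1);
        return (long) Math.min(raw, (double) maxDelayMs);
    }

    // A negative limit means "retry forever", as in the retryLimit comment above.
    static boolean mayRetry(int retryLimit, int attemptsSoFar) {
        return retryLimit < 0 || attemptsSoFar < retryLimit;
    }
}
```

| <p>Under this model the defaults shown above (<code>retryInitialDelayMs = 0</code>, <code>retryDelayMultiplier = 1.0</code>) give a delay of 0 ms for every attempt, so a non-zero initial delay and a multiplier above 1.0 are what produce an actual back-off.</p> |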
| <p>Core KafkaSpout only accepts an instance of SpoutConfig.</p> |
| |
| <p>TridentKafkaConfig is another extension of KafkaConfig. |
| TridentKafkaEmitter only accepts TridentKafkaConfig.</p> |
| |
| <p>The KafkaConfig class also has a bunch of public variables that control your application's behavior. Here are the defaults:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kt">int</span> <span class="n">fetchSizeBytes</span> <span class="o">=</span> <span class="mi">1024</span> <span class="o">*</span> <span class="mi">1024</span><span class="o">;</span> |
| <span class="kd">public</span> <span class="kt">int</span> <span class="n">socketTimeoutMs</span> <span class="o">=</span> <span class="mi">10000</span><span class="o">;</span> |
| <span class="kd">public</span> <span class="kt">int</span> <span class="n">fetchMaxWait</span> <span class="o">=</span> <span class="mi">10000</span><span class="o">;</span> |
| <span class="kd">public</span> <span class="kt">int</span> <span class="n">bufferSizeBytes</span> <span class="o">=</span> <span class="mi">1024</span> <span class="o">*</span> <span class="mi">1024</span><span class="o">;</span> |
| <span class="kd">public</span> <span class="n">MultiScheme</span> <span class="n">scheme</span> <span class="o">=</span> <span class="k">new</span> <span class="n">RawMultiScheme</span><span class="o">();</span> |
| <span class="kd">public</span> <span class="kt">boolean</span> <span class="n">ignoreZkOffsets</span> <span class="o">=</span> <span class="kc">false</span><span class="o">;</span> |
| <span class="kd">public</span> <span class="kt">long</span> <span class="n">startOffsetTime</span> <span class="o">=</span> <span class="n">kafka</span><span class="o">.</span><span class="na">api</span><span class="o">.</span><span class="na">OffsetRequest</span><span class="o">.</span><span class="na">EarliestTime</span><span class="o">();</span> |
| <span class="kd">public</span> <span class="kt">long</span> <span class="n">maxOffsetBehind</span> <span class="o">=</span> <span class="n">Long</span><span class="o">.</span><span class="na">MAX_VALUE</span><span class="o">;</span> |
| <span class="kd">public</span> <span class="kt">boolean</span> <span class="n">useStartOffsetTimeIfOffsetOutOfRange</span> <span class="o">=</span> <span class="kc">true</span><span class="o">;</span> |
| <span class="kd">public</span> <span class="kt">int</span> <span class="n">metricsTimeBucketSizeInSecs</span> <span class="o">=</span> <span class="mi">60</span><span class="o">;</span> |
| </code></pre></div> |
| <p>Most of them are self-explanatory except MultiScheme.</p> |
| |
| <h3 id="multischeme">MultiScheme</h3> |
| |
| <p>MultiScheme is an interface that dictates how the ByteBuffer consumed from Kafka gets transformed into a Storm tuple. It |
| also controls the naming of your output fields.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="n">Iterable</span><span class="o"><</span><span class="n">List</span><span class="o"><</span><span class="n">Object</span><span class="o">>></span> <span class="nf">deserialize</span><span class="o">(</span><span class="n">ByteBuffer</span> <span class="n">ser</span><span class="o">);</span> |
| <span class="kd">public</span> <span class="n">Fields</span> <span class="nf">getOutputFields</span><span class="o">();</span> |
| </code></pre></div> |
| <p>The default <code>RawMultiScheme</code> just takes the <code>ByteBuffer</code> and returns a tuple with the ByteBuffer converted to a <code>byte[]</code>. The name of the output field is "bytes". There are alternative implementations like <code>SchemeAsMultiScheme</code> and <code>KeyValueSchemeAsMultiScheme</code> which can convert the <code>ByteBuffer</code> to a <code>String</code>.</p> |
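| <p>The two conversions described above can be sketched with plain JDK classes. This is an illustration of the idea only: <code>SchemeSketch</code> is a hypothetical class that does not implement the real MultiScheme interface, and UTF-8 is an assumed encoding for the string case.</p> |

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch; it does not implement storm-kafka's MultiScheme.
final class SchemeSketch {
    // Copies the readable bytes into a fresh array. duplicate() shares the
    // content but has its own position, so the caller's buffer is untouched.
    static byte[] toBytes(ByteBuffer ser) {
        ByteBuffer copy = ser.duplicate();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return bytes;
    }

    // Raw-style tuple: a single value holding the message bytes.
    static List<Object> rawTuple(ByteBuffer ser) {
        return Collections.<Object>singletonList(toBytes(ser));
    }

    // String-style tuple: a single value holding the decoded message
    // (UTF-8 is an assumption of this sketch).
    static List<Object> stringTuple(ByteBuffer ser) {
        return Collections.<Object>singletonList(new String(toBytes(ser), StandardCharsets.UTF_8));
    }
}
```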
| |
| <p>There is also an extension of <code>SchemeAsMultiScheme</code>, <code>MessageMetadataSchemeAsMultiScheme</code>, |
| which has an additional deserialize method that accepts the message <code>ByteBuffer</code> in addition to the <code>Partition</code> and <code>offset</code> associated with the message.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="n">Iterable</span><span class="o"><</span><span class="n">List</span><span class="o"><</span><span class="n">Object</span><span class="o">>></span> <span class="nf">deserializeMessageWithMetadata</span><span class="o">(</span><span class="n">ByteBuffer</span> <span class="n">message</span><span class="o">,</span> <span class="n">Partition</span> <span class="n">partition</span><span class="o">,</span> <span class="kt">long</span> <span class="n">offset</span><span class="o">)</span> |
| </code></pre></div> |
| <p>This is useful for auditing/replaying messages from arbitrary points on a Kafka topic, saving the partition and offset of each message of a discrete stream instead of persisting the entire message.</p> |
| |
| <h3 id="failed-message-retry">Failed message retry</h3> |
| |
| <p>FailedMsgRetryManager is an interface that defines the retry strategy for a failed message. The default implementation is ExponentialBackoffMsgRetryManager, which retries with exponential delays |
| between consecutive retries. To use a custom implementation, set SpoutConfig.failedMsgRetryManagerClass to the fully qualified class name |
| of the implementation. Here is the interface:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="c1">// Spout initialization can go here. This can be called multiple times during lifecycle of a worker. </span> |
| <span class="kt">void</span> <span class="nf">prepare</span><span class="o">(</span><span class="n">SpoutConfig</span> <span class="n">spoutConfig</span><span class="o">,</span> <span class="n">Map</span> <span class="n">stormConf</span><span class="o">);</span> |
| |
| <span class="c1">// Message corresponding to offset has failed. This method is called only if retryFurther returns true for offset.</span> |
| <span class="kt">void</span> <span class="nf">failed</span><span class="o">(</span><span class="n">Long</span> <span class="n">offset</span><span class="o">);</span> |
| |
| <span class="c1">// Message corresponding to offset has been acked. </span> |
| <span class="kt">void</span> <span class="nf">acked</span><span class="o">(</span><span class="n">Long</span> <span class="n">offset</span><span class="o">);</span> |
| |
| <span class="c1">// Message corresponding to the offset, has been re-emitted and under transit.</span> |
| <span class="kt">void</span> <span class="nf">retryStarted</span><span class="o">(</span><span class="n">Long</span> <span class="n">offset</span><span class="o">);</span> |
| |
| <span class="cm">/** |
| * The offset of message, which is to be re-emitted. Spout will fetch messages starting from this offset |
| * and resend them, except completed messages. |
| */</span> |
| <span class="n">Long</span> <span class="nf">nextFailedMessageToRetry</span><span class="o">();</span> |
| |
| <span class="cm">/** |
| * @return True if the message corresponding to the offset should be emitted NOW. False otherwise. |
| */</span> |
| <span class="kt">boolean</span> <span class="nf">shouldReEmitMsg</span><span class="o">(</span><span class="n">Long</span> <span class="n">offset</span><span class="o">);</span> |
| |
| <span class="cm">/** |
| * Spout will clean up the state for this offset if false is returned. If retryFurther is set to true, |
| * spout will call failed(offset) in next call and acked(offset) otherwise |
| */</span> |
| <span class="kt">boolean</span> <span class="nf">retryFurther</span><span class="o">(</span><span class="n">Long</span> <span class="n">offset</span><span class="o">);</span> |
| |
| <span class="cm">/** |
| * Clear any offsets before kafkaOffset. These offsets are no longer available in kafka. |
| */</span> |
| <span class="n">Set</span><span class="o"><</span><span class="n">Long</span><span class="o">></span> <span class="nf">clearOffsetsBefore</span><span class="o">(</span><span class="n">Long</span> <span class="n">kafkaOffset</span><span class="o">);</span> |
| </code></pre></div> |
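| <p>To make the call sequence more concrete, here is a minimal sketch of a strategy that retries each offset a fixed number of times with no delay. It mirrors the method names listed above but is a standalone class: it does not implement the real FailedMsgRetryManager, and <code>prepare</code> and <code>clearOffsetsBefore</code> are left out. <code>FixedLimitRetrySketch</code> and <code>maxRetries</code> are hypothetical names.</p> |

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Standalone sketch of a fixed-limit retry strategy; names are hypothetical.
final class FixedLimitRetrySketch {
    private final int maxRetries;
    private final Map<Long, Integer> failures = new HashMap<>();
    private final TreeSet<Long> waiting = new TreeSet<>();

    FixedLimitRetrySketch(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    // Record one more failure and queue the offset for re-emission.
    void failed(Long offset) {
        failures.merge(offset, 1, Integer::sum);
        waiting.add(offset);
    }

    // Forget everything about an offset once it is acked.
    void acked(Long offset) {
        failures.remove(offset);
        waiting.remove(offset);
    }

    // The offset is in flight again, so it is no longer waiting.
    void retryStarted(Long offset) {
        waiting.remove(offset);
    }

    // Lowest waiting offset, or null when nothing needs a retry.
    Long nextFailedMessageToRetry() {
        return waiting.isEmpty() ? null : waiting.first();
    }

    // No back-off in this sketch: anything waiting may be re-emitted now.
    boolean shouldReEmitMsg(Long offset) {
        return waiting.contains(offset);
    }

    // Allow another retry only while the failure count is below the limit.
    boolean retryFurther(Long offset) {
        return failures.getOrDefault(offset, 0) < maxRetries;
    }
}
```

| <p>Because <code>retryFurther</code> is consulted before <code>failed</code>, a limit of 2 in this sketch lets an offset be queued for retry twice. On the third failure <code>retryFurther</code> returns false and the state for that offset is expected to be cleaned up through <code>acked</code>.</p> |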
| <h4 id="version-incompatibility">Version incompatibility</h4> |
| |
| <p>In Storm versions prior to 1.0, the MultiScheme methods accepted a <code>byte[]</code> instead of a <code>ByteBuffer</code>. The <code>MultiScheme</code> and the related |
| Scheme APIs were changed in version 1.0 to accept a ByteBuffer instead of a byte[].</p> |
| |
| <p>This means that pre-1.0 Kafka spouts will not work with Storm versions 1.0 and higher. When running topologies in Storm version 1.0 |
| and higher, you must ensure that the storm-kafka version is at least 1.0. Pre-1.0 shaded topology jars that bundle |
| storm-kafka classes must be rebuilt with storm-kafka version 1.0 to run in clusters with Storm 1.0 and higher.</p> |
| |
| <h3 id="examples">Examples</h3> |
| |
| <h4 id="core-spout">Core Spout</h4> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">BrokerHosts</span> <span class="n">hosts</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ZkHosts</span><span class="o">(</span><span class="n">zkConnString</span><span class="o">);</span> |
| <span class="n">SpoutConfig</span> <span class="n">spoutConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="n">SpoutConfig</span><span class="o">(</span><span class="n">hosts</span><span class="o">,</span> <span class="n">topicName</span><span class="o">,</span> <span class="s">"/"</span> <span class="o">+</span> <span class="n">topicName</span><span class="o">,</span> <span class="n">UUID</span><span class="o">.</span><span class="na">randomUUID</span><span class="o">().</span><span class="na">toString</span><span class="o">());</span> |
| <span class="n">spoutConfig</span><span class="o">.</span><span class="na">scheme</span> <span class="o">=</span> <span class="k">new</span> <span class="n">SchemeAsMultiScheme</span><span class="o">(</span><span class="k">new</span> <span class="n">StringScheme</span><span class="o">());</span> |
| <span class="n">KafkaSpout</span> <span class="n">kafkaSpout</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KafkaSpout</span><span class="o">(</span><span class="n">spoutConfig</span><span class="o">);</span> |
| </code></pre></div> |
| <h4 id="trident-spout">Trident Spout</h4> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">TridentTopology</span> <span class="n">topology</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TridentTopology</span><span class="o">();</span> |
| <span class="n">BrokerHosts</span> <span class="n">zk</span> <span class="o">=</span> <span class="k">new</span> <span class="n">ZkHosts</span><span class="o">(</span><span class="s">"localhost"</span><span class="o">);</span> |
| <span class="n">TridentKafkaConfig</span> <span class="n">spoutConf</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TridentKafkaConfig</span><span class="o">(</span><span class="n">zk</span><span class="o">,</span> <span class="s">"test-topic"</span><span class="o">);</span> |
| <span class="n">spoutConf</span><span class="o">.</span><span class="na">scheme</span> <span class="o">=</span> <span class="k">new</span> <span class="n">SchemeAsMultiScheme</span><span class="o">(</span><span class="k">new</span> <span class="n">StringScheme</span><span class="o">());</span> |
| <span class="n">OpaqueTridentKafkaSpout</span> <span class="n">spout</span> <span class="o">=</span> <span class="k">new</span> <span class="n">OpaqueTridentKafkaSpout</span><span class="o">(</span><span class="n">spoutConf</span><span class="o">);</span> |
| </code></pre></div> |
| <h3 id="how-kafkaspout-stores-offsets-of-a-kafka-topic-and-recovers-in-case-of-failures">How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures</h3> |
| |
| <p>As shown in the above KafkaConfig properties, you can control from where in the Kafka topic the spout begins to read by |
| setting <code>KafkaConfig.startOffsetTime</code> as follows:</p> |
| |
| <ol> |
| <li><code>kafka.api.OffsetRequest.EarliestTime()</code>: read from the beginning of the topic (i.e. from the oldest messages onwards)</li> |
| <li><code>kafka.api.OffsetRequest.LatestTime()</code>: read from the end of the topic (i.e. any new messages that are being written to the topic)</li> |
| <li>A Unix timestamp in milliseconds since the epoch (e.g. derived from <code>System.currentTimeMillis()</code>): |
| see <a href="https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest?">How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?</a> in the Kafka FAQ</li> |
| </ol> |
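| <p>All three options in the list above end up as a <code>long</code> value. The sketch below spells out the sentinel values that <code>EarliestTime()</code> and <code>LatestTime()</code> return in the Kafka 0.8.x API and shows one way to compute a millisecond timestamp for the third option. <code>StartOffsetTimes</code> and <code>hoursBefore</code> are hypothetical names used only for this illustration.</p> |

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of storm-kafka or Kafka.
final class StartOffsetTimes {
    // Sentinels returned by kafka.api.OffsetRequest.EarliestTime() and LatestTime().
    static final long EARLIEST = -2L;
    static final long LATEST = -1L;

    // Millisecond timestamp lying the given number of hours before nowMs.
    static long hoursBefore(long nowMs, long hours) {
        return nowMs - TimeUnit.HOURS.toMillis(hours);
    }
}
```

| <p>For example, <code>StartOffsetTimes.hoursBefore(System.currentTimeMillis(), 1)</code> gives a timestamp one hour in the past that could be assigned to <code>startOffsetTime</code>.</p> |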
| |
| <p>As the topology runs, the Kafka spout keeps track of the offsets it has read and emitted by storing state information |
| under the ZooKeeper path <code>SpoutConfig.zkRoot + "/" + SpoutConfig.id</code>. In the case of failures, it recovers from the last |
| written offset in ZooKeeper.</p> |
| |
| <blockquote> |
| <p><strong>Important:</strong> When re-deploying a topology make sure that the settings for <code>SpoutConfig.zkRoot</code> and <code>SpoutConfig.id</code> |
| were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the |
| offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to data loss, depending on your use case.</p> |
| </blockquote> |
| |
| <p>This means that once a topology has run, the setting <code>KafkaConfig.startOffsetTime</code> will not have an effect for |
| subsequent runs of the topology because now the topology will rely on the consumer state information (offsets) in |
| ZooKeeper to determine from where it should begin (more precisely: resume) reading. |
| If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should |
| set the parameter <code>KafkaConfig.ignoreZkOffsets</code> to <code>true</code>. If <code>true</code>, the spout will always begin reading from the |
| offset defined by <code>KafkaConfig.startOffsetTime</code> as described above.</p> |
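| <p>The rule described in this section can be summarised in a small sketch. It is a simplified model of the behavior as documented here, not the spout's actual code, which also takes settings such as <code>maxOffsetBehind</code> into account. <code>StartPositionSketch</code> and its methods are hypothetical names.</p> |

```java
// Simplified model of the documented start-position rule; names are hypothetical.
final class StartPositionSketch {
    // Where the spout's consumer state lives, per the path rule described above.
    static String statePath(String zkRoot, String id) {
        return zkRoot + "/" + id;
    }

    // committedOffset is null when no state was found under statePath(...).
    // Returns true when the spout should fall back to startOffsetTime.
    static boolean useStartOffsetTime(Long committedOffset, boolean ignoreZkOffsets) {
        return ignoreZkOffsets || committedOffset == null;
    }
}
```

| <p>In this model, changing <code>zkRoot</code> or <code>id</code> between deployments changes <code>statePath</code>, so no committed offset is found and the spout starts from <code>startOffsetTime</code> again.</p> |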
| |
| <h2 id="using-storm-kafka-with-different-versions-of-kafka">Using storm-kafka with different versions of Kafka</h2> |
| |
| <p>Storm-kafka's Kafka dependency is defined with <code>provided</code> scope in Maven, meaning it will not be pulled in |
| as a transitive dependency. This allows you to use a version of the Kafka dependency that is compatible with your Kafka cluster.</p> |
| |
| <p>When building a project with storm-kafka, you must explicitly add the Kafka dependency. For example, to |
| use Kafka 0.8.1.1 built against Scala 2.10, you would use the following dependency in your <code>pom.xml</code>:</p> |
| <div class="highlight"><pre><code class="language-xml" data-lang="xml"><span class="nt"><dependency></span> |
| <span class="nt"><groupId></span>org.apache.kafka<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>kafka_2.10<span class="nt"></artifactId></span> |
| <span class="nt"><version></span>0.8.1.1<span class="nt"></version></span> |
| <span class="nt"><exclusions></span> |
| <span class="nt"><exclusion></span> |
| <span class="nt"><groupId></span>org.apache.zookeeper<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>zookeeper<span class="nt"></artifactId></span> |
| <span class="nt"></exclusion></span> |
| <span class="nt"><exclusion></span> |
| <span class="nt"><groupId></span>log4j<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>log4j<span class="nt"></artifactId></span> |
| <span class="nt"></exclusion></span> |
| <span class="nt"></exclusions></span> |
| <span class="nt"></dependency></span> |
| </code></pre></div> |
| <p>Note that the ZooKeeper and log4j dependencies are excluded to prevent version conflicts with Storm's dependencies.</p> |
| |
| <p>You can also override the Kafka dependency version while building with Maven, using the parameters <code>storm.kafka.version</code> and <code>storm.kafka.artifact.id</code>, |
| e.g. <code>mvn clean install -Dstorm.kafka.artifact.id=kafka_2.11 -Dstorm.kafka.version=0.9.0.1</code></p> |
| |
| <p>When selecting a Kafka dependency version, you should ensure that:</p> |
| |
| <ol> |
| <li>The Kafka API is compatible with storm-kafka. Currently, only the 0.9.x and 0.8.x client APIs are supported by the storm-kafka |
| module. If you want to use a higher version, the storm-kafka-client module should be used instead.</li> |
| <li>The Kafka client you select is wire compatible with the broker. For example, a 0.9.x client will not work with a |
| 0.8.x broker.</li> |
| </ol> |
| |
| <h2 id="writing-to-kafka-as-part-of-your-topology">Writing to Kafka as part of your topology</h2> |
| |
| <p>You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology, or if you |
| are using Trident you can use org.apache.storm.kafka.trident.TridentKafkaState, org.apache.storm.kafka.trident.TridentKafkaStateFactory and |
| org.apache.storm.kafka.trident.TridentKafkaUpdater.</p> |
| |
| <p>You need to provide implementations of the following two interfaces:</p> |
| |
| <h3 id="tupletokafkamapper-and-tridenttupletokafkamapper">TupleToKafkaMapper and TridentTupleToKafkaMapper</h3> |
| |
| <p>These interfaces have 2 methods defined:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">K</span> <span class="nf">getKeyFromTuple</span><span class="o">(</span><span class="n">Tuple</span><span class="o">/</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">);</span> |
| <span class="n">V</span> <span class="nf">getMessageFromTuple</span><span class="o">(</span><span class="n">Tuple</span><span class="o">/</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">);</span> |
| </code></pre></div> |
| <p>As the names suggest, these methods are called to map a tuple to a Kafka key and a Kafka message. If you just want one field |
| as the key and one field as the value, you can use the provided FieldNameBasedTupleToKafkaMapper.java |
| implementation. In the KafkaBolt, for backward compatibility, the implementation looks for fields named "key" and "message" if you |
| use the default constructor to construct FieldNameBasedTupleToKafkaMapper. |
| Alternatively, you can specify different key and message fields by using the non-default constructor. |
| In the TridentKafkaState you must specify the field names for the key and the message, as there is no default constructor. |
| Specify them when constructing an instance of FieldNameBasedTupleToKafkaMapper.</p> |
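| <p>When a single field is not enough for the key, you can write your own mapper. The following is a minimal sketch of the mapping logic only, not of the storm-kafka API: it uses <code>java.util.Properties</code> as a stand-in for a tuple so that it runs without Storm on the classpath, and the class name <code>CompositeKeyMapperSketch</code> and the field names <code>userId</code>, <code>eventType</code> and <code>payload</code> are hypothetical. In a real topology the same two method bodies would presumably live in a class implementing <code>TupleToKafkaMapper</code> and read from a <code>Tuple</code>.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.Properties; |
| |
| public class CompositeKeyMapperSketch { |
| |
|     // Kafka key: two tuple fields joined with ':' (stand-in for getKeyFromTuple(Tuple)). |
|     static String getKeyFromTuple(Properties tuple) { |
|         return tuple.getProperty("userId") + ":" + tuple.getProperty("eventType"); |
|     } |
| |
|     // Kafka message: a single tuple field (stand-in for getMessageFromTuple(Tuple)). |
|     static String getMessageFromTuple(Properties tuple) { |
|         return tuple.getProperty("payload"); |
|     } |
| |
|     public static void main(String[] args) { |
|         // Hypothetical tuple contents, for illustration only. |
|         Properties tuple = new Properties(); |
|         tuple.setProperty("userId", "u42"); |
|         tuple.setProperty("eventType", "click"); |
|         tuple.setProperty("payload", "/home"); |
| |
|         System.out.println(getKeyFromTuple(tuple));     // prints u42:click |
|         System.out.println(getMessageFromTuple(tuple)); // prints /home |
|     } |
| } |
| </code></pre></div> |
| <p>Keeping the key derivation in one small method makes it easy to check that tuples which should land in the same Kafka partition produce the same key.</p> |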
| |
| <h3 id="kafkatopicselector-and-trident-kafkatopicselector">KafkaTopicSelector and trident KafkaTopicSelector</h3> |
| |
| <p>This interface has only one method:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="kd">public</span> <span class="kd">interface</span> <span class="nc">KafkaTopicSelector</span> <span class="o">{</span> |
| <span class="n">String</span> <span class="nf">getTopic</span><span class="o">(</span><span class="n">Tuple</span><span class="o">/</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">);</span> |
| <span class="o">}</span> |
| </code></pre></div> |
| <p>The implementation of this interface should return the topic to which the tuple's key/message mapping is to be published. |
| If it returns null, the message is ignored. If you have one static topic name, you can use |
| DefaultTopicSelector.java and set the name of the topic in the constructor. |
| <code>FieldNameTopicSelector</code> and <code>FieldIndexTopicSelector</code> decide which topic a tuple's message is published to based on the tuple itself. |
| You specify a field name or field index in the tuple, and the selector uses that field's value as the name of the topic to publish to. |
| When the topic name is not found, <code>KafkaBolt</code> writes the message to the default topic. |
| Make sure the default topic has been created.</p> |
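| <p>The field-based selection with a fallback to a default topic can be sketched as follows. This is an illustration of the behaviour described above under stated assumptions, not the storm-kafka implementation: <code>java.util.Properties</code> stands in for a tuple, and the class name <code>FieldTopicSelectorSketch</code>, the method name <code>selectTopic</code>, the field name <code>topic</code> and the topic names are all hypothetical.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.Properties; |
| |
| public class FieldTopicSelectorSketch { |
|     private final String fieldName; |
|     private final String defaultTopic; |
| |
|     FieldTopicSelectorSketch(String fieldName, String defaultTopic) { |
|         this.fieldName = fieldName; |
|         this.defaultTopic = defaultTopic; |
|     } |
| |
|     // Returns the value of the configured field, or the default topic |
|     // when the field is missing or blank. |
|     String selectTopic(Properties tuple) { |
|         String topic = tuple.getProperty(fieldName); |
|         if (topic == null || topic.trim().isEmpty()) { |
|             return defaultTopic; |
|         } |
|         return topic; |
|     } |
| |
|     public static void main(String[] args) { |
|         FieldTopicSelectorSketch selector = new FieldTopicSelectorSketch("topic", "events-default"); |
| |
|         Properties routed = new Properties(); |
|         routed.setProperty("topic", "clicks"); |
|         Properties unrouted = new Properties(); // no "topic" field at all |
| |
|         System.out.println(selector.selectTopic(routed));   // prints clicks |
|         System.out.println(selector.selectTopic(unrouted)); // prints events-default |
|     } |
| } |
| </code></pre></div> |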
| |
| <h3 id="specifying-kafka-producer-properties">Specifying Kafka producer properties</h3> |
| |
| <p>You can provide all the producer properties in your Storm topology by calling <code>KafkaBolt.withProducerProperties()</code> and <code>TridentKafkaStateFactory.withProducerProperties()</code>. See the section "Important configuration properties for the producer" at <a href="http://kafka.apache.org/documentation.html#newproducerconfigs">http://kafka.apache.org/documentation.html#newproducerconfigs</a> |
| for more details.</p> |
| |
| <h3 id="using-wildcard-kafka-topic-match">Using wildcard kafka topic match</h3> |
| |
| <p>You can do a wildcard topic match by adding the following config:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Config</span> <span class="n">config</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Config</span><span class="o">();</span> |
| <span class="n">config</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"kafka.topic.wildcard.match"</span><span class="o">,</span><span class="kc">true</span><span class="o">);</span> |
| </code></pre></div> |
| <p>After this you can specify a wildcard topic for matching, e.g. clickstream.*.log. This will match topics such as clickstream.my.log, clickstream.cart.log, etc.</p> |
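| <p>To see which topic names a given pattern would select, the following minimal sketch assumes the wildcard is interpreted as a Java regular expression that must match the whole topic name. That interpretation is an assumption here, so check it against the storm-kafka version you run. The class name <code>WildcardTopicMatchSketch</code> and the topic list are hypothetical.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java">import java.util.regex.Pattern; |
| |
| public class WildcardTopicMatchSketch { |
| |
|     // True when the whole topic name matches the pattern, read as a Java regex. |
|     static boolean matches(String wildcard, String topic) { |
|         return Pattern.matches(wildcard, topic); |
|     } |
| |
|     public static void main(String[] args) { |
|         String wildcard = "clickstream.*.log"; |
|         String[] topics = {"clickstream.my.log", "clickstream.cart.log", "orders.log"}; |
|         for (String topic : topics) { |
|             System.out.println(topic + " matches: " + matches(wildcard, topic)); |
|         } |
|     } |
| } |
| </code></pre></div> |
| <p>Under this assumption an unescaped <code>.</code> matches any character, so the pattern also matches a name such as clickstream-my-log. Escape the dots (<code>clickstream\..*\.log</code>) if that is not intended.</p> |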
| |
| <h3 id="putting-it-all-together">Putting it all together</h3> |
| |
| <p>For the bolt:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">TopologyBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TopologyBuilder</span><span class="o">();</span> |
| |
| <span class="n">Fields</span> <span class="n">fields</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"key"</span><span class="o">,</span> <span class="s">"message"</span><span class="o">);</span> |
| <span class="n">FixedBatchSpout</span> <span class="n">spout</span> <span class="o">=</span> <span class="k">new</span> <span class="n">FixedBatchSpout</span><span class="o">(</span><span class="n">fields</span><span class="o">,</span> <span class="mi">4</span><span class="o">,</span> |
| <span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"storm"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">),</span> |
| <span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"trident"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">),</span> |
| <span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"needs"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">),</span> |
| <span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"javadoc"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">)</span> |
| <span class="o">);</span> |
| <span class="n">spout</span><span class="o">.</span><span class="na">setCycle</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span> |
| <span class="n">builder</span><span class="o">.</span><span class="na">setSpout</span><span class="o">(</span><span class="s">"spout"</span><span class="o">,</span> <span class="n">spout</span><span class="o">,</span> <span class="mi">5</span><span class="o">);</span> |
| <span class="c1">//set producer properties.</span> |
| <span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span> |
| <span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"bootstrap.servers"</span><span class="o">,</span> <span class="s">"localhost:9092"</span><span class="o">);</span> |
| <span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"acks"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">);</span> |
| <span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"key.serializer"</span><span class="o">,</span> <span class="s">"org.apache.kafka.common.serialization.StringSerializer"</span><span class="o">);</span> |
| <span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"value.serializer"</span><span class="o">,</span> <span class="s">"org.apache.kafka.common.serialization.StringSerializer"</span><span class="o">);</span> |
| |
| <span class="n">KafkaBolt</span> <span class="n">bolt</span> <span class="o">=</span> <span class="k">new</span> <span class="n">KafkaBolt</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">withProducerProperties</span><span class="o">(</span><span class="n">props</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withTopicSelector</span><span class="o">(</span><span class="k">new</span> <span class="n">DefaultTopicSelector</span><span class="o">(</span><span class="s">"test"</span><span class="o">))</span> |
| <span class="o">.</span><span class="na">withTupleToKafkaMapper</span><span class="o">(</span><span class="k">new</span> <span class="n">FieldNameBasedTupleToKafkaMapper</span><span class="o">());</span> |
| <span class="n">builder</span><span class="o">.</span><span class="na">setBolt</span><span class="o">(</span><span class="s">"forwardToKafka"</span><span class="o">,</span> <span class="n">bolt</span><span class="o">,</span> <span class="mi">8</span><span class="o">).</span><span class="na">shuffleGrouping</span><span class="o">(</span><span class="s">"spout"</span><span class="o">);</span> |
| |
| <span class="n">Config</span> <span class="n">conf</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Config</span><span class="o">();</span> |
| |
| <span class="n">StormSubmitter</span><span class="o">.</span><span class="na">submitTopology</span><span class="o">(</span><span class="s">"kafkaboltTest"</span><span class="o">,</span> <span class="n">conf</span><span class="o">,</span> <span class="n">builder</span><span class="o">.</span><span class="na">createTopology</span><span class="o">());</span> |
| </code></pre></div> |
| <p>For Trident:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">Fields</span> <span class="n">fields</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">(</span><span class="s">"word"</span><span class="o">,</span> <span class="s">"count"</span><span class="o">);</span> |
| <span class="n">FixedBatchSpout</span> <span class="n">spout</span> <span class="o">=</span> <span class="k">new</span> <span class="n">FixedBatchSpout</span><span class="o">(</span><span class="n">fields</span><span class="o">,</span> <span class="mi">4</span><span class="o">,</span> |
| <span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"storm"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">),</span> |
| <span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"trident"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">),</span> |
| <span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"needs"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">),</span> |
| <span class="k">new</span> <span class="nf">Values</span><span class="o">(</span><span class="s">"javadoc"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">)</span> |
| <span class="o">);</span> |
| <span class="n">spout</span><span class="o">.</span><span class="na">setCycle</span><span class="o">(</span><span class="kc">true</span><span class="o">);</span> |
| |
| <span class="n">TridentTopology</span> <span class="n">topology</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TridentTopology</span><span class="o">();</span> |
| <span class="n">Stream</span> <span class="n">stream</span> <span class="o">=</span> <span class="n">topology</span><span class="o">.</span><span class="na">newStream</span><span class="o">(</span><span class="s">"spout1"</span><span class="o">,</span> <span class="n">spout</span><span class="o">);</span> |
| |
| <span class="c1">//set producer properties.</span> |
| <span class="n">Properties</span> <span class="n">props</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Properties</span><span class="o">();</span> |
| <span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"bootstrap.servers"</span><span class="o">,</span> <span class="s">"localhost:9092"</span><span class="o">);</span> |
| <span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"acks"</span><span class="o">,</span> <span class="s">"1"</span><span class="o">);</span> |
| <span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"key.serializer"</span><span class="o">,</span> <span class="s">"org.apache.kafka.common.serialization.StringSerializer"</span><span class="o">);</span> |
| <span class="n">props</span><span class="o">.</span><span class="na">put</span><span class="o">(</span><span class="s">"value.serializer"</span><span class="o">,</span> <span class="s">"org.apache.kafka.common.serialization.StringSerializer"</span><span class="o">);</span> |
| |
| <span class="n">TridentKafkaStateFactory</span> <span class="n">stateFactory</span> <span class="o">=</span> <span class="k">new</span> <span class="n">TridentKafkaStateFactory</span><span class="o">()</span> |
| <span class="o">.</span><span class="na">withProducerProperties</span><span class="o">(</span><span class="n">props</span><span class="o">)</span> |
| <span class="o">.</span><span class="na">withKafkaTopicSelector</span><span class="o">(</span><span class="k">new</span> <span class="n">DefaultTopicSelector</span><span class="o">(</span><span class="s">"test"</span><span class="o">))</span> |
| <span class="o">.</span><span class="na">withTridentTupleToKafkaMapper</span><span class="o">(</span><span class="k">new</span> <span class="n">FieldNameBasedTupleToKafkaMapper</span><span class="o">(</span><span class="s">"word"</span><span class="o">,</span> <span class="s">"count"</span><span class="o">));</span> |
| <span class="n">stream</span><span class="o">.</span><span class="na">partitionPersist</span><span class="o">(</span><span class="n">stateFactory</span><span class="o">,</span> <span class="n">fields</span><span class="o">,</span> <span class="k">new</span> <span class="n">TridentKafkaUpdater</span><span class="o">(),</span> <span class="k">new</span> <span class="n">Fields</span><span class="o">());</span> |
| |
| <span class="n">Config</span> <span class="n">conf</span> <span class="o">=</span> <span class="k">new</span> <span class="n">Config</span><span class="o">();</span> |
| <span class="n">StormSubmitter</span><span class="o">.</span><span class="na">submitTopology</span><span class="o">(</span><span class="s">"kafkaTridentTest"</span><span class="o">,</span> <span class="n">conf</span><span class="o">,</span> <span class="n">topology</span><span class="o">.</span><span class="na">build</span><span class="o">());</span> |
| </code></pre></div> |
| <h2 id="committer-sponsors">Committer Sponsors</h2> |
| |
| <ul> |
| <li>P. Taylor Goetz (<a href="mailto:ptgoetz@apache.org">ptgoetz@apache.org</a>)</li> |
| </ul> |
| </div> |
| |
| |
| </div> |
| </div> |
| </div> |
| <footer> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-12"> |
| <p align="center">Copyright © 2015 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved. |
| <br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation. |
| <br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p> |
| </div> |
| </div> |
| </div> |
| </footer> |
| <!--Footer End--> |
| |
| </body> |
| |
| </html> |
| |