| <!DOCTYPE html> |
| <html> |
| <head> |
| <meta charset="utf-8"> |
| <meta http-equiv="X-UA-Compatible" content="IE=edge"> |
| <meta name="viewport" content="width=device-width, initial-scale=1"> |
| |
| <link rel="shortcut icon" href="/favicon.ico" type="image/x-icon"> |
| <link rel="icon" href="/favicon.ico" type="image/x-icon"> |
| |
| <title>Storm Kafka Integration</title> |
| |
| <!-- Bootstrap core CSS --> |
| <link href="/assets/css/bootstrap.min.css" rel="stylesheet"> |
| <!-- Bootstrap theme --> |
| <link href="/assets/css/bootstrap-theme.min.css" rel="stylesheet"> |
| |
| <!-- Custom styles for this template --> |
| <link rel="stylesheet" href="http://fortawesome.github.io/Font-Awesome/assets/font-awesome/css/font-awesome.css"> |
| <link href="/css/style.css" rel="stylesheet"> |
| <link href="/assets/css/owl.theme.css" rel="stylesheet"> |
| <link href="/assets/css/owl.carousel.css" rel="stylesheet"> |
| <script type="text/javascript" src="/assets/js/jquery.min.js"></script> |
| <script type="text/javascript" src="/assets/js/bootstrap.min.js"></script> |
| <script type="text/javascript" src="/assets/js/owl.carousel.min.js"></script> |
| <script type="text/javascript" src="/assets/js/storm.js"></script> |
| <!-- Just for debugging purposes. Don't actually copy these 2 lines! --> |
| <!--[if lt IE 9]><script src="../../assets/js/ie8-responsive-file-warning.js"></script><![endif]--> |
| |
| <!-- HTML5 shim and Respond.js for IE8 support of HTML5 elements and media queries --> |
| <!--[if lt IE 9]> |
| <script src="https://oss.maxcdn.com/html5shiv/3.7.2/html5shiv.min.js"></script> |
| <script src="https://oss.maxcdn.com/respond/1.4.2/respond.min.js"></script> |
| <![endif]--> |
| </head> |
| |
| |
| <body> |
| <header> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-10"> |
| <a href="/index.html"><img src="/images/logo.png" class="logo" /></a> |
| </div> |
| <div class="col-md-2"> |
| <a href="/downloads.html" class="btn-std btn-block btn-download">Download</a> |
| </div> |
| </div> |
| </div> |
| </header> |
| <!--Header End--> |
| <!--Navigation Begin--> |
| <div class="navbar" role="banner"> |
| <div class="container-fluid"> |
| <div class="navbar-header"> |
| <button class="navbar-toggle" type="button" data-toggle="collapse" data-target=".bs-navbar-collapse"> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| <span class="icon-bar"></span> |
| </button> |
| </div> |
| <nav class="collapse navbar-collapse bs-navbar-collapse" role="navigation"> |
| <ul class="nav navbar-nav"> |
| <li><a href="/index.html" id="home">Home</a></li> |
| <li><a href="/getting-help.html" id="getting-help">Getting Help</a></li> |
| <li><a href="/about/integrates.html" id="project-info">Project Information</a></li> |
| <li><a href="/documentation.html" id="documentation">Documentation</a></li> |
| <li><a href="/talksAndVideos.html">Talks and Slideshows</a></li> |
| <li class="dropdown"> |
| <a href="#" class="dropdown-toggle" data-toggle="dropdown" id="contribute">Community <b class="caret"></b></a> |
| <ul class="dropdown-menu"> |
| <li><a href="/contribute/Contributing-to-Storm.html">Contributing</a></li> |
| <li><a href="/contribute/People.html">People</a></li> |
| <li><a href="/contribute/BYLAWS.html">ByLaws</a></li> |
| </ul> |
| </li> |
| <li><a href="/2015/06/15/storm0100-beta-released.html" id="news">News</a></li> |
| </ul> |
| </nav> |
| </div> |
| </div> |
| |
| |
| |
| <div class="container-fluid"> |
| <h1 class="page-title">Storm Kafka Integration</h1> |
| <div class="row"> |
| <div class="col-md-12"> |
| <!-- Documentation --> |
| |
| <p class="post-meta"></p> |
| |
| <p>Provides core Storm and Trident spout implementations for consuming data from Apache Kafka 0.8.x.</p> |
| |
| <h2 id="spouts">Spouts</h2> |
| |
| <p>We support both Trident and core Storm spouts. For both spout implementations, we use a BrokerHost interface that |
| tracks Kafka broker host to partition mapping and kafkaConfig that controls some Kafka related parameters.</p> |
| |
| <h3 id="brokerhosts">BrokerHosts</h3> |
| |
| <p>In order to initialize your Kafka spout/emitter you need to construct an instance of the marker interface BrokerHosts. |
| Currently, we support the following two implementations:</p> |
| |
| <h4 id="zkhosts">ZkHosts</h4> |
| |
| <p>ZkHosts is what you should use if you want to dynamically track Kafka broker to partition mapping. This class uses |
| Kafka's ZooKeeper entries to track brokerHost -> partition mapping. You can instantiate an object by calling |
| <code>java |
| public ZkHosts(String brokerZkStr, String brokerZkPath) |
| public ZkHosts(String brokerZkStr) |
| </code> |
| Where brokerZkStr is just ip:port (e.g. localhost:2181). brokerZkPath is the root directory under which all the topics and |
| partition information is stored. By default this is /brokers which is what the default Kafka implementation uses.</p> |
| |
| <p>By default, the broker-partition mapping is refreshed every 60 seconds from ZooKeeper. If you want to change it, you |
| should set host.refreshFreqSecs to your chosen value.</p> |
| |
| <h4 id="statichosts">StaticHosts</h4> |
| |
| <p>This is an alternative implementation where broker -> partition information is static. In order to construct an instance |
| of this class, you need to first construct an instance of GlobalPartitionInformation.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="n">Broker</span> <span class="n">brokerForPartition0</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Broker</span><span class="o">(</span><span class="s">"localhost"</span><span class="o">);</span><span class="c1">//localhost:9092</span> |
| <span class="n">Broker</span> <span class="n">brokerForPartition1</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Broker</span><span class="o">(</span><span class="s">"localhost"</span><span class="o">,</span> <span class="mi">9092</span><span class="o">);</span><span class="c1">//localhost:9092 but we specified the port explicitly</span> |
| <span class="n">Broker</span> <span class="n">brokerForPartition2</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">Broker</span><span class="o">(</span><span class="s">"localhost:9092"</span><span class="o">);</span><span class="c1">//localhost:9092 specified as one string.</span> |
| <span class="n">GlobalPartitionInformation</span> <span class="n">partitionInfo</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">GlobalPartitionInformation</span><span class="o">();</span> |
| <span class="n">partitionInfo</span><span class="o">.</span><span class="na">addPartition</span><span class="o">(</span><span class="mi">0</span><span class="o">,</span> <span class="n">brokerForPartition0</span><span class="o">);</span><span class="c1">//mapping from partition 0 to brokerForPartition0</span> |
| <span class="n">partitionInfo</span><span class="o">.</span><span class="na">addPartition</span><span class="o">(</span><span class="mi">1</span><span class="o">,</span> <span class="n">brokerForPartition1</span><span class="o">);</span><span class="c1">//mapping from partition 1 to brokerForPartition1</span> |
| <span class="n">partitionInfo</span><span class="o">.</span><span class="na">addPartition</span><span class="o">(</span><span class="mi">2</span><span class="o">,</span> <span class="n">brokerForPartition2</span><span class="o">);</span><span class="c1">//mapping from partition 2 to brokerForPartition2</span> |
| <span class="n">StaticHosts</span> <span class="n">hosts</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">StaticHosts</span><span class="o">(</span><span class="n">partitionInfo</span><span class="o">);</span> |
| </code></pre></div> |
| <h3 id="kafkaconfig">KafkaConfig</h3> |
| |
| <p>The second thing needed for constructing a kafkaSpout is an instance of KafkaConfig. |
| <code>java |
| public KafkaConfig(BrokerHosts hosts, String topic) |
| public KafkaConfig(BrokerHosts hosts, String topic, String clientId) |
| </code></p> |
| |
| <p>The BrokerHosts can be any implementation of BrokerHosts interface as described above. The topic is name of Kafka topic. |
| The optional ClientId is used as a part of the ZooKeeper path where the spout's current consumption offset is stored.</p> |
| |
| <p>There are 2 extensions of KafkaConfig currently in use.</p> |
| |
| <p>Spoutconfig is an extension of KafkaConfig that supports additional fields with ZooKeeper connection info and for controlling |
| behavior specific to KafkaSpout. The Zkroot will be used as root to store your consumer's offset. The id should uniquely |
| identify your spout. |
| <code>java |
| public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id); |
| public SpoutConfig(BrokerHosts hosts, String topic, String id); |
| </code> |
| In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves: |
| ```java |
| // setting for how often to save the current Kafka offset to ZooKeeper |
| public long stateUpdateIntervalMs = 2000;</p> |
| <div class="highlight"><pre><code class="language-text" data-lang="text">// Exponential back-off retry settings. These are used when retrying messages after a bolt |
| // calls OutputCollector.fail(). |
| // Note: be sure to set backtype.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent |
| // resubmitting the message while still retrying. |
| public long retryInitialDelayMs = 0; |
| public double retryDelayMultiplier = 1.0; |
| public long retryDelayMaxMs = 60 * 1000; |
| |
| // if set to true, spout will set Kafka topic as the emitted Stream ID |
| public boolean topicAsStreamId = false; |
| </code></pre></div><div class="highlight"><pre><code class="language-text" data-lang="text">Core KafkaSpout only accepts an instance of SpoutConfig. |
| |
| TridentKafkaConfig is another extension of KafkaConfig. |
| TridentKafkaEmitter only accepts TridentKafkaConfig. |
| |
| The KafkaConfig class also has bunch of public variables that controls your application's behavior. Here are defaults: |
| ```java |
| public int fetchSizeBytes = 1024 * 1024; |
| public int socketTimeoutMs = 10000; |
| public int fetchMaxWait = 10000; |
| public int bufferSizeBytes = 1024 * 1024; |
| public MultiScheme scheme = new RawMultiScheme(); |
| public boolean ignoreZkOffsets = false; |
| public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime(); |
| public long maxOffsetBehind = Long.MAX_VALUE; |
| public boolean useStartOffsetTimeIfOffsetOutOfRange = true; |
| public int metricsTimeBucketSizeInSecs = 60; |
| </code></pre></div> |
| <p>Most of them are self explanatory except MultiScheme.</p> |
| |
| <h3 id="multischeme">MultiScheme</h3> |
| |
| <p>MultiScheme is an interface that dictates how the byte[] consumed from Kafka gets transformed into a storm tuple. It |
| also controls the naming of your output field.</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="kd">public</span> <span class="n">Iterable</span><span class="o"><</span><span class="n">List</span><span class="o"><</span><span class="n">Object</span><span class="o">>></span> <span class="nf">deserialize</span><span class="o">(</span><span class="kt">byte</span><span class="o">[]</span> <span class="n">ser</span><span class="o">);</span> |
| <span class="kd">public</span> <span class="n">Fields</span> <span class="nf">getOutputFields</span><span class="o">();</span> |
| </code></pre></div> |
| <p>The default <code>RawMultiScheme</code> just takes the <code>byte[]</code> and returns a tuple with <code>byte[]</code> as is. The name of the |
| outputField is "bytes". There are alternative implementation like <code>SchemeAsMultiScheme</code> and |
| <code>KeyValueSchemeAsMultiScheme</code> which can convert the <code>byte[]</code> to <code>String</code>.</p> |
| |
| <h3 id="examples">Examples</h3> |
| |
| <h4 id="core-spout">Core Spout</h4> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">BrokerHosts</span> <span class="n">hosts</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">ZkHosts</span><span class="o">(</span><span class="n">zkConnString</span><span class="o">);</span> |
| <span class="n">SpoutConfig</span> <span class="n">spoutConfig</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">SpoutConfig</span><span class="o">(</span><span class="n">hosts</span><span class="o">,</span> <span class="n">topicName</span><span class="o">,</span> <span class="s">"/"</span> <span class="o">+</span> <span class="n">topicName</span><span class="o">,</span> <span class="n">UUID</span><span class="o">.</span><span class="na">randomUUID</span><span class="o">().</span><span class="na">toString</span><span class="o">());</span> |
| <span class="n">spoutConfig</span><span class="o">.</span><span class="na">scheme</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">SchemeAsMultiScheme</span><span class="o">(</span><span class="k">new</span> <span class="nf">StringScheme</span><span class="o">());</span> |
| <span class="n">KafkaSpout</span> <span class="n">kafkaSpout</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">KafkaSpout</span><span class="o">(</span><span class="n">spoutConfig</span><span class="o">);</span> |
| </code></pre></div> |
| <h4 id="trident-spout">Trident Spout</h4> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"><span class="n">TridentTopology</span> <span class="n">topology</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">TridentTopology</span><span class="o">();</span> |
| <span class="n">BrokerHosts</span> <span class="n">zk</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">ZkHosts</span><span class="o">(</span><span class="s">"localhost"</span><span class="o">);</span> |
| <span class="n">TridentKafkaConfig</span> <span class="n">spoutConf</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">TridentKafkaConfig</span><span class="o">(</span><span class="n">zk</span><span class="o">,</span> <span class="s">"test-topic"</span><span class="o">);</span> |
| <span class="n">spoutConf</span><span class="o">.</span><span class="na">scheme</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">SchemeAsMultiScheme</span><span class="o">(</span><span class="k">new</span> <span class="nf">StringScheme</span><span class="o">());</span> |
| <span class="n">OpaqueTridentKafkaSpout</span> <span class="n">spout</span> <span class="o">=</span> <span class="k">new</span> <span class="nf">OpaqueTridentKafkaSpout</span><span class="o">(</span><span class="n">spoutConf</span><span class="o">);</span> |
| </code></pre></div> |
| <h3 id="how-kafkaspout-stores-offsets-of-a-kafka-topic-and-recovers-in-case-of-failures">How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures</h3> |
| |
| <p>As shown in the above KafkaConfig properties, you can control from where in the Kafka topic the spout begins to read by |
| setting <code>KafkaConfig.startOffsetTime</code> as follows:</p> |
| |
| <ol> |
| <li><code>kafka.api.OffsetRequest.EarliestTime()</code>: read from the beginning of the topic (i.e. from the oldest messages onwards)</li> |
| <li><code>kafka.api.OffsetRequest.LatestTime()</code>: read from the end of the topic (i.e. any new messsages that are being written to the topic)</li> |
| <li>A Unix timestamp aka seconds since the epoch (e.g. via <code>System.currentTimeMillis()</code>): |
| see <a href="https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-HowdoIaccuratelygetoffsetsofmessagesforacertaintimestampusingOffsetRequest?">How do I accurately get offsets of messages for a certain timestamp using OffsetRequest?</a> in the Kafka FAQ</li> |
| </ol> |
| |
| <p>As the topology runs the Kafka spout keeps track of the offsets it has read and emitted by storing state information |
| under the ZooKeeper path <code>SpoutConfig.zkRoot+ "/" + SpoutConfig.id</code>. In the case of failures it recovers from the last |
| written offset in ZooKeeper.</p> |
| |
| <blockquote> |
| <p><strong>Important:</strong> When re-deploying a topology make sure that the settings for <code>SpoutConfig.zkRoot</code> and <code>SpoutConfig.id</code> |
| were not modified, otherwise the spout will not be able to read its previous consumer state information (i.e. the |
| offsets) from ZooKeeper -- which may lead to unexpected behavior and/or to data loss, depending on your use case.</p> |
| </blockquote> |
| |
| <p>This means that when a topology has run once the setting <code>KafkaConfig.startOffsetTime</code> will not have an effect for |
| subsequent runs of the topology because now the topology will rely on the consumer state information (offsets) in |
| ZooKeeper to determine from where it should begin (more precisely: resume) reading. |
| If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should |
| set the parameter <code>KafkaConfig.ignoreZkOffsets</code> to <code>true</code>. If <code>true</code>, the spout will always begin reading from the |
| offset defined by <code>KafkaConfig.startOffsetTime</code> as described above.</p> |
| |
| <h2 id="using-storm-kafka-with-different-versions-of-scala">Using storm-kafka with different versions of Scala</h2> |
| |
| <p>Storm-kafka's Kafka dependency is defined as <code>provided</code> scope in maven, meaning it will not be pulled in |
| as a transitive dependency. This allows you to use a version of Kafka built against a specific Scala version.</p> |
| |
| <p>When building a project with storm-kafka, you must explicitly add the Kafka dependency. For example, to |
| use Kafka 0.8.1.1 built against Scala 2.10, you would use the following dependency in your <code>pom.xml</code>:</p> |
| <div class="highlight"><pre><code class="language-xml" data-lang="xml"> <span class="nt"><dependency></span> |
| <span class="nt"><groupId></span>org.apache.kafka<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>kafka_2.10<span class="nt"></artifactId></span> |
| <span class="nt"><version></span>0.8.1.1<span class="nt"></version></span> |
| <span class="nt"><exclusions></span> |
| <span class="nt"><exclusion></span> |
| <span class="nt"><groupId></span>org.apache.zookeeper<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>zookeeper<span class="nt"></artifactId></span> |
| <span class="nt"></exclusion></span> |
| <span class="nt"><exclusion></span> |
| <span class="nt"><groupId></span>log4j<span class="nt"></groupId></span> |
| <span class="nt"><artifactId></span>log4j<span class="nt"></artifactId></span> |
| <span class="nt"></exclusion></span> |
| <span class="nt"></exclusions></span> |
| <span class="nt"></dependency></span> |
| </code></pre></div> |
| <p>Note that the ZooKeeper and log4j dependencies are excluded to prevent version conflicts with Storm's dependencies.</p> |
| |
| <h2 id="writing-to-kafka-as-part-of-your-topology">Writing to Kafka as part of your topology</h2> |
| |
| <p>You can create an instance of storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you |
| are using trident you can use storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and |
| storm.kafka.trident.TridentKafkaUpdater.</p> |
| |
| <p>You need to provide implementation of following 2 interfaces</p> |
| |
| <h3 id="tupletokafkamapper-and-tridenttupletokafkamapper">TupleToKafkaMapper and TridentTupleToKafkaMapper</h3> |
| |
| <p>These interfaces have 2 methods defined:</p> |
| <div class="highlight"><pre><code class="language-java" data-lang="java"> <span class="n">K</span> <span class="nf">getKeyFromTuple</span><span class="o">(</span><span class="n">Tuple</span><span class="o">/</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">);</span> |
| <span class="n">V</span> <span class="nf">getMessageFromTuple</span><span class="o">(</span><span class="n">Tuple</span><span class="o">/</span><span class="n">TridentTuple</span> <span class="n">tuple</span><span class="o">);</span> |
| </code></pre></div> |
| <p>As the name suggests, these methods are called to map a tuple to Kafka key and Kafka message. If you just want one field |
| as key and one field as value, then you can use the provided FieldNameBasedTupleToKafkaMapper.java |
| implementation. In the KafkaBolt, the implementation always looks for a field with field name "key" and "message" if you |
| use the default constructor to construct FieldNameBasedTupleToKafkaMapper for backward compatibility |
| reasons. Alternatively you could also specify a different key and message field by using the non default constructor. |
| In the TridentKafkaState you must specify what is the field name for key and message as there is no default constructor. |
| These should be specified while constructing and instance of FieldNameBasedTupleToKafkaMapper.</p> |
| |
| <h3 id="kafkatopicselector-and-trident-kafkatopicselector">KafkaTopicSelector and trident KafkaTopicSelector</h3> |
| |
| <p>This interface has only one method |
| <code>java |
| public interface KafkaTopicSelector { |
| String getTopics(Tuple/TridentTuple tuple); |
| } |
| </code> |
| The implementation of this interface should return the topic to which the tuple's key/message mapping needs to be published |
| You can return a null and the message will be ignored. If you have one static topic name then you can use |
| DefaultTopicSelector.java and set the name of the topic in the constructor.</p> |
| |
| <h3 id="specifying-kafka-producer-properties">Specifying Kafka producer properties</h3> |
| |
| <p>You can provide all the produce properties , see <a href="http://kafka.apache.org/documentation.html#producerconfigs">http://kafka.apache.org/documentation.html#producerconfigs</a> |
| section "Important configuration properties for the producer", in your Storm topology config by setting the properties |
| map with key kafka.broker.properties.</p> |
| |
| <h3 id="putting-it-all-together">Putting it all together</h3> |
| |
| <p>For the bolt : |
| ```java |
| TopologyBuilder builder = new TopologyBuilder();</p> |
| <div class="highlight"><pre><code class="language-text" data-lang="text"> Fields fields = new Fields("key", "message"); |
| FixedBatchSpout spout = new FixedBatchSpout(fields, 4, |
| new Values("storm", "1"), |
| new Values("trident", "1"), |
| new Values("needs", "1"), |
| new Values("javadoc", "1") |
| ); |
| spout.setCycle(true); |
| builder.setSpout("spout", spout, 5); |
| KafkaBolt bolt = new KafkaBolt() |
| .withTopicSelector(new DefaultTopicSelector("test")) |
| .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper()); |
| builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout"); |
| |
| Config conf = new Config(); |
| //set producer properties. |
| Properties props = new Properties(); |
| props.put("metadata.broker.list", "localhost:9092"); |
| props.put("request.required.acks", "1"); |
| props.put("serializer.class", "kafka.serializer.StringEncoder"); |
| conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props); |
| |
| StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology()); |
| </code></pre></div><div class="highlight"><pre><code class="language-text" data-lang="text">For Trident: |
| |
| ```java |
| Fields fields = new Fields("word", "count"); |
| FixedBatchSpout spout = new FixedBatchSpout(fields, 4, |
| new Values("storm", "1"), |
| new Values("trident", "1"), |
| new Values("needs", "1"), |
| new Values("javadoc", "1") |
| ); |
| spout.setCycle(true); |
| |
| TridentTopology topology = new TridentTopology(); |
| Stream stream = topology.newStream("spout1", spout); |
| |
| TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory() |
| .withKafkaTopicSelector(new DefaultTopicSelector("test")) |
| .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count")); |
| stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields()); |
| |
| Config conf = new Config(); |
| //set producer properties. |
| Properties props = new Properties(); |
| props.put("metadata.broker.list", "localhost:9092"); |
| props.put("request.required.acks", "1"); |
| props.put("serializer.class", "kafka.serializer.StringEncoder"); |
| conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props); |
| StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build()); |
| </code></pre></div> |
| |
| |
| </div> |
| </div> |
| </div> |
| <footer> |
| <div class="container-fluid"> |
| <div class="row"> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>Meetups</h5> |
| <ul class="latest-news"> |
| |
| <li><a href="http://www.meetup.com/Apache-Storm-Apache-Kafka/">Apache Storm & Apache Kafka</a> <span class="small">(Sunnyvale, CA)</span></li> |
| |
| <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Apache Storm & Kafka Users</a> <span class="small">(Seattle, WA)</span></li> |
| |
| <li><a href="http://www.meetup.com/New-York-City-Storm-User-Group/">NYC Storm User Group</a> <span class="small">(New York, NY)</span></li> |
| |
| <li><a href="http://www.meetup.com/Bay-Area-Stream-Processing">Bay Area Stream Processing</a> <span class="small">(Emeryville, CA)</span></li> |
| |
| <li><a href="http://www.meetup.com/Boston-Storm-Users/">Boston Realtime Data</a> <span class="small">(Boston, MA)</span></li> |
| |
| <li><a href="http://www.meetup.com/storm-london">London Storm User Group</a> <span class="small">(London, UK)</span></li> |
| |
| <!-- <li><a href="http://www.meetup.com/Apache-Storm-Kafka-Users/">Seatle, WA</a> <span class="small">(27 Jun 2015)</span></li> --> |
| </ul> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>About Storm</h5> |
| <p>Storm integrates with any queueing system and any database system. Storm's spout abstraction makes it easy to integrate a new queuing system. Likewise, integrating Storm with database systems is easy.</p> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>First Look</h5> |
| <ul class="footer-list"> |
| <li><a href="/documentation/Rationale.html">Rationale</a></li> |
| <li><a href="/tutorial.html">Tutorial</a></li> |
| <li><a href="/documentation/Setting-up-development-environment.html">Setting up development environment</a></li> |
| <li><a href="/documentation/Creating-a-new-Storm-project.html">Creating a new Storm project</a></li> |
| </ul> |
| </div> |
| </div> |
| <div class="col-md-3"> |
| <div class="footer-widget"> |
| <h5>Documentation</h5> |
| <ul class="footer-list"> |
| <li><a href="/doc-index.html">Index</a></li> |
| <li><a href="/documentation.html">Manual</a></li> |
| <li><a href="https://storm.apache.org/javadoc/apidocs/index.html">Javadoc</a></li> |
| <li><a href="/documentation/FAQ.html">FAQ</a></li> |
| </ul> |
| </div> |
| </div> |
| </div> |
| <hr/> |
| <div class="row"> |
| <div class="col-md-12"> |
| <p align="center">Copyright © 2015 <a href="http://www.apache.org">Apache Software Foundation</a>. All Rights Reserved. |
| <br>Apache Storm, Apache, the Apache feather logo, and the Apache Storm project logos are trademarks of The Apache Software Foundation. |
| <br>All other marks mentioned may be trademarks or registered trademarks of their respective owners.</p> |
| </div> |
| </div> |
| </div> |
| </footer> |
| <!--Footer End--> |
| <!-- Scroll to top --> |
| <span class="totop"><a href="#"><i class="fa fa-angle-up"></i></a></span> |
| |
| </body> |
| |
| </html> |
| |