/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* Contains some contributions under the Thrift Software License.
* Please see doc/old-thrift-license.txt in the Thrift distribution for
* details.
*/
package storm.starter.trident;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.generated.StormTopology;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import org.apache.kafka.clients.producer.ProducerConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
import storm.kafka.bolt.KafkaBolt;
import storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import storm.kafka.bolt.selector.DefaultTopicSelector;
import storm.kafka.trident.TransactionalTridentKafkaSpout;
import storm.kafka.trident.TridentKafkaConfig;
import storm.starter.spout.RandomSentenceSpout;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.MapGet;
import storm.trident.testing.MemoryMapState;
import storm.trident.testing.Split;
import java.util.Properties;
/**
* A sample word count trident topology, built on a transactional kafka spout, with the following components.
* <ol>
* <li> {@link KafkaBolt}
* that receives random sentences from {@link RandomSentenceSpout} and
* publishes the sentences to a kafka "test" topic.
* </li>
* <li> {@link TransactionalTridentKafkaSpout}
* that consumes sentences from the "test" topic, splits them into words, aggregates them,
* and stores the word counts in a {@link MemoryMapState}.
* </li>
* <li> DRPC query
* that returns the word counts by querying the trident state (MemoryMapState).
* </li>
* </ol>
* <p>
* For more background, read the <a href="https://storm.apache.org/documentation/Trident-tutorial.html">trident tutorial</a>,
* <a href="https://storm.apache.org/documentation/Trident-state">trident state</a> and
* <a href="https://github.com/apache/storm/tree/master/external/storm-kafka">storm-kafka</a> docs.
* </p>
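* <p>
* Once the topology is running, the word counts can be queried through DRPC. A minimal sketch
* (the "words" function name is the one this class registers):
* <pre>{@code
* LocalDRPC drpc = new LocalDRPC();
* // ... build and submit the consumer topology with buildConsumerTopology(drpc) ...
* String counts = drpc.execute("words", "the quick brown fox");
* }</pre>
* </p>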
*/
public class TridentKafkaWordCount {
private String zkUrl;
private String brokerUrl;
TridentKafkaWordCount(String zkUrl, String brokerUrl) {
this.zkUrl = zkUrl;
this.brokerUrl = brokerUrl;
}
/**
* Creates a transactional kafka spout that consumes any new data published to the "test" topic.
* <p/>
* For more info on transactional spouts, see the "Transactional spouts" section of the
* <a href="https://storm.apache.org/documentation/Trident-state">Trident state</a> doc.
*
* @return a transactional trident kafka spout.
*/
private TransactionalTridentKafkaSpout createKafkaSpout() {
ZkHosts hosts = new ZkHosts(zkUrl);
TridentKafkaConfig config = new TridentKafkaConfig(hosts, "test");
config.scheme = new SchemeAsMultiScheme(new StringScheme());
// Consume new data from the topic
config.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
return new TransactionalTridentKafkaSpout(config);
}
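/**
* Adds a DRPC stream that splits the query arguments into words, looks up each word's count
* in the given trident state, drops words that have no recorded count, and returns
* (word, count) pairs to the DRPC client.
*/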
private Stream addDRPCStream(TridentTopology tridentTopology, TridentState state, LocalDRPC drpc) {
return tridentTopology.newDRPCStream("words", drpc)
.each(new Fields("args"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.stateQuery(state, new Fields("word"), new MapGet(), new Fields("count"))
.each(new Fields("count"), new FilterNull())
.project(new Fields("word", "count"));
}
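/**
* Builds the word-count state: consumes sentences from the kafka spout, splits them into words,
* groups by word, and persistently aggregates the per-word counts into a {@link MemoryMapState}.
*/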
private TridentState addTridentState(TridentTopology tridentTopology) {
return tridentTopology.newStream("spout1", createKafkaSpout()).parallelismHint(1)
.each(new Fields("str"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.parallelismHint(1);
}
/**
* Creates a trident topology that consumes sentences from the kafka "test" topic using a
* {@link TransactionalTridentKafkaSpout}, computes the word counts, and stores them in a {@link MemoryMapState}.
* A DRPC stream is then created to query the word counts.
* @param drpc the in-process DRPC server that the "words" query function is registered with
* @return the consumer topology
*/
public StormTopology buildConsumerTopology(LocalDRPC drpc) {
TridentTopology tridentTopology = new TridentTopology();
addDRPCStream(tridentTopology, addTridentState(tridentTopology), drpc);
return tridentTopology.build();
}
/**
* Return the consumer topology config.
*
* @return the topology config
*/
public Config getConsumerConfig() {
Config conf = new Config();
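// cap the number of batches the spout may have in flight at once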
conf.setMaxSpoutPending(20);
// conf.setDebug(true);
return conf;
}
/**
* Creates a topology that produces random sentences using {@link RandomSentenceSpout} and
* publishes the sentences to the kafka "test" topic using a {@link KafkaBolt}.
*
* @param prop the kafka producer properties, e.g. as returned by {@link #getProducerConfig()}
* @return the storm topology
*/
public StormTopology buildProducerTopology(Properties prop) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 2);
/*
* The output field of the RandomSentenceSpout ("word") is provided as the boltMessageField
* so that each sentence gets written out as the message in the kafka topic.
*/
KafkaBolt bolt = new KafkaBolt().withProducerProperties(prop)
.withTopicSelector(new DefaultTopicSelector("test"))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "word"));
builder.setBolt("forwardToKafka", bolt, 1).shuffleGrouping("spout");
return builder.createTopology();
}
/**
* Returns the kafka producer properties for the topology that publishes sentences to the kafka
* "test" topic using a kafka bolt. These properties are passed to the {@link KafkaBolt} via
* {@code withProducerProperties}.
*
* @return the producer properties
*/
public Properties getProducerConfig() {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
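// sentences are plain strings, so string serializers suffice for both keys and values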
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.CLIENT_ID_CONFIG, "storm-kafka-producer");
return props;
}
/**
* <p>
* To run this topology, ensure you have a kafka broker running.
* </p>
* Create the "test" topic from the command line:
* <pre>
* kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
* </pre>
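* <p>
* Then run this class, for example through the storm-starter uber jar (a sketch; the exact jar
* name depends on your build):
* <pre>
* storm jar storm-starter-topologies-*.jar storm.starter.trident.TridentKafkaWordCount
* </pre>
* </p>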
*/
public static void main(String[] args) throws Exception {
String zkUrl = "localhost:2181"; // the defaults.
String brokerUrl = "localhost:9092";
if (args.length > 2 || (args.length == 1 && args[0].matches("-h|--help"))) {
System.out.println("Usage: TridentKafkaWordCount [kafka zookeeper url] [kafka broker url]");
System.out.println("   e.g., TridentKafkaWordCount " + zkUrl + " " + brokerUrl);
System.exit(1);
} else if (args.length == 1) {
zkUrl = args[0];
} else if (args.length == 2) {
zkUrl = args[0];
brokerUrl = args[1];
}
System.out.println("Using Kafka zookeeper url: " + zkUrl + " broker url: " + brokerUrl);
TridentKafkaWordCount wordCount = new TridentKafkaWordCount(zkUrl, brokerUrl);
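// run an in-process DRPC server and storm cluster so the example needs no external deployment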
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
// submit the consumer topology.
cluster.submitTopology("wordCounter", wordCount.getConsumerConfig(), wordCount.buildConsumerTopology(drpc));
Config conf = new Config();
conf.setMaxSpoutPending(20);
// submit the producer topology.
cluster.submitTopology("kafkaBolt", conf, wordCount.buildProducerTopology(wordCount.getProducerConfig()));
// keep querying the word counts for a minute.
for (int i = 0; i < 60; i++) {
System.out.println("DRPC RESULT: " + drpc.execute("words", "the and apple snow jumped"));
Thread.sleep(1000);
}
cluster.killTopology("kafkaBolt");
cluster.killTopology("wordCounter");
cluster.shutdown();
}
}