package org.apache.storm.starter;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.storm.Config;
import org.apache.storm.metric.LoggingMetricsConsumer;
import org.apache.storm.starter.bolt.WordCountBolt;
import org.apache.storm.starter.spout.RandomSentenceSpout;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.ConfigurableTopology;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
/**
 * Some topologies spawn threads inside their bolts to do work and emit tuples from those threads.
 * This simple word-count topology mimics that use case and can help catch possible race conditions,
 * for example between emits from those threads and the executor's metrics tick.
 */
public class MultiThreadWordCountTopology extends ConfigurableTopology {
public static void main(String[] args) {
ConfigurableTopology.start(new MultiThreadWordCountTopology(), args);
protected int run(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 1);
builder.setBolt("split", new MultiThreadedSplitSentence(), 1).shuffleGrouping("spout");
builder.setBolt("count", new WordCountBolt(), 1).fieldsGrouping("split", new Fields("word"));
//this makes sure there is only one executor per worker, easier to debug
//problems involving serialization/deserialization will only happen in inter-worker data transfer
//this involves metricsTick
String topologyName = "multithreaded-word-count";
if (args != null && args.length > 0) {
topologyName = args[0];
return submit(topologyName, conf, builder);
public static class MultiThreadedSplitSentence implements IRichBolt {
private OutputCollector collector;
private ExecutorService executor;
public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
executor = Executors.newFixedThreadPool(6);
//This makes sure metricsTick to be called every 1 second
//it makes the race condition between metricsTick and outputCollector easier to happen if any
context.registerMetric("dummy-counter", () -> 0, 1);
public void execute(Tuple input) {
String str = input.getString(0);
String[] splits = str.split("\\s+");
for (String s : splits) {
//spawn other threads to do the work and emit
Runnable runnableTask = () -> {
collector.emit(new Values(s));
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
public Map<String, Object> getComponentConfiguration() {
return null;
public void cleanup() {