Merge branch 'STORM-1352' of https://github.com/haohui/storm into STORM-1352
diff --git a/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java b/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java
index 813841a..bd8ecba 100644
--- a/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java
+++ b/examples/storm-starter/src/jvm/storm/starter/trident/TridentKafkaWordCount.java
@@ -149,14 +149,14 @@
      *
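+     * @param prop the Kafka producer properties for the KafkaBolt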
      * @return the storm topology
      */
-    public StormTopology buildProducerTopology() {
+    public StormTopology buildProducerTopology(Properties prop) {
         TopologyBuilder builder = new TopologyBuilder();
         builder.setSpout("spout", new RandomSentenceSpout(), 2);
         /**
          * The output field of the RandomSentenceSpout ("word") is provided as the boltMessageField
          * so that this gets written out as the message in the kafka topic.
          */
-        KafkaBolt bolt = new KafkaBolt()
+        KafkaBolt bolt = new KafkaBolt().withProducerProperties(prop)
                 .withTopicSelector(new DefaultTopicSelector("test"))
                 .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "word"));
         builder.setBolt("forwardToKafka", bolt, 1).shuffleGrouping("spout");
@@ -169,16 +169,13 @@
      *
-     * @return the topology config
+     * @return the Kafka producer properties
      */
-    public Config getProducerConfig() {
-        Config conf = new Config();
-        conf.setMaxSpoutPending(20);
+    public Properties getProducerConfig() {
         Properties props = new Properties();
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
         props.put(ProducerConfig.CLIENT_ID_CONFIG, "storm-kafka-producer");
-        conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
-        return conf;
+        return props;
     }
 
     /**
@@ -214,8 +211,10 @@
         // submit the consumer topology.
         cluster.submitTopology("wordCounter", wordCount.getConsumerConfig(), wordCount.buildConsumerTopology(drpc));
 
+        Config conf = new Config();
+        conf.setMaxSpoutPending(20);
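+        // Producer-specific settings are handed to buildProducerTopology() below instead of being stored in this Config.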
         // submit the producer topology.
-        cluster.submitTopology("kafkaBolt", wordCount.getProducerConfig(), wordCount.buildProducerTopology());
+        cluster.submitTopology("kafkaBolt", conf, wordCount.buildProducerTopology(wordCount.getProducerConfig()));
 
         // keep querying the word counts for a minute.
         for (int i = 0; i < 60; i++) {
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 91c64bf..a63284a 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -218,10 +218,21 @@
 DefaultTopicSelector.java and set the name of the topic in the constructor.
 
 ### Specifying Kafka producer properties
-You can provide all the produce properties , see http://kafka.apache.org/documentation.html#producerconfigs 
-section "Important configuration properties for the producer", in your Storm topology config by setting the properties
-map with key kafka.broker.properties.
+You can provide all the producer properties (see the section "Important configuration properties for the producer" at
+http://kafka.apache.org/documentation.html#newproducerconfigs) by calling withProducerProperties() on the KafkaBolt,
+or withProducerProperties() on the TridentKafkaStateFactory for Trident.
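+
+For example, a minimal sketch for the bolt, assuming a broker running on localhost:9092:
+```
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "localhost:9092");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+        KafkaBolt bolt = new KafkaBolt().withProducerProperties(props);
+```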
 
+### Using wildcard Kafka topic match
+You can do a wildcard topic match by adding the following config:
+```
+     Config config = new Config();
+     config.put("kafka.topic.wildcard.match", true);
+```
+
+After this you can specify a wildcard topic for matching, e.g. clickstream.*.log. This will match all topics such as clickstream.my.log, clickstream.cart.log, etc.
+
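+For example, a sketch of a spout that consumes from all matching topics (the ZooKeeper address, zkRoot, and id values below are illustrative):
+```
+     Config config = new Config();
+     config.put("kafka.topic.wildcard.match", true);
+
+     BrokerHosts hosts = new ZkHosts("localhost:2181");
+     SpoutConfig spoutConfig = new SpoutConfig(hosts, "clickstream.*.log", "/kafka-offsets", "clickstream-id");
+     KafkaSpout spout = new KafkaSpout(spoutConfig);
+```
+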
-###Putting it all together
+### Putting it all together
 
-For the bolt :
+For the bolt:
@@ -241,15 +252,16 @@
                 .withTopicSelector(new DefaultTopicSelector("test"))
                 .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
         builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
-        
+
         Config conf = new Config();
         //set producer properties.
         Properties props = new Properties();
-        props.put("metadata.broker.list", "localhost:9092");
-        props.put("request.required.acks", "1");
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("bootstrap.servers", "localhost:9092");
+        props.put("acks", "1");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
+        bolt.withProducerProperties(props);
-        
+
         StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
 ```
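+
+Note that KafkaBolt.KAFKA_BROKER_PROPERTIES has been removed: producer properties are no longer read from the
+topology config, so they must be supplied through withProducerProperties().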
 
@@ -268,7 +280,15 @@
         TridentTopology topology = new TridentTopology();
         Stream stream = topology.newStream("spout1", spout);
 
+        //set producer properties.
+        Properties props = new Properties();
+        props.put("bootstrap.servers", "localhost:9092");
+        props.put("acks", "1");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
         TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
+                .withProducerProperties(props)
                 .withKafkaTopicSelector(new DefaultTopicSelector("test"))
                 .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
         stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
@@ -276,9 +296,10 @@
         Config conf = new Config();
-        //set producer properties.
-        Properties props = new Properties();
-        props.put("metadata.broker.list", "localhost:9092");
-        props.put("request.required.acks", "1");
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
         StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
 ```
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 2cca826..1ebe142 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -57,7 +57,6 @@
     private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
 
     public static final String TOPIC = "topic";
-    public static final String KAFKA_BROKER_PROPERTIES = "kafka.broker.properties";
 
     private KafkaProducer<K, V> producer;
     private OutputCollector collector;
@@ -73,8 +72,7 @@
     private boolean fireAndForget = false;
     private boolean async = true;
 
-    public KafkaBolt() {
-    }
+    public KafkaBolt() {}
 
     public KafkaBolt<K,V> withTupleToKafkaMapper(TupleToKafkaMapper<K,V> mapper) {
         this.mapper = mapper;
@@ -103,14 +101,7 @@
             this.topicSelector = new DefaultTopicSelector((String) stormConf.get(TOPIC));
         }
 
-        Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
-        Properties properties = new Properties();
-        if(configMap!= null)
-            properties.putAll(configMap);
-        if(boltSpecfiedProperties != null)
-            properties.putAll(boltSpecfiedProperties);
-
-        producer = new KafkaProducer<K, V>(properties);
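+        // Producer settings now come solely from withProducerProperties(); the topology config is no longer consulted.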
+        producer = new KafkaProducer<>(boltSpecfiedProperties);
         this.collector = collector;
     }
 
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
index 402ffb1..84b6a6a 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java
@@ -19,10 +19,10 @@
 
 import backtype.storm.task.OutputCollector;
 import backtype.storm.topology.FailedException;
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 import org.apache.commons.lang.Validate;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
@@ -32,15 +32,14 @@
 import storm.trident.tuple.TridentTuple;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 public class TridentKafkaState implements State {
     private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
 
-    public static final String KAFKA_BROKER_PROPERTIES = "kafka.broker.properties";
-
-    private Producer producer;
+    private KafkaProducer producer;
     private OutputCollector collector;
 
     private TridentTupleToKafkaMapper mapper;
@@ -66,14 +65,10 @@
         LOG.debug("commit is Noop.");
     }
 
-    public void prepare(Map stormConf) {
+    public void prepare(Properties options) {
         Validate.notNull(mapper, "mapper can not be null");
         Validate.notNull(topicSelector, "topicSelector can not be null");
-        Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
-        Properties properties = new Properties();
-        properties.putAll(configMap);
-        ProducerConfig config = new ProducerConfig(properties);
-        producer = new Producer(config);
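+        // The new-style producer is configured directly from the Properties supplied by the state factory.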
+        producer = new KafkaProducer(options);
     }
 
     public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
@@ -83,8 +78,16 @@
                 topic = topicSelector.getTopic(tuple);
 
                 if(topic != null) {
-                    producer.send(new KeyedMessage(topic, mapper.getKeyFromTuple(tuple),
-                            mapper.getMessageFromTuple(tuple)));
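+                    // Wait for the send to complete so a failure surfaces as FailedException and Trident replays the batch.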
+                    Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,
+                            mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple)));
+                    try {
+                        result.get();
+                    } catch (ExecutionException e) {
+                        String errorMsg = "Could not retrieve result for message with key = "
+                                + mapper.getKeyFromTuple(tuple) + " from topic = " + topic;
+                        LOG.error(errorMsg, e);
+                        throw new FailedException(errorMsg, e);
+                    }
                 } else {
                     LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
                 }
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java
index adca69e..a5d9d42 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaStateFactory.java
@@ -26,6 +26,7 @@
 import storm.trident.state.StateFactory;
 
 import java.util.Map;
+import java.util.Properties;
 
 public class TridentKafkaStateFactory implements StateFactory {
 
@@ -33,7 +34,7 @@
 
     private TridentTupleToKafkaMapper mapper;
     private KafkaTopicSelector topicSelector;
-
+    private Properties producerProperties = new Properties();
 
     public TridentKafkaStateFactory withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
         this.mapper = mapper;
@@ -45,13 +46,18 @@
         return this;
     }
 
+    public TridentKafkaStateFactory withProducerProperties(Properties props) {
+        this.producerProperties = props;
+        return this;
+    }
+
     @Override
     public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
         LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions);
         TridentKafkaState state = new TridentKafkaState()
                 .withKafkaTopicSelector(this.topicSelector)
                 .withTridentTupleToKafkaMapper(this.mapper);
-        state.prepare(conf);
+        state.prepare(producerProperties);
         return state;
     }
 }
diff --git a/external/storm-kafka/src/test/storm/kafka/TestUtils.java b/external/storm-kafka/src/test/storm/kafka/TestUtils.java
index fa29992..cde022e 100644
--- a/external/storm-kafka/src/test/storm/kafka/TestUtils.java
+++ b/external/storm-kafka/src/test/storm/kafka/TestUtils.java
@@ -40,9 +40,14 @@
         return buildPartitionInfo(numPartitions, 9092);
     }
 
+    public static List<GlobalPartitionInformation> buildPartitionInfoList(GlobalPartitionInformation partitionInformation) {
+        List<GlobalPartitionInformation> list = new ArrayList<>();
+        list.add(partitionInformation);
+        return list;
+    }
 
     public static GlobalPartitionInformation buildPartitionInfo(int numPartitions, int brokerPort) {
-        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
+        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TOPIC);
         for (int i = 0; i < numPartitions; i++) {
             globalPartitionInformation.addPartition(i, Broker.fromString("broker-" + i + " :" + brokerPort));
         }
@@ -63,21 +68,18 @@
     }
 
     private static BrokerHosts getBrokerHosts(KafkaTestBroker broker) {
-        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
+        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(TOPIC);
         globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString()));
         return new StaticHosts(globalPartitionInformation);
     }
 
-    public static Config getConfig(String brokerConnectionString) {
-        Config config = new Config();
+    public static Properties getProducerProperties(String brokerConnectionString) {
         Properties props = new Properties();
-        props.put("metadata.broker.list", brokerConnectionString);
-        props.put("request.required.acks", "1");
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
-        config.put(KafkaBolt.TOPIC, TOPIC);
-
-        return config;
+        props.put("bootstrap.servers", brokerConnectionString);
+        props.put("acks", "1");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        return props;
     }
 
     public static boolean verifyMessage(String key, String message, KafkaTestBroker broker, SimpleConsumer simpleConsumer) {
diff --git a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java
index d8a5e24..8213b07 100644
--- a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTest.java
@@ -17,7 +17,6 @@
  */
 package storm.kafka;
 
-import backtype.storm.Config;
 import backtype.storm.tuple.Fields;
 import kafka.javaapi.consumer.SimpleConsumer;
 import org.junit.After;
@@ -37,22 +36,18 @@
 public class TridentKafkaTest {
     private KafkaTestBroker broker;
     private TridentKafkaState state;
-    private Config config;
     private SimpleConsumer simpleConsumer;
-    private TridentTupleToKafkaMapper mapper;
-    private KafkaTopicSelector topicSelector;
 
     @Before
     public void setup() {
         broker = new KafkaTestBroker();
         simpleConsumer = TestUtils.getKafkaConsumer(broker);
-        config = TestUtils.getConfig(broker.getBrokerConnectionString());
-        mapper = new FieldNameBasedTupleToKafkaMapper("key", "message");
-        topicSelector = new DefaultTopicSelector(TestUtils.TOPIC);
+        TridentTupleToKafkaMapper mapper = new FieldNameBasedTupleToKafkaMapper("key", "message");
+        KafkaTopicSelector topicSelector = new DefaultTopicSelector(TestUtils.TOPIC);
         state = new TridentKafkaState()
                 .withKafkaTopicSelector(topicSelector)
                 .withTridentTupleToKafkaMapper(mapper);
-        state.prepare(config);
+        state.prepare(TestUtils.getProducerProperties(broker.getBrokerConnectionString()));
     }
 
     @Test
@@ -71,7 +66,7 @@
     }
 
     private List<TridentTuple> generateTupleBatch(String key, String message, int batchsize) {
-        List<TridentTuple> batch = new ArrayList<TridentTuple>();
+        List<TridentTuple> batch = new ArrayList<>();
         for(int i =0 ; i < batchsize; i++) {
             batch.add(TridentTupleView.createFreshTuple(new Fields("key", "message"), key, message));
         }
diff --git a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
index 9cb7bbf..b9e25e4 100644
--- a/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
+++ b/external/storm-kafka/src/test/storm/kafka/TridentKafkaTopology.java
@@ -22,7 +22,7 @@
 import backtype.storm.generated.StormTopology;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Values;
-import storm.kafka.trident.TridentKafkaState;
+import com.google.common.collect.ImmutableMap;
 import storm.kafka.trident.TridentKafkaStateFactory;
 import storm.kafka.trident.TridentKafkaUpdater;
 import storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
@@ -31,14 +31,11 @@
 import storm.trident.TridentTopology;
 import storm.trident.testing.FixedBatchSpout;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.Properties;
 
-
 public class TridentKafkaTopology {
 
-    private static StormTopology buildTopology() {
+    private static StormTopology buildTopology(String brokerConnectionString) {
         Fields fields = new Fields("word", "count");
         FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
                 new Values("storm", "1"),
@@ -51,9 +48,16 @@
         TridentTopology topology = new TridentTopology();
         Stream stream = topology.newStream("spout1", spout);
 
+        Properties props = new Properties();
+        props.put("bootstrap.servers", brokerConnectionString);
+        props.put("acks", "1");
+        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
         TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
-                .withKafkaTopicSelector(new DefaultTopicSelector("test"))
-                .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
+            .withProducerProperties(props)
+            .withKafkaTopicSelector(new DefaultTopicSelector("test"))
+            .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
         stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
 
         return topology.build();
@@ -77,24 +81,11 @@
             System.out.println("Please provide kafka broker url ,e.g. localhost:9092");
         }
 
-        Config conf = getConfig(args[0]);
         LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology("wordCounter", conf, buildTopology());
+        cluster.submitTopology("wordCounter", new Config(), buildTopology(args[0]));
         Thread.sleep(60 * 1000);
         cluster.killTopology("wordCounter");
 
         cluster.shutdown();
     }
-
-    private  static Config getConfig(String brokerConnectionString) {
-        Config conf = new Config();
-        Map config = new HashMap();
-        Properties props = new Properties();
-        props.put("metadata.broker.list", brokerConnectionString);
-        props.put("request.required.acks", "1");
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
-        return conf;
-    }
-
 }
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
index 53d7c50..9cbdee9 100644
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -183,7 +183,6 @@
     }
 
     private KafkaBolt generateStringSerializerBolt() {
-        KafkaBolt bolt = new KafkaBolt();
         Properties props = new Properties();
         props.put("request.required.acks", "1");
         props.put("serializer.class", "kafka.serializer.StringEncoder");
@@ -191,14 +190,13 @@
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("metadata.fetch.timeout.ms", 1000);
-        config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
+        KafkaBolt bolt = new KafkaBolt().withProducerProperties(props);
         bolt.prepare(config, null, new OutputCollector(collector));
         bolt.setAsync(false);
         return bolt;
     }
 
     private KafkaBolt generateDefaultSerializerBolt(boolean async, boolean fireAndForget) {
-        KafkaBolt bolt = new KafkaBolt();
         Properties props = new Properties();
         props.put("request.required.acks", "1");
         props.put("serializer.class", "kafka.serializer.StringEncoder");
@@ -207,7 +205,7 @@
         props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
         props.put("metadata.fetch.timeout.ms", 1000);
         props.put("linger.ms", 0);
-        config.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
+        KafkaBolt bolt = new KafkaBolt().withProducerProperties(props);
         bolt.prepare(config, null, new OutputCollector(collector));
         bolt.setAsync(async);
         bolt.setFireAndForget(fireAndForget);