Merge branch 'STORM-1040' of https://github.com/haohui/storm into STORM-1040
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 2fe930e..f6f14ac 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -226,8 +226,9 @@
DefaultTopicSelector.java and set the name of the topic in the constructor.
### Specifying Kafka producer properties
-You can provide all the produce properties in your Storm topology by calling `KafkaBolt.withProducerProperties()` and `TridentKafkaStateFactory.withProducerProperties()`. Please see http://kafka.apache.org/documentation.html#newproducerconfigs
-Section "Important configuration properties for the producer" for more details.
+You can provide all the producer properties (see section "Important configuration properties for the producer" at
+http://kafka.apache.org/documentation.html#producerconfigs) in your Storm topology config by setting the properties
+map with the key `kafka.broker.properties`.
### Using wildcard kafka topic match
You can do a wildcard topic match by adding the following config
@@ -237,7 +238,7 @@
```
-After this you can specify a wildcard topic for matching e.g. clickstream.*.log. This will match all streams matching clickstream.my.log, clickstream.cart.log etc
+After this you can specify a wildcard topic for matching, e.g. clickstream.*.log, which will match topics such as clickstream.my.log, clickstream.cart.log, etc.
### Putting it all together
@@ -269,7 +270,13 @@
builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
Config conf = new Config();
-
+ // Set producer properties.
+ Properties props = new Properties();
+ props.put("metadata.broker.list", "localhost:9092");
+ props.put("request.required.acks", "1");
+ props.put("serializer.class", "kafka.serializer.StringEncoder");
+ conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
+
StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
```
@@ -302,6 +309,12 @@
stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
Config conf = new Config();
+ // Set producer properties.
+ Properties props = new Properties();
+ props.put("metadata.broker.list", "localhost:9092");
+ props.put("request.required.acks", "1");
+ props.put("serializer.class", "kafka.serializer.StringEncoder");
+ conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
```