Merge branch 'STORM-1040' of https://github.com/haohui/storm into STORM-1040
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 2fe930e..f6f14ac 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -226,8 +226,9 @@
 DefaultTopicSelector.java and set the name of the topic in the constructor.
 
 ### Specifying Kafka producer properties
-You can provide all the produce properties in your Storm topology by calling `KafkaBolt.withProducerProperties()` and `TridentKafkaStateFactory.withProducerProperties()`. Please see  http://kafka.apache.org/documentation.html#newproducerconfigs
-Section "Important configuration properties for the producer" for more details.
+You can provide all the produce properties , see http://kafka.apache.org/documentation.html#producerconfigs 
+section "Important configuration properties for the producer", in your Storm topology config by setting the properties
+map with key kafka.broker.properties.
 
 ###Using wildcard kafka topic match
 You can do a wildcard topic match by adding the following config
@@ -237,7 +238,7 @@
 
 ```
 
-After this you can specify a wildcard topic for matching e.g. clickstream.*.log.  This will match all streams matching clickstream.my.log, clickstream.cart.log etc
+After this you can specifiy a wildcard topic for matching e.g. clickstream.*.log.  This will match all streams matching clickstream.my.log, clickstream.cart.log etc
 
 
 ###Putting it all together
@@ -269,7 +270,13 @@
         builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
 
         Config conf = new Config();
-
+        //set producer properties.
+        Properties props = new Properties();
+        props.put("metadata.broker.list", "localhost:9092");
+        props.put("request.required.acks", "1");
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
+        
         StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
 ```
 
@@ -302,6 +309,12 @@
         stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
 
         Config conf = new Config();
+        //set producer properties.
+        Properties props = new Properties();
+        props.put("metadata.broker.list", "localhost:9092");
+        props.put("request.required.acks", "1");
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        conf.put(TridentKafkaState.KAFKA_BROKER_PROPERTIES, props);
         StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
 ```