MLHR-1143 #resolve #comment make producer configurable
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
index 9525cff..8b73d2e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
@@ -15,12 +15,18 @@
  */
 package com.datatorrent.contrib.kafka;
 
+import java.util.Properties;
+
 import com.datatorrent.api.annotation.ShipContainingJars;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.Operator;
+
 import javax.validation.constraints.NotNull;
+
 import kafka.javaapi.producer.Producer;
 import kafka.producer.ProducerConfig;
+
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,7 +53,7 @@
  *
  * @since 0.3.2
  */
-@ShipContainingJars(classes={kafka.javaapi.producer.Producer.class, org.I0Itec.zkclient.ZkClient.class, scala.Function.class})
+@ShipContainingJars(classes={kafka.javaapi.producer.Producer.class, org.I0Itec.zkclient.ZkClient.class, scala.Function.class, StringUtils.class})
 public abstract class AbstractKafkaOutputOperator<K, V> implements Operator
 {
   @SuppressWarnings("unused")
@@ -57,12 +63,40 @@
   private String topic = "topic1";
 
   protected int sendCount;
+  
+  private String producerProperties = "";
+  
+  private Properties configProperties = new Properties();
+    
+  public Properties getConfigProperties()
+  {
+    return configProperties;
+  }
+
+  public void setConfigProperties(Properties configProperties)
+  {
+    this.configProperties = configProperties;
+  }
+
 
   /**
-   * Abstract method to setup producer configuration.
+   * setup producer configuration.
    * @return ProducerConfig
    */
-  public abstract ProducerConfig createKafkaProducerConfig();
+  protected ProducerConfig createKafkaProducerConfig(){
+    Properties prop = new Properties();
+    for (String propString : producerProperties.split(",")) {
+      if (!propString.contains("=")) {
+        continue;
+      }
+      String[] keyVal = StringUtils.trim(propString).split("=");
+      prop.put(StringUtils.trim(keyVal[0]), StringUtils.trim(keyVal[1]));
+    }
+    
+    configProperties.putAll(prop);
+    
+    return new ProducerConfig(configProperties);
+  };
 
   public Producer<K, V> getProducer()
   {
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortOutputOperator.java
index 0a0b674..b7955ab 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortOutputOperator.java
@@ -15,12 +15,9 @@
  */
 package com.datatorrent.contrib.kafka;
 
-import java.util.Properties;
-
 import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 import com.datatorrent.api.DefaultInputPort;
 import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
 
 /**
  * Kafka output adapter operator with only one input port, which produce data into Kafka message bus.<p><br>
@@ -47,9 +44,7 @@
  */
 public class KafkaSinglePortOutputOperator<K, V> extends AbstractKafkaOutputOperator<K, V>
 {
-  private Properties configProperties = null;
-  
-  
+
   /**
    * The single input port.
    */
@@ -70,19 +65,4 @@
     }
   };
 
-  public Properties getConfigProperties()
-  {
-    return configProperties;
-  }
-
-  public void setConfigProperties(Properties configProperties)
-  {
-    this.configProperties = configProperties;
-  }
-
-  @Override
-  public ProducerConfig createKafkaProducerConfig()
-  {
-    return new ProducerConfig(configProperties);
-  }
 }
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
index c6e6d22..b78314a 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
@@ -122,7 +122,7 @@
   public void testKafkaOutputOperator() throws Exception
   {
     //initialize the latch to synchronize the threads
-    latch = new CountDownLatch(1);
+    latch = new CountDownLatch(maxTuple);
     // Setup a message listener to receive the message
     KafkaTestConsumer listener = new KafkaTestConsumer("topic1");
     listener.setLatch(latch);