MLHR-1143 #resolve #comment make producer configurable
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
index 9525cff..8b73d2e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
@@ -15,12 +15,18 @@
*/
package com.datatorrent.contrib.kafka;
+import java.util.Properties;
+
import com.datatorrent.api.annotation.ShipContainingJars;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Operator;
+
import javax.validation.constraints.NotNull;
+
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;
+
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,7 +53,7 @@
*
* @since 0.3.2
*/
-@ShipContainingJars(classes={kafka.javaapi.producer.Producer.class, org.I0Itec.zkclient.ZkClient.class, scala.Function.class})
+@ShipContainingJars(classes={kafka.javaapi.producer.Producer.class, org.I0Itec.zkclient.ZkClient.class, scala.Function.class, StringUtils.class})
public abstract class AbstractKafkaOutputOperator<K, V> implements Operator
{
@SuppressWarnings("unused")
@@ -57,12 +63,40 @@
private String topic = "topic1";
protected int sendCount;
+
+ private String producerProperties = "";
+
+ private Properties configProperties = new Properties();
+
+ public Properties getConfigProperties()
+ {
+ return configProperties;
+ }
+
+ public void setConfigProperties(Properties configProperties)
+ {
+ this.configProperties = configProperties;
+ }
+
/**
- * Abstract method to setup producer configuration.
+ * setup producer configuration.
* @return ProducerConfig
*/
- public abstract ProducerConfig createKafkaProducerConfig();
+ protected ProducerConfig createKafkaProducerConfig(){
+ Properties prop = new Properties();
+ for (String propString : producerProperties.split(",")) {
+ if (!propString.contains("=")) {
+ continue;
+ }
+ String[] keyVal = StringUtils.trim(propString).split("=");
+ prop.put(StringUtils.trim(keyVal[0]), StringUtils.trim(keyVal[1]));
+ }
+
+ configProperties.putAll(prop);
+
+ return new ProducerConfig(configProperties);
+ };
public Producer<K, V> getProducer()
{
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortOutputOperator.java
index 0a0b674..b7955ab 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaSinglePortOutputOperator.java
@@ -15,12 +15,9 @@
*/
package com.datatorrent.contrib.kafka;
-import java.util.Properties;
-
import com.datatorrent.api.annotation.InputPortFieldAnnotation;
import com.datatorrent.api.DefaultInputPort;
import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
/**
* Kafka output adapter operator with only one input port, which produce data into Kafka message bus.<p><br>
@@ -47,9 +44,7 @@
*/
public class KafkaSinglePortOutputOperator<K, V> extends AbstractKafkaOutputOperator<K, V>
{
- private Properties configProperties = null;
-
-
+
/**
* The single input port.
*/
@@ -70,19 +65,4 @@
}
};
- public Properties getConfigProperties()
- {
- return configProperties;
- }
-
- public void setConfigProperties(Properties configProperties)
- {
- this.configProperties = configProperties;
- }
-
- @Override
- public ProducerConfig createKafkaProducerConfig()
- {
- return new ProducerConfig(configProperties);
- }
}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
index c6e6d22..b78314a 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
@@ -122,7 +122,7 @@
public void testKafkaOutputOperator() throws Exception
{
//initialize the latch to synchronize the threads
- latch = new CountDownLatch(1);
+ latch = new CountDownLatch(maxTuple);
// Setup a message listener to receive the message
KafkaTestConsumer listener = new KafkaTestConsumer("topic1");
listener.setLatch(latch);