Merge branch 'dt-dev' into kafkainput
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
similarity index 79%
rename from benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaInputOperator.java
rename to benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
index 0426b54..585966a 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaInputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkKafkaInputOperator.java
@@ -18,7 +18,7 @@
 import kafka.message.Message;
 
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.contrib.kafka.AbstractPartitionableKafkaInputOperator;
+import com.datatorrent.contrib.kafka.AbstractKafkaInputOperator;
 
 /**
  * This operator emits one constant message for each kafka message received. 
@@ -30,7 +30,7 @@
  *
  * @since 0.9.3
  */
-public class BenchmarkPartitionableKafkaInputOperator extends AbstractPartitionableKafkaInputOperator
+public class BenchmarkKafkaInputOperator extends AbstractKafkaInputOperator
 {
   /**
    * The output port on which messages are emitted.
@@ -38,12 +38,6 @@
   public transient DefaultOutputPort<String>  oport = new DefaultOutputPort<String>();
 
   @Override
-  protected AbstractPartitionableKafkaInputOperator cloneOperator()
-  {
-    return new BenchmarkPartitionableKafkaInputOperator();
-  }
-
-  @Override
   protected void emitTuple(Message message)
   {
     oport.emit("Received");
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
index 3f82afd..3640e15 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/BenchmarkPartitionableKafkaOutputOperator.java
@@ -29,6 +29,9 @@
 
 import javax.validation.constraints.Min;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultPartition;
 import com.datatorrent.api.InputOperator;
@@ -48,6 +51,8 @@
 public class BenchmarkPartitionableKafkaOutputOperator implements Partitioner<BenchmarkPartitionableKafkaOutputOperator>, InputOperator, ActivationListener<OperatorContext>
 {
 
+  private static final Logger logger = LoggerFactory.getLogger(BenchmarkPartitionableKafkaOutputOperator.class);
+  
   private String topic = "benchmark";
 
   @Min(1)
@@ -77,6 +82,7 @@
     @Override
     public void run()
     {
+      logger.info("Start produce data .... ");
       Properties props = new Properties();
       props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
       props.setProperty("key.serializer.class", "kafka.serializer.StringEncoder");
@@ -85,7 +91,7 @@
       props.setProperty("partitioner.class", KafkaTestPartitioner.class.getCanonicalName());
       props.setProperty("producer.type", "async");
 //      props.setProperty("send.buffer.bytes", "1048576");
-      props.setProperty("topic.metadata.refresh.interval.ms", "100000");
+      props.setProperty("topic.metadata.refresh.interval.ms", "10000");
 
       if (producer == null) {
         producer = new Producer<String, String>(new ProducerConfig(props));
@@ -164,6 +170,7 @@
   public void activate(OperatorContext arg0)
   {
 
+    logger.info("Activate the benchmark kafka output operator .... ");
     constantMsg = new byte[msgSize];
     for (int i = 0; i < constantMsg.length; i++) {
       constantMsg[i] = (byte) ('a' + i%26);
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
index 18ef3f4..a6deefc 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/kafka/KafkaInputBenchmark.java
@@ -16,7 +16,7 @@
 
 package com.datatorrent.benchmark.kafka;
 
-import java.util.HashSet;
+import com.google.common.collect.Sets;
 import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
@@ -32,7 +32,7 @@
 import com.datatorrent.contrib.kafka.HighlevelKafkaConsumer;
 import com.datatorrent.contrib.kafka.KafkaConsumer;
 import com.datatorrent.contrib.kafka.SimpleKafkaConsumer;
-import com.google.common.collect.Sets;
+
 
 /**
  * The stream app to test the benckmark of kafka
@@ -54,16 +54,14 @@
       {
       }
     };
-
   }
 
-
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
 
     dag.setAttribute(DAG.APPLICATION_NAME, "KafkaInputOperatorPartitionDemo");
-    BenchmarkPartitionableKafkaInputOperator bpkio = new BenchmarkPartitionableKafkaInputOperator();
+    BenchmarkKafkaInputOperator bpkio = new BenchmarkKafkaInputOperator();
 
 
     String type = conf.get("kafka.consumertype", "simple");
@@ -75,19 +73,19 @@
       // Create template high-level consumer
 
       Properties props = new Properties();
-      props.put("zookeeper.connect", conf.get("kafka.zookeeper"));
       props.put("group.id", "main_group");
       props.put("auto.offset.reset", "smallest");
       consumer = new HighlevelKafkaConsumer(props);
     } else {
       // topic is set via property file
-      consumer = new SimpleKafkaConsumer(null, 10000, 100000, "test_kafka_autop_client", new HashSet<Integer>());
+      consumer = new SimpleKafkaConsumer(null, 10000, 100000, "test_kafka_autop_client", null);
     }
 
-    consumer.setBrokerSet(Sets.newHashSet(conf.get("dt.kafka.brokerlist").split("\\s*,\\s*")));
+    bpkio.setZookeeper(conf.get("dt.kafka.zookeeper"));
     bpkio.setInitialPartitionCount(1);
     //bpkio.setTuplesBlast(1024 * 1024);
     bpkio.setConsumer(consumer);
+
     bpkio = dag.addOperator("KafkaBenchmarkConsumer", bpkio);
 
     CollectorModule cm = dag.addOperator("DataBlackhole", CollectorModule.class);
diff --git a/benchmark/src/site/conf/dt-site-kafka.xml b/benchmark/src/site/conf/dt-site-kafka.xml
index 4db589f..38ef213 100644
--- a/benchmark/src/site/conf/dt-site-kafka.xml
+++ b/benchmark/src/site/conf/dt-site-kafka.xml
@@ -16,5 +16,10 @@
     <name>dt.kafka.brokerlist</name>
     <value>localhost:9092</value>
   </property>
+
+    <property>
+        <name>dt.kafka.zookeeper</name>
+        <value>localhost:2181</value>
+    </property>
 </configuration>
 
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index 1dd682b..7e4211a 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -15,27 +15,97 @@
  */
 package com.datatorrent.contrib.kafka;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.Operator.ActivationListener;
+import com.datatorrent.api.Operator.CheckpointListener;
+import com.datatorrent.api.Partitioner;
+import com.datatorrent.api.Stats;
+import com.datatorrent.api.StatsListener;
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import com.datatorrent.api.annotation.Stateless;
+
+import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.getOffsetsForPartitions;
+import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.get_1minMovingAvgParMap;
+
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Joiner;
+import com.google.common.base.Predicate;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Array;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import javax.validation.Valid;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
 
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.cluster.Broker;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.message.Message;
+import kafka.message.MessageAndOffset;
 
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.MutablePair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Operator.ActivationListener;
-import com.datatorrent.api.Operator.CheckpointListener;
-
 /**
  * This is a base implementation of a Kafka input operator, which consumes data from Kafka message bus.&nbsp;
  * Subclasses should implement the method for emitting tuples to downstream operators.
+ * It will be dynamically partitioned based on the upstream kafka partition.
  * <p>
+ * <b>Partition Strategy:</b>
+ * <p><b>1. ONE_TO_ONE partition</b> Each operator partition will consume from only one kafka partition </p>
+ * <p><b>2. ONE_TO_MANY partition</b> Each operator partition consumer from multiple kafka partition with some hard ingestion rate limit</p>
+ * <p><b>3. ONE_TO_MANY_HEURISTIC partition</b>(Not implemented yet) Each operator partition consumer from multiple kafka partition and partition number depends on heuristic function(real time bottle neck)</p>
+ * <p><b>Note:</b> ONE_TO_MANY partition only support simple kafka consumer because
+ * <p>  1) high-level consumer can only balance the number of brokers it consumes from rather than the actual load from each broker</p>
+ * <p>  2) high-level consumer can not reset offset once it's committed so the tuples are not replayable </p>
+ * <p></p>
+ * <br>
+ * <br>
+ * <b>Basic Algorithm:</b>
+ * <p>1.Pull the metadata(how many partitions) of the topic from brokerList of {@link KafkaConsumer}</p>
+ * <p>2.cloneConsumer method is used to initialize the new {@link KafkaConsumer} instance for the new partition operator</p>
+ * <p>3.cloneOperator method is used to initialize the new {@link AbstractKafkaInputOperator} instance for the new partition operator</p>
+ * <p>4.ONE_TO_MANY partition use first-fit decreasing algorithm(http://en.wikipedia.org/wiki/Bin_packing_problem) to minimize the partition operator
+ * <br>
+ * <br>
+ * <b>Load balance:</b> refer to {@link SimpleKafkaConsumer} and {@link HighlevelKafkaConsumer} <br>
+ * <b>Kafka partition failover:</b> refer to {@link SimpleKafkaConsumer} and {@link HighlevelKafkaConsumer}
+ * <br>
+ * <br>
+ * <b>Self adjust to Kafka partition change:</b>
+ * <p><b>EACH</b> operator partition periodically check the leader broker(s) change which it consumes from and adjust connection without repartition</p>
+ * <p><b>ONLY APPMASTER</b> operator periodically check overall kafka partition layout and add operator partition due to kafka partition add(no delete supported by kafka for now)</p>
+ * <br>
+ * <br>
+ * </p>
  * Properties:<br>
  * <b>tuplesBlast</b>: Number of tuples emitted in each burst<br>
  * <b>bufferSize</b>: Size of holding buffer<br>
@@ -50,15 +120,7 @@
  * TBD<br>
  * <br>
  *
- * Shipped jars with this operator:<br>
- * <b>kafka.javaapi.consumer.SimpleConsumer.class</b> Official kafka consumer client <br>
- * <b>org.I0Itec.zkclient.ZkClient.class</b>  Kafka client depends on this <br>
- * <b>scala.ScalaObject.class</b>  Kafka client depends on this <br>
- * <b>com.yammer.matrics.Metrics.class</b>   Kafka client depends on this <br> <br>
- *
- * Each operator can only consume 1 topic<br>
- * If you want partitionable operator refer to {@link AbstractPartitionableKafkaInputOperator}
- * <br>
+ * Each operator can consume 1 topic from multiple partitions and clusters<br>
  * </p>
  *
  * @displayName Abstract Kafka Input
@@ -67,23 +129,69 @@
  *
  * @since 0.3.2
  */
-//SimpleConsumer is kafka consumer client used by this operator, zkclient is used by high-level kafka consumer
-public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implements InputOperator, ActivationListener<OperatorContext>, CheckpointListener
+
+@OperatorAnnotation(partitionable = true)
+public abstract class AbstractKafkaInputOperator<K extends KafkaConsumer> implements InputOperator, ActivationListener<OperatorContext>, CheckpointListener, Partitioner<AbstractKafkaInputOperator<K>>, StatsListener
 {
   private static final Logger logger = LoggerFactory.getLogger(AbstractKafkaInputOperator.class);
 
   @Min(1)
   private int maxTuplesPerWindow = Integer.MAX_VALUE;
   private transient int emitCount = 0;
+  protected IdempotentStorageManager idempotentStorageManager;
+  protected transient long currentWindowId;
+  protected transient int operatorId;
+  protected final transient Map<KafkaPartition, MutablePair<Long, Integer>> currentWindowRecoveryState;
+  protected transient Map<KafkaPartition, Long> offsetStats = new HashMap<KafkaPartition, Long>();
+  private transient OperatorContext context = null;
+  private boolean idempotent = true;
+  // By default the partition policy is 1:1
+  public PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;
+
+  // default resource is unlimited in terms of msgs per second
+  private long msgRateUpperBound = Long.MAX_VALUE;
+
+  // default resource is unlimited in terms of bytes per second
+  private long byteRateUpperBound = Long.MAX_VALUE;
+
+  // Store the current operator partition topology
+  private transient List<PartitionInfo> currentPartitionInfo = Lists.newLinkedList();
+
+  // Store the current collected kafka consumer stats
+  private transient Map<Integer, List<KafkaConsumer.KafkaMeterStats>> kafkaStatsHolder = new HashMap<Integer, List<KafkaConsumer.KafkaMeterStats>>();
+
+  private OffsetManager offsetManager = null;
+
+  // Minimal interval between 2 (re)partition actions
+  private long repartitionInterval = 30000L;
+
+  // Minimal interval between checking collected stats and decide whether it needs to repartition or not.
+  // And minimal interval between 2 offset updates
+  private long repartitionCheckInterval = 5000L;
+
+  private transient long lastCheckTime = 0L;
+
+  private transient long lastRepartitionTime = 0L;
+
+  // A list store the newly discovered partitions
+  private transient List<KafkaPartition> newWaitingPartition = new LinkedList<KafkaPartition>();
+
+  @Min(1)
+  private int initialPartitionCount = 1;
+
   @NotNull
   @Valid
   protected KafkaConsumer consumer = new SimpleKafkaConsumer();
 
+  public AbstractKafkaInputOperator()
+  {
+    idempotentStorageManager = new IdempotentStorageManager.FSIdempotentStorageManager();
+    currentWindowRecoveryState = new HashMap<KafkaPartition, MutablePair<Long, Integer>>();
+  }
+
   /**
-   * Any concrete class derived from KafkaInputOperator has to implement this method
-   * so that it knows what type of message it is going to send to Malhar in which output port.
+   * Any concrete class derived from KafkaInputOperator has to implement this method to emit tuples to an output port.
    *
-   * @param message
    */
   protected abstract void emitTuple(Message message);
 
@@ -97,88 +205,179 @@
     this.maxTuplesPerWindow = maxTuplesPerWindow;
   }
 
-  /**
-   * Implement Component Interface.
-   *
-   * @param context
-   */
   @Override
   public void setup(OperatorContext context)
   {
+    if(!(getConsumer() instanceof SimpleKafkaConsumer) || !idempotent) {
+      idempotentStorageManager = new IdempotentStorageManager.NoopIdempotentStorageManager();
+    }
     logger.debug("consumer {} topic {} cacheSize {}", consumer, consumer.getTopic(), consumer.getCacheSize());
     consumer.create();
+    this.context = context;
+    operatorId = context.getId();
+    idempotentStorageManager.setup(context);
   }
 
-  /**
-   * Implement Component Interface.
-   */
   @Override
   public void teardown()
   {
+    idempotentStorageManager.teardown();
     consumer.teardown();
   }
 
-  /**
-   * Implement Operator Interface.
-   */
   @Override
   public void beginWindow(long windowId)
   {
+    currentWindowId = windowId;
+    if (windowId <= idempotentStorageManager.getLargestRecoveryWindow()) {
+      replay(windowId);
+    }
     emitCount = 0;
   }
 
-  /**
-   * Implement Operator Interface.
-   */
+  protected void replay(long windowId)
+  {
+    try {
+      @SuppressWarnings("unchecked")
+      Map<KafkaPartition, MutablePair<Long, Integer>> recoveredData = (Map<KafkaPartition, MutablePair<Long, Integer>>) idempotentStorageManager.load(operatorId, windowId);
+      if (recoveredData == null) {
+        return;
+      }
+
+      Map<String, List<PartitionMetadata>> pms = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().topic);
+      if (pms == null) {
+       return;
+      }
+
+      SimpleKafkaConsumer cons = (SimpleKafkaConsumer)getConsumer();
+      // add all partition request in one Fretch request together
+      FetchRequestBuilder frb = new FetchRequestBuilder().clientId(cons.getClientId());
+      for (Map.Entry<KafkaPartition, MutablePair<Long, Integer>> rc: recoveredData.entrySet()) {
+        KafkaPartition kp = rc.getKey();
+        List<PartitionMetadata> pmsVal = pms.get(kp.getClusterId());
+
+        Iterator<PartitionMetadata> pmIterator = pmsVal.iterator();
+        PartitionMetadata pm = pmIterator.next();
+        while (pm.partitionId() != kp.getPartitionId()) {
+          if (!pmIterator.hasNext())
+            break;
+          pm = pmIterator.next();
+        }
+        if (pm.partitionId() != kp.getPartitionId())
+          continue;
+
+        Broker bk = pm.leader();
+
+        frb.addFetch(consumer.topic, rc.getKey().getPartitionId(), rc.getValue().left, cons.getBufferSize());
+        FetchRequest req = frb.build();
+
+        SimpleConsumer ksc = new SimpleConsumer(bk.host(), bk.port(), cons.getTimeout(), cons.getBufferSize(), cons.getClientId());
+        FetchResponse fetchResponse = ksc.fetch(req);
+        Integer count = 0;
+        for (MessageAndOffset msg : fetchResponse.messageSet(consumer.topic, kp.getPartitionId())) {
+          emitTuple(msg.message());
+          offsetStats.put(kp, msg.offset());
+          count = count + 1;
+          if (count.equals(rc.getValue().right))
+            break;
+        }
+      }
+      if(windowId == idempotentStorageManager.getLargestRecoveryWindow()) {
+        // Set the offset positions to the consumer
+        Map<KafkaPartition, Long> currentOffsets = new HashMap<KafkaPartition, Long>(cons.getCurrentOffsets());
+        // Increment the offsets
+        for (Map.Entry<KafkaPartition, Long> e: offsetStats.entrySet()) {
+          currentOffsets.put(e.getKey(), e.getValue() + 1);
+        }
+
+        cons.resetOffset(currentOffsets);
+        cons.start();
+      }
+    }
+
+    catch (IOException e) {
+      throw new RuntimeException("replay", e);
+    }
+  }
+
   @Override
   public void endWindow()
   {
+    if (currentWindowId > idempotentStorageManager.getLargestRecoveryWindow()) {
+      try {
+        if((getConsumer() instanceof  SimpleKafkaConsumer)) {
+          SimpleKafkaConsumer cons = (SimpleKafkaConsumer) getConsumer();
+          context.setCounters(cons.getConsumerStats(offsetStats));
+        }
+        idempotentStorageManager.save(currentWindowRecoveryState, operatorId, currentWindowId);
+      }
+      catch (IOException e) {
+        throw new RuntimeException("saving recovery", e);
+      }
+    }
+    currentWindowRecoveryState.clear();
   }
 
   @Override
   public void checkpointed(long windowId)
   {
-    // commit the kafka consumer offset
+    // commit the consumer offset
     getConsumer().commitOffset();
   }
 
   @Override
   public void committed(long windowId)
   {
+    try {
+      idempotentStorageManager.deleteUpTo(operatorId, windowId);
+    }
+    catch (IOException e) {
+      throw new RuntimeException("deleting state", e);
+    }
   }
 
-  /**
-   * Implement ActivationListener Interface.
-   */
   @Override
   public void activate(OperatorContext ctx)
   {
+    if (context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) != Stateless.WINDOW_ID && context.getValue(OperatorContext.ACTIVATION_WINDOW_ID) < idempotentStorageManager.getLargestRecoveryWindow()) {
+      // If it is a replay state, don't start the consumer
+      return;
+    }
     // Don't start thread here!
-    // Because how many threads we want to start for kafka consumer depends on the type of kafka client and the message metadata(topic/partition/replica)
+    // # of kafka_consumer_threads depends on the type of kafka client and the message
+    // metadata(topic/partition/replica) layout
     consumer.start();
   }
 
-  /**
-   * Implement ActivationListener Interface.
-   */
   @Override
   public void deactivate()
   {
     consumer.stop();
   }
 
-  /**
-   * Implement InputOperator Interface.
-   */
   @Override
   public void emitTuples()
   {
+    if (currentWindowId <= idempotentStorageManager.getLargestRecoveryWindow()) {
+      return;
+    }
     int count = consumer.messageSize();
     if (maxTuplesPerWindow > 0) {
       count = Math.min(count, maxTuplesPerWindow - emitCount);
     }
     for (int i = 0; i < count; i++) {
-      emitTuple(consumer.pollMessage());
+      KafkaConsumer.KafkaMessage message = consumer.pollMessage();
+      // Ignore the duplicate messages
+      if(offsetStats.containsKey(message.kafkaPart) && message.offSet <= offsetStats.get(message.kafkaPart))
+        continue;
+      emitTuple(message.msg);
+      offsetStats.put(message.kafkaPart, message.offSet);
+      MutablePair<Long, Integer> offsetAndCount = currentWindowRecoveryState.get(message.kafkaPart);
+      if(offsetAndCount == null) {
+        currentWindowRecoveryState.put(message.kafkaPart, new MutablePair<Long, Integer>(message.offSet, 1));
+      } else {
+        offsetAndCount.setRight(offsetAndCount.right+1);
+      }
     }
     emitCount += count;
   }
@@ -193,20 +392,561 @@
     return consumer;
   }
 
-  //add topic as operator property
+  // add topic as operator property
   public void setTopic(String topic)
   {
     this.consumer.setTopic(topic);
   }
 
-  //add brokerlist as operator property
-  public void setBrokerSet(String brokerString)
+  /**
+   * Set the ZooKeeper quorum of the Kafka cluster(s) you want to consume data from.
+   * The operator will discover the brokers that it needs to consume messages from.
+   */
+  public void setZookeeper(String zookeeperString)
   {
-    Set<String> brokerSet = new HashSet<String>();
-    for (String broker : brokerString.split(",")) {
-      brokerSet.add(broker);
+    SetMultimap<String, String> theClusters = HashMultimap.create();
+    for (String zk : zookeeperString.split(";")) {
+      String[] parts = zk.split("::");
+      String clusterId = parts.length == 1 ? KafkaPartition.DEFAULT_CLUSTERID : parts[0];
+      String[] hostNames = parts.length == 1 ? parts[0].split(",") : parts[1].split(",");
+      String portId = "";
+      for (int idx = hostNames.length - 1; idx >= 0; idx--) {
+        String[] zkParts = hostNames[idx].split(":");
+        if (zkParts.length == 2) {
+          portId = zkParts[1];
+        }
+        if (!portId.isEmpty() && portId != "") {
+          theClusters.put(clusterId, zkParts[0] + ":" + portId);
+        } else {
+          throw new IllegalArgumentException("Wrong zookeeper string: " + zookeeperString + "\n"
+              + " Expected format should be cluster1::zookeeper1,zookeeper2:port1;cluster2::zookeeper3:port2 or zookeeper1:port1,zookeeper:port2");
+        }
+      }
     }
-    this.consumer.setBrokerSet(brokerSet);
+    this.consumer.setZookeeper(theClusters);
   }
 
+  @Override
+  public void partitioned(Map<Integer, Partitioner.Partition<AbstractKafkaInputOperator<K>>> partitions)
+  {
+    // update the last repartition time
+    lastRepartitionTime = System.currentTimeMillis();
+  }
+
+  @Override
+  public Collection<Partitioner.Partition<AbstractKafkaInputOperator<K>>> definePartitions(Collection<Partitioner.Partition<AbstractKafkaInputOperator<K>>> partitions, Partitioner.PartitioningContext context)
+  {
+    // Initialize brokers from zookeepers
+    getConsumer().initBrokers();
+
+    // check if it's the initial partition
+    boolean isInitialParitition = partitions.iterator().next().getStats() == null;
+
+    // get partition metadata for topics.
+    // Whatever operator is using high-level or simple kafka consumer, the operator always create a temporary simple kafka consumer to get the metadata of the topic
+    // The initial value of brokerList of the KafkaConsumer is used to retrieve the topic metadata
+    Map<String, List<PartitionMetadata>> kafkaPartitions = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().brokers, getConsumer().getTopic());
+
+    // Operator partitions
+    List<Partitioner.Partition<AbstractKafkaInputOperator<K>>> newPartitions = null;
+
+    // initialize the offset
+    Map<KafkaPartition, Long> initOffset = null;
+    if(isInitialParitition && offsetManager !=null){
+      initOffset = offsetManager.loadInitialOffsets();
+      logger.info("Initial offsets: {} ", "{ " + Joiner.on(", ").useForNull("").withKeyValueSeparator(": ").join(initOffset) + " }");
+    }
+
+    Collection<IdempotentStorageManager> newManagers = Sets.newHashSet();
+    Set<Integer> deletedOperators =  Sets.newHashSet();
+
+    switch (strategy) {
+
+    // For the 1 to 1 mapping The framework will create number of operator partitions based on kafka topic partitions
+    // Each operator partition will consume from only one kafka partition
+    case ONE_TO_ONE:
+
+      if (isInitialParitition) {
+        lastRepartitionTime = System.currentTimeMillis();
+        logger.info("[ONE_TO_ONE]: Initializing partition(s)");
+
+        // initialize the number of operator partitions according to number of kafka partitions
+
+        newPartitions = new LinkedList<Partitioner.Partition<AbstractKafkaInputOperator<K>>>();
+        for (Map.Entry<String, List<PartitionMetadata>> kp : kafkaPartitions.entrySet()) {
+          String clusterId = kp.getKey();
+          for (PartitionMetadata pm : kp.getValue()) {
+            logger.info("[ONE_TO_ONE]: Create operator partition for cluster {}, topic {}, kafka partition {} ", clusterId, getConsumer().topic, pm.partitionId());
+            newPartitions.add(createPartition(Sets.newHashSet(new KafkaPartition(clusterId, consumer.topic, pm.partitionId())), initOffset, newManagers));
+          }
+        }
+
+      }
+      else if (newWaitingPartition.size() != 0) {
+        // add partition for new kafka partition
+        for (KafkaPartition newPartition : newWaitingPartition) {
+          logger.info("[ONE_TO_ONE]: Add operator partition for cluster {}, topic {}, partition {}", newPartition.getClusterId(), getConsumer().topic, newPartition.getPartitionId());
+          partitions.add(createPartition(Sets.newHashSet(newPartition), null, newManagers));
+        }
+        newWaitingPartition.clear();
+        idempotentStorageManager.partitioned(newManagers, deletedOperators);
+        return partitions;
+
+      }
+      break;
+    // For the 1 to N mapping The initial partition number is defined by stream application
+    // Afterwards, the framework will dynamically adjust the partition and allocate consumers to as less operator partitions as it can
+    //  and guarantee the total intake rate for each operator partition is below some threshold
+    case ONE_TO_MANY:
+
+      if (getConsumer() instanceof HighlevelKafkaConsumer) {
+        throw new UnsupportedOperationException("[ONE_TO_MANY]: The high-level consumer is not supported for ONE_TO_MANY partition strategy.");
+      }
+
+      if (isInitialParitition) {
+        lastRepartitionTime = System.currentTimeMillis();
+        logger.info("[ONE_TO_MANY]: Initializing partition(s)");
+        int size = initialPartitionCount;
+        //Set<KafkaPartition>[] kps = new Set[size];
+        @SuppressWarnings("unchecked")
+        Set<KafkaPartition>[] kps = (Set<KafkaPartition>[]) Array.newInstance((new HashSet<KafkaPartition>()).getClass(), size);
+        newPartitions = new ArrayList<Partitioner.Partition<AbstractKafkaInputOperator<K>>>(size);
+        int i = 0;
+        for (Map.Entry<String, List<PartitionMetadata>> en : kafkaPartitions.entrySet()) {
+          String clusterId = en.getKey();
+          for (PartitionMetadata pm : en.getValue()) {
+            if (kps[i % size] == null) {
+              kps[i % size] = new HashSet<KafkaPartition>();
+            }
+            kps[i % size].add(new KafkaPartition(clusterId, consumer.topic, pm.partitionId()));
+            i++;
+          }
+        }
+        for (i = 0; i < kps.length; i++) {
+          logger.info("[ONE_TO_MANY]: Create operator partition for kafka partition(s): {} ", StringUtils.join(kps[i], ", "));
+          newPartitions.add(createPartition(kps[i], initOffset, newManagers));
+        }
+
+      }
+      else if (newWaitingPartition.size() != 0) {
+
+        logger.info("[ONE_TO_MANY]: Add operator partition for kafka partition(s): {} ", StringUtils.join(newWaitingPartition, ", "));
+        partitions.add(createPartition(Sets.newHashSet(newWaitingPartition), null, newManagers));
+        idempotentStorageManager.partitioned(newManagers, deletedOperators);
+        return partitions;
+      }
+      else {
+
+        logger.info("[ONE_TO_MANY]: Repartition the operator(s) under " + msgRateUpperBound + " msgs/s and " + byteRateUpperBound + " bytes/s hard limit");
+        // size of the list depends on the load and capacity of each operator
+        newPartitions = new LinkedList<Partitioner.Partition<AbstractKafkaInputOperator<K>>>();
+
+        // Use first-fit decreasing algorithm to minimize the container number and somewhat balance the partition
+        // try to balance the load and minimize the number of containers with each container's load under the threshold
+        // the partition based on the latest 1 minute moving average
+        Map<KafkaPartition, long[]> kPIntakeRate = new HashMap<KafkaPartition, long[]>();
+        // get the offset for all partitions of each consumer
+        Map<KafkaPartition, Long> offsetTrack = new HashMap<KafkaPartition, Long>();
+        for (Partitioner.Partition<AbstractKafkaInputOperator<K>> partition : partitions) {
+          List<Stats.OperatorStats> opss = partition.getStats().getLastWindowedStats();
+          if (opss == null || opss.size() == 0) {
+            continue;
+          }
+          offsetTrack.putAll(partition.getPartitionedInstance().consumer.getCurrentOffsets());
+          // Get the latest stats
+
+          Stats.OperatorStats stat = partition.getStats().getLastWindowedStats().get(partition.getStats().getLastWindowedStats().size() - 1);
+          if (stat.counters instanceof KafkaConsumer.KafkaMeterStats) {
+            KafkaConsumer.KafkaMeterStats kms = (KafkaConsumer.KafkaMeterStats) stat.counters;
+            kPIntakeRate.putAll(get_1minMovingAvgParMap(kms));
+          }
+        }
+
+        List<PartitionInfo> partitionInfos = firstFitDecreasingAlgo(kPIntakeRate);
+
+        // Add the existing partition Ids to the deleted operators
+        for(Partitioner.Partition<AbstractKafkaInputOperator<K>> op : partitions)
+        {
+          deletedOperators.add(op.getPartitionedInstance().operatorId);
+        }
+        for (PartitionInfo r : partitionInfos) {
+          logger.info("[ONE_TO_MANY]: Create operator partition for kafka partition(s): " + StringUtils.join(r.kpids, ", ") + ", topic: " + this.getConsumer().topic);
+          newPartitions.add(createPartition(r.kpids, offsetTrack, newManagers));
+        }
+        currentPartitionInfo.addAll(partitionInfos);
+      }
+      break;
+
+    case ONE_TO_MANY_HEURISTIC:
+      throw new UnsupportedOperationException("[ONE_TO_MANY_HEURISTIC]: Not implemented yet");
+    default:
+      break;
+    }
+
+    idempotentStorageManager.partitioned(newManagers, deletedOperators);
+    return newPartitions;
+  }
+
+  // Create a new partition with the partition Ids and initial offset positions
+  protected Partitioner.Partition<AbstractKafkaInputOperator<K>> createPartition(Set<KafkaPartition> pIds, Map<KafkaPartition, Long> initOffsets, Collection<IdempotentStorageManager> newManagers)
+  {
+    Kryo kryo = new Kryo();
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    Output output = new Output(bos);
+    kryo.writeObject(output, this);
+    output.close();
+    Input lInput = new Input(bos.toByteArray());
+    @SuppressWarnings("unchecked")
+    Partitioner.Partition<AbstractKafkaInputOperator<K>> p = new DefaultPartition<AbstractKafkaInputOperator<K>>(kryo.readObject(lInput, this.getClass()));
+    p.getPartitionedInstance().getConsumer().resetPartitionsAndOffset(pIds, initOffsets);
+    newManagers.add(p.getPartitionedInstance().idempotentStorageManager);
+
+    PartitionInfo pif = new PartitionInfo();
+    pif.kpids = pIds;
+    currentPartitionInfo.add(pif);
+    return p;
+  }
+
+  private List<PartitionInfo> firstFitDecreasingAlgo(final Map<KafkaPartition, long[]> kPIntakeRate)
+  {
+    // (Decreasing) Sort the map by msgs/s and bytes/s in descending order
+    List<Map.Entry<KafkaPartition, long[]>> sortedMapEntry = new LinkedList<Map.Entry<KafkaPartition, long[]>>(kPIntakeRate.entrySet());
+    Collections.sort(sortedMapEntry, new Comparator<Map.Entry<KafkaPartition, long[]>>()
+    {
+      @Override
+      public int compare(Map.Entry<KafkaPartition, long[]> firstEntry, Map.Entry<KafkaPartition, long[]> secondEntry)
+      {
+        long[] firstPair = firstEntry.getValue();
+        long[] secondPair = secondEntry.getValue();
+        if (msgRateUpperBound == Long.MAX_VALUE || firstPair[0] == secondPair[0]) {
+          return (int) (secondPair[1] - firstPair[1]);
+        } else {
+          return (int) (secondPair[0] - firstPair[0]);
+        }
+      }
+    });
+
+    // (First-fit) Look for first fit operator to assign the consumer
+    // Go over all the kafka partitions and look for the right operator to assign to
+    // Each record has a set of kafka partition ids and the resource left for that operator after assigned the consumers for those partitions
+    List<PartitionInfo> pif = new LinkedList<PartitionInfo>();
+    outer:
+    for (Map.Entry<KafkaPartition, long[]> entry : sortedMapEntry) {
+      long[] resourceRequired = entry.getValue();
+      for (PartitionInfo r : pif) {
+        if (r.msgRateLeft > resourceRequired[0] && r.byteRateLeft > resourceRequired[1]) {
+          // found first fit operator partition that has enough resource for this consumer
+          // add consumer to the operator partition
+          r.kpids.add(entry.getKey());
+          // update the resource left in this partition
+          r.msgRateLeft -= r.msgRateLeft == Long.MAX_VALUE ? 0 : resourceRequired[0];
+          r.byteRateLeft -= r.byteRateLeft == Long.MAX_VALUE ? 0 : resourceRequired[1];
+          continue outer;
+        }
+      }
+      // didn't find the existing "operator" to assign this consumer
+      PartitionInfo nr = new PartitionInfo();
+      nr.kpids = Sets.newHashSet(entry.getKey());
+      nr.msgRateLeft = msgRateUpperBound == Long.MAX_VALUE ? msgRateUpperBound : msgRateUpperBound - resourceRequired[0];
+      nr.byteRateLeft = byteRateUpperBound == Long.MAX_VALUE ? byteRateUpperBound : byteRateUpperBound - resourceRequired[1];
+      pif.add(nr);
+    }
+
+    return pif;
+  }
+
+  @Override
+  public StatsListener.Response processStats(StatsListener.BatchedOperatorStats stats)
+  {
+    StatsListener.Response resp = new StatsListener.Response();
+    List<KafkaConsumer.KafkaMeterStats> kstats = extractKafkaStats(stats);
+    resp.repartitionRequired = isPartitionRequired(stats.getOperatorId(), kstats);
+    return resp;
+  }
+
+  private void updateOffsets(List<KafkaConsumer.KafkaMeterStats> kstats)
+  {
+    //In every partition check interval, call offsetmanager to update the offsets
+    if (offsetManager != null) {
+      offsetManager.updateOffsets(getOffsetsForPartitions(kstats));
+    }
+  }
+
+  private List<KafkaConsumer.KafkaMeterStats> extractKafkaStats(StatsListener.BatchedOperatorStats stats)
+  {
+    //preprocess the stats
+    List<KafkaConsumer.KafkaMeterStats> kmsList = new LinkedList<KafkaConsumer.KafkaMeterStats>();
+    for (Stats.OperatorStats os : stats.getLastWindowedStats()) {
+      if (os != null && os.counters instanceof KafkaConsumer.KafkaMeterStats) {
+        kmsList.add((KafkaConsumer.KafkaMeterStats) os.counters);
+      }
+    }
+    return kmsList;
+  }
+
+  /**
+   *
+   * Check whether the operator needs repartition based on reported stats
+   *
+   * @return true if repartition is required
+   * false if repartition is not required
+   */
+  private boolean isPartitionRequired(int opid, List<KafkaConsumer.KafkaMeterStats> kstats)
+  {
+
+    long t = System.currentTimeMillis();
+
+    if (t - lastCheckTime < repartitionCheckInterval) {
+      // return false if it's within repartitionCheckInterval since last time it check the stats
+      return false;
+    }
+
+    logger.debug("Use OffsetManager to update offsets");
+    updateOffsets(kstats);
+
+
+    if(repartitionInterval < 0){
+      // if repartition is disabled
+      return false;
+    }
+
+    if(t - lastRepartitionTime < repartitionInterval) {
+      // return false if it's still within repartitionInterval since last (re)partition
+      return false;
+    }
+
+
+    kafkaStatsHolder.put(opid, kstats);
+
+    if (kafkaStatsHolder.size() != currentPartitionInfo.size() || currentPartitionInfo.size() == 0) {
+      // skip checking if the operator hasn't collected all the stats from all the current partitions
+      return false;
+    }
+
+    try {
+
+      // monitor if new kafka partition added
+      {
+        Set<KafkaPartition> existingIds = new HashSet<KafkaPartition>();
+        for (PartitionInfo pio : currentPartitionInfo) {
+          existingIds.addAll(pio.kpids);
+        }
+
+        for (Map.Entry<String, List<PartitionMetadata>> en : KafkaMetadataUtil.getPartitionsForTopic(consumer.brokers, consumer.getTopic()).entrySet()) {
+          for (PartitionMetadata pm : en.getValue()) {
+            KafkaPartition pa = new KafkaPartition(en.getKey(), consumer.topic, pm.partitionId());
+            if(!existingIds.contains(pa)){
+              newWaitingPartition.add(pa);
+            }
+          }
+        }
+        if (newWaitingPartition.size() != 0) {
+          // found new kafka partition
+          lastRepartitionTime = t;
+          return true;
+        }
+      }
+
+      if (strategy == PartitionStrategy.ONE_TO_ONE) {
+        return false;
+      }
+
+      // This is expensive part and only every repartitionCheckInterval it will check existing the overall partitions
+      // and see if there is more optimal solution
+      // The decision is made by 2 constraint
+      // Hard constraint which is upper bound overall msgs/s or bytes/s
+      // Soft constraint which is more optimal solution
+
+      boolean b = breakHardConstraint(kstats) || breakSoftConstraint();
+      if (b) {
+        currentPartitionInfo.clear();
+        kafkaStatsHolder.clear();
+      }
+      return b;
+    } finally {
+      // update last  check time
+      lastCheckTime = System.currentTimeMillis();
+    }
+  }
+
+  /**
+   * Check to see if there is other more optimal(less partition) partition assignment based on current statistics
+   *
+   * @return True if all windowed stats indicate different partition size we need to adjust the partition.
+   */
+  private boolean breakSoftConstraint()
+  {
+    if (kafkaStatsHolder.size() != currentPartitionInfo.size()) {
+      return false;
+    }
+    int length = kafkaStatsHolder.get(kafkaStatsHolder.keySet().iterator().next()).size();
+    for (int j = 0; j < length; j++) {
+      Map<KafkaPartition, long[]> kPIntakeRate = new HashMap<KafkaPartition, long[]>();
+      for (Integer pid : kafkaStatsHolder.keySet()) {
+        if(kafkaStatsHolder.get(pid).size() <= j)
+          continue;
+        kPIntakeRate.putAll(get_1minMovingAvgParMap(kafkaStatsHolder.get(pid).get(j)));
+      }
+      if (kPIntakeRate.size() == 0) {
+        return false;
+      }
+      List<PartitionInfo> partitionInfo = firstFitDecreasingAlgo(kPIntakeRate);
+      if (partitionInfo.size() == 0 || partitionInfo.size() == currentPartitionInfo.size()) {
+        return false;
+      }
+    }
+    // if all windowed stats indicate different partition size we need to adjust the partition
+    return true;
+  }
+
+  /**
+   * Check if all the statistics within the windows break the upper bound hard limit in msgs/s or bytes/s
+   *
+   * @return True if all the statistics within the windows break the upper bound hard limit in msgs/s or bytes/s.
+   */
+  private boolean breakHardConstraint(List<KafkaConsumer.KafkaMeterStats> kmss)
+  {
+    // Only care about the KafkaMeterStats
+
+    // if there is no kafka meter stats at all, don't repartition
+    if (kmss == null || kmss.size() == 0) {
+      return false;
+    }
+    // if all the stats within the window have msgs/s above the upper bound threshold (hard limit)
+    boolean needRP = Iterators.all(kmss.iterator(), new Predicate<KafkaConsumer.KafkaMeterStats>()
+    {
+      @Override
+      public boolean apply(KafkaConsumer.KafkaMeterStats kms)
+      {
+        // If there are more than 1 kafka partition and the total msg/s reach the limit
+        return kms.partitionStats.size() > 1 && kms.totalMsgPerSec > msgRateUpperBound;
+      }
+    });
+
+    // or all the stats within the window have bytes/s above the upper bound threshold (hard limit)
+    needRP = needRP || Iterators.all(kmss.iterator(), new Predicate<KafkaConsumer.KafkaMeterStats>()
+    {
+      @Override
+      public boolean apply(KafkaConsumer.KafkaMeterStats kms)
+      {
+        //If there are more than 1 kafka partition and the total bytes/s reach the limit
+        return kms.partitionStats.size() > 1 && kms.totalBytesPerSec > byteRateUpperBound;
+      }
+    });
+
+    return needRP;
+
+  }
+
+  public static enum PartitionStrategy
+  {
+    /**
+     * Each operator partition connect to only one kafka partition
+     */
+    ONE_TO_ONE,
+    /**
+     * Each operator consumes from several kafka partitions with overall input rate under some certain hard limit in msgs/s or bytes/s
+     * For now it <b>only</b> support <b>simple kafka consumer</b>
+     */
+    ONE_TO_MANY,
+    /**
+     * 1 to N partition based on the heuristic function
+     * <b>NOT</b> implemented yet
+     * TODO implement this later
+     */
+    ONE_TO_MANY_HEURISTIC
+  }
+
+  static class PartitionInfo
+  {
+    Set<KafkaPartition> kpids;
+    long msgRateLeft;
+    long byteRateLeft;
+  }
+
+  public IdempotentStorageManager getIdempotentStorageManager()
+  {
+    return idempotentStorageManager;
+  }
+
+  public void setIdempotentStorageManager(IdempotentStorageManager idempotentStorageManager)
+  {
+    this.idempotentStorageManager = idempotentStorageManager;
+  }
+
+  public void setInitialPartitionCount(int partitionCount)
+  {
+    this.initialPartitionCount = partitionCount;
+  }
+
+  public int getInitialPartitionCount()
+  {
+    return initialPartitionCount;
+  }
+
+  public long getMsgRateUpperBound()
+  {
+    return msgRateUpperBound;
+  }
+
+  public void setMsgRateUpperBound(long msgRateUpperBound)
+  {
+    this.msgRateUpperBound = msgRateUpperBound;
+  }
+
+  public long getByteRateUpperBound()
+  {
+    return byteRateUpperBound;
+  }
+
+  public void setByteRateUpperBound(long byteRateUpperBound)
+  {
+    this.byteRateUpperBound = byteRateUpperBound;
+  }
+
+  public void setInitialOffset(String initialOffset)
+  {
+    this.consumer.initialOffset = initialOffset;
+  }
+
+  public void setOffsetManager(OffsetManager offsetManager)
+  {
+    this.offsetManager = offsetManager;
+  }
+
+  public void setRepartitionCheckInterval(long repartitionCheckInterval)
+  {
+    this.repartitionCheckInterval = repartitionCheckInterval;
+  }
+
+  public long getRepartitionCheckInterval()
+  {
+    return repartitionCheckInterval;
+  }
+
+  public void setRepartitionInterval(long repartitionInterval)
+  {
+    this.repartitionInterval = repartitionInterval;
+  }
+
+  public long getRepartitionInterval()
+  {
+    return repartitionInterval;
+  }
+
+  //@Pattern(regexp="ONE_TO_ONE|ONE_TO_MANY|ONE_TO_MANY_HEURISTIC", flags={Flag.CASE_INSENSITIVE})
+  public void setStrategy(String policy)
+  {
+    this.strategy = PartitionStrategy.valueOf(policy.toUpperCase());
+  }
+
+  public boolean isIdempotent()
+  {
+    return idempotent;
+  }
+
+  public void setIdempotent(boolean idempotent)
+  {
+    this.idempotent = idempotent;
+  }
 }
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
index 7e1b252..ecca9d3 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
@@ -96,7 +96,7 @@
     configProperties.putAll(prop);
 
     return new ProducerConfig(configProperties);
-  };
+  }
 
   public Producer<K, V> getProducer()
   {
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractPartitionableKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractPartitionableKafkaInputOperator.java
deleted file mode 100644
index 6509b79..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractPartitionableKafkaInputOperator.java
+++ /dev/null
@@ -1,662 +0,0 @@
-/*
- * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.kafka;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import kafka.javaapi.PartitionMetadata;
-
-import javax.validation.constraints.Min;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Sets;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.commons.lang3.StringUtils;
-
-import com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStats;
-
-import static com.datatorrent.contrib.kafka.KafkaConsumer.KafkaMeterStatsUtil.*;
-
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DefaultPartition;
-import com.datatorrent.api.Partitioner;
-import com.datatorrent.api.Stats.OperatorStats;
-import com.datatorrent.api.StatsListener;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-
-/**
- * This is a base implementation of a Kafka input operator, which consumes data from Kafka message bus.&nbsp;
- * It will be dynamically partitioned based on the upstream kafka partition.
- * <p>
- * <b>Partition Strategy:</b>
- * <p><b>1. ONE_TO_ONE partition</b> Each operator partition will consume from only one kafka partition </p>
- * <p><b>2. ONE_TO_MANY partition</b> Each operator partition consumer from multiple kafka partition with some hard ingestion rate limit</p>
- * <p><b>3. ONE_TO_MANY_HEURISTIC partition</b>(Not implemented yet) Each operator partition consumer from multiple kafka partition and partition number depends on heuristic function(real time bottle neck)</p>
- * <p><b>Note:</b> ONE_TO_MANY partition only support simple kafka consumer because
- * <p>  1) high-level consumer can only balance the number of brokers it consumes from rather than the actual load from each broker</p>
- * <p>  2) high-level consumer can not reset offset once it's committed so the tuples are not replayable </p>
- * <p></p>
- * <br>
- * <br>
- * <b>Basic Algorithm:</b>
- * <p>1.Pull the metadata(how many partitions) of the topic from brokerList of {@link KafkaConsumer}</p>
- * <p>2.cloneConsumer method is used to initialize the new {@link KafkaConsumer} instance for the new partition operator</p>
- * <p>3.cloneOperator method is used to initialize the new {@link AbstractPartitionableKafkaInputOperator} instance for the new partition operator</p>
- * <p>4.ONE_TO_MANY partition use first-fit decreasing algorithm(http://en.wikipedia.org/wiki/Bin_packing_problem) to minimize the partition operator
- * <br>
- * <br>
- * <b>Load balance:</b> refer to {@link SimpleKafkaConsumer} and {@link HighlevelKafkaConsumer} <br>
- * <b>Kafka partition failover:</b> refer to {@link SimpleKafkaConsumer} and {@link HighlevelKafkaConsumer}
- * <br>
- * <br>
- * <b>Self adjust to Kafka partition change:</b>
- * <p><b>EACH</b> operator partition periodically check the leader broker(s) change which it consumes from and adjust connection without repartition</p>
- * <p><b>ONLY APPMASTER</b> operator periodically check overall kafka partition layout and add operator partition due to kafka partition add(no delete supported by kafka for now)</p>
- * <br>
- * <br>
- * </p>
- *
- * @displayName Abstract Partitionable Kafka Input
- * @category Messaging
- * @tags input operator
- *
- * @since 0.9.0
- */
-@OperatorAnnotation(partitionable = true)
-public abstract class AbstractPartitionableKafkaInputOperator extends AbstractKafkaInputOperator<KafkaConsumer> implements Partitioner<AbstractPartitionableKafkaInputOperator>, StatsListener
-{
-
-  // By default the partition policy is 1:1
-  public PartitionStrategy strategy = PartitionStrategy.ONE_TO_ONE;
-
-  private transient OperatorContext context = null;
-
-  // default resource is unlimited in terms of msgs per second
-  private long msgRateUpperBound = Long.MAX_VALUE;
-
-  // default resource is unlimited in terms of bytes per second
-  private long byteRateUpperBound = Long.MAX_VALUE;
-
-  private static final Logger logger = LoggerFactory.getLogger(AbstractPartitionableKafkaInputOperator.class);
-
-  // Store the current partition topology
-  private transient List<PartitionInfo> currentPartitionInfo = new LinkedList<AbstractPartitionableKafkaInputOperator.PartitionInfo>();
-
-  // Store the current collected kafka consumer stats
-  private transient Map<Integer, List<KafkaMeterStats>> kafkaStatsHolder = new HashMap<Integer, List<KafkaConsumer.KafkaMeterStats>>();
-
-  private OffsetManager offsetManager = null;
-
-  // Minimal interval between 2 (re)partition actions
-  private long repartitionInterval = 30000L;
-
-  // Minimal interval between checking collected stats and decide whether it needs to repartition or not.
-  // And minimal interval between 2 offset updates
-  private long repartitionCheckInterval = 5000L;
-
-  private transient long lastCheckTime = 0L;
-
-  private transient long lastRepartitionTime = 0L;
-
-  private transient List<Integer> newWaitingPartition = new LinkedList<Integer>();
-
-  @Min(1)
-  private int initialPartitionCount = 1;
-
-  @Override
-  public void partitioned(Map<Integer, Partition<AbstractPartitionableKafkaInputOperator>> partitions)
-  {
-    // update the last repartition time
-    lastRepartitionTime = System.currentTimeMillis();
-  }
-
-  @Override
-  public Collection<Partition<AbstractPartitionableKafkaInputOperator>> definePartitions(Collection<Partition<AbstractPartitionableKafkaInputOperator>> partitions, PartitioningContext context)
-  {
-
-    // check if it's the initial partition
-    boolean isInitialParitition = partitions.iterator().next().getStats() == null;
-
-    // get partition metadata for topics.
-    // Whatever operator is using high-level or simple kafka consumer, the operator always create a temporary simple kafka consumer to get the metadata of the topic
-    // The initial value of brokerList of the KafkaConsumer is used to retrieve the topic metadata
-    List<PartitionMetadata> kafkaPartitionList = KafkaMetadataUtil.getPartitionsForTopic(getConsumer().getBrokerSet(), getConsumer().getTopic());
-
-    // Operator partitions
-    List<Partition<AbstractPartitionableKafkaInputOperator>> newPartitions = null;
-
-    // initialize the offset
-    Map<Integer, Long> initOffset = null;
-    if(isInitialParitition && offsetManager !=null){
-      initOffset = offsetManager.loadInitialOffsets();
-      logger.info("Initial offsets: {} ", "{ " + Joiner.on(", ").useForNull("").withKeyValueSeparator(": ").join(initOffset) + " }");
-    }
-
-    switch (strategy) {
-
-      // For the 1 to 1 mapping The framework will create number of operator partitions based on kafka topic partitions
-      // Each operator partition will consume from only one kafka partition
-      case ONE_TO_ONE:
-
-        if (isInitialParitition) {
-          lastRepartitionTime = System.currentTimeMillis();
-          logger.info("[ONE_TO_ONE]: Initializing partition(s)");
-
-          // initialize the number of operator partitions according to number of kafka partitions
-
-          newPartitions = new ArrayList<Partition<AbstractPartitionableKafkaInputOperator>>(kafkaPartitionList.size());
-          for (int i = 0; i < kafkaPartitionList.size(); i++) {
-            logger.info("[ONE_TO_ONE]: Create operator partition for kafka partition: " + kafkaPartitionList.get(i).partitionId() + ", topic: " + this.getConsumer().topic);
-            Partition<AbstractPartitionableKafkaInputOperator> p = new DefaultPartition<AbstractPartitionableKafkaInputOperator>(cloneOperator());
-            PartitionMetadata pm = kafkaPartitionList.get(i);
-            KafkaConsumer newConsumerForPartition = getConsumer().cloneConsumer(Sets.newHashSet(pm.partitionId()), initOffset);
-            p.getPartitionedInstance().setConsumer(newConsumerForPartition);
-            PartitionInfo pif = new PartitionInfo();
-            pif.kpids = Sets.newHashSet(pm.partitionId());
-            currentPartitionInfo.add(pif);
-            newPartitions.add(p);
-          }
-        }
-        else if (newWaitingPartition.size() != 0) {
-          // add partition for new kafka partition
-          for (int pid : newWaitingPartition) {
-            logger.info("[ONE_TO_ONE]: Add operator partition for kafka partition " + pid);
-            Partition<AbstractPartitionableKafkaInputOperator> p = new DefaultPartition<AbstractPartitionableKafkaInputOperator>(cloneOperator());
-            KafkaConsumer newConsumerForPartition = getConsumer().cloneConsumer(Sets.newHashSet(pid));
-            p.getPartitionedInstance().setConsumer(newConsumerForPartition);
-            PartitionInfo pif = new PartitionInfo();
-            pif.kpids = Sets.newHashSet(pid);
-            currentPartitionInfo.add(pif);
-            partitions.add(p);
-          }
-          newWaitingPartition.clear();
-          return partitions;
-
-        }
-        break;
-      // For the 1 to N mapping The initial partition number is defined by stream application
-      // Afterwards, the framework will dynamically adjust the partition and allocate consumers to as less operator partitions as it can
-      //  and guarantee the total intake rate for each operator partition is below some threshold
-      case ONE_TO_MANY:
-
-        if (getConsumer() instanceof HighlevelKafkaConsumer) {
-          throw new UnsupportedOperationException("[ONE_TO_MANY]: The high-level consumer is not supported for ONE_TO_MANY partition strategy.");
-        }
-
-        if (isInitialParitition) {
-          lastRepartitionTime = System.currentTimeMillis();
-          logger.info("[ONE_TO_MANY]: Initializing partition(s)");
-          int size = initialPartitionCount;
-          @SuppressWarnings("unchecked")
-          Set<Integer>[] pIds = new Set[size];
-          newPartitions = new ArrayList<Partition<AbstractPartitionableKafkaInputOperator>>(size);
-          for (int i = 0; i < kafkaPartitionList.size(); i++) {
-            PartitionMetadata pm = kafkaPartitionList.get(i);
-            if (pIds[i % size] == null) {
-              pIds[i % size] = new HashSet<Integer>();
-            }
-            pIds[i % size].add(pm.partitionId());
-          }
-          for (int i = 0; i < pIds.length; i++) {
-            logger.info("[ONE_TO_MANY]: Create operator partition for kafka partition(s): " + StringUtils.join(pIds[i], ", ") + ", topic: " + this.getConsumer().topic);
-            Partition<AbstractPartitionableKafkaInputOperator> p = new DefaultPartition<AbstractPartitionableKafkaInputOperator>(_cloneOperator());
-            KafkaConsumer newConsumerForPartition = getConsumer().cloneConsumer(pIds[i], initOffset);
-            p.getPartitionedInstance().setConsumer(newConsumerForPartition);
-            newPartitions.add(p);
-            PartitionInfo pif = new PartitionInfo();
-            pif.kpids = pIds[i];
-            currentPartitionInfo.add(pif);
-          }
-
-        }
-        else if (newWaitingPartition.size() != 0) {
-
-          logger.info("[ONE_TO_MANY]: Add operator partition for kafka partition(s): " + StringUtils.join(newWaitingPartition, ", ") + ", topic: " + this.getConsumer().topic);
-          Partition<AbstractPartitionableKafkaInputOperator> p = new DefaultPartition<AbstractPartitionableKafkaInputOperator>(_cloneOperator());
-          KafkaConsumer newConsumerForPartition = getConsumer().cloneConsumer(Sets.newHashSet(newWaitingPartition));
-          p.getPartitionedInstance().setConsumer(newConsumerForPartition);
-          partitions.add(p);
-          PartitionInfo pif = new PartitionInfo();
-          pif.kpids = Sets.newHashSet(newWaitingPartition);
-          currentPartitionInfo.add(pif);
-          newWaitingPartition.clear();
-          return partitions;
-        }
-        else {
-
-          logger.info("[ONE_TO_MANY]: Repartition the operator(s) under " + msgRateUpperBound + " msgs/s and " + byteRateUpperBound + " bytes/s hard limit");
-          // size of the list depends on the load and capacity of each operator
-          newPartitions = new LinkedList<Partition<AbstractPartitionableKafkaInputOperator>>();
-
-          // Use first-fit decreasing algorithm to minimize the container number and somewhat balance the partition
-          // try to balance the load and minimize the number of containers with each container's load under the threshold
-          // the partition based on the latest 1 minute moving average
-          Map<Integer, long[]> kPIntakeRate = new HashMap<Integer, long[]>();
-          // get the offset for all partitions of each consumer
-          Map<Integer, Long> offsetTrack = new HashMap<Integer, Long>();
-          for (Partition<AbstractPartitionableKafkaInputOperator> partition : partitions) {
-            List<OperatorStats> opss = partition.getStats().getLastWindowedStats();
-            if (opss == null || opss.size() == 0) {
-              continue;
-            }
-            offsetTrack.putAll(partition.getPartitionedInstance().consumer.getCurrentOffsets());
-            // Get the latest stats
-
-            OperatorStats stat = partition.getStats().getLastWindowedStats().get(partition.getStats().getLastWindowedStats().size() - 1);
-            if (stat.counters instanceof KafkaMeterStats) {
-              KafkaMeterStats kms = (KafkaMeterStats) stat.counters;
-              kPIntakeRate.putAll(get_1minMovingAvgParMap(kms));
-            }
-          }
-
-          List<PartitionInfo> partitionInfos = firstFitDecreasingAlgo(kPIntakeRate);
-
-          for (PartitionInfo r : partitionInfos) {
-            logger.info("[ONE_TO_MANY]: Create operator partition for kafka partition(s): " + StringUtils.join(r.kpids, ", ") + ", topic: " + this.getConsumer().topic);
-            Partition<AbstractPartitionableKafkaInputOperator> p = new DefaultPartition<AbstractPartitionableKafkaInputOperator>(_cloneOperator());
-            KafkaConsumer newConsumerForPartition = getConsumer().cloneConsumer(r.kpids, offsetTrack);
-            p.getPartitionedInstance().setConsumer(newConsumerForPartition);
-            newPartitions.add(p);
-          }
-
-          currentPartitionInfo.addAll(partitionInfos);
-        }
-
-        break;
-
-      case ONE_TO_MANY_HEURISTIC:
-        throw new UnsupportedOperationException("[ONE_TO_MANY_HEURISTIC]: Not implemented yet");
-      default:
-        break;
-    }
-
-    return newPartitions;
-  }
-
-  private List<PartitionInfo> firstFitDecreasingAlgo(final Map<Integer, long[]> kPIntakeRate)
-  {
-    // (Decreasing) Sort the map by msgs/s and bytes/s in descending order
-    List<Entry<Integer, long[]>> sortedMapEntry = new LinkedList<Entry<Integer, long[]>>(kPIntakeRate.entrySet());
-    Collections.sort(sortedMapEntry, new Comparator<Entry<Integer, long[]>>()
-    {
-      @Override
-      public int compare(Entry<Integer, long[]> firstEntry, Entry<Integer, long[]> secondEntry)
-      {
-        long[] firstPair = firstEntry.getValue();
-        long[] secondPair = secondEntry.getValue();
-        if (msgRateUpperBound == Long.MAX_VALUE || firstPair[0] == secondPair[0]) {
-          return (int) (secondPair[1] - firstPair[1]);
-        }
-        else {
-          return (int) (secondPair[0] - firstPair[0]);
-        }
-      }
-    });
-
-    // (First-fit) Look for first fit operator to assign the consumer
-    // Go over all the kafka partitions and look for the right operator to assign to
-    // Each record has a set of kafka partition ids and the resource left for that operator after assigned the consumers for those partitions
-    List<PartitionInfo> pif = new LinkedList<PartitionInfo>();
-  outer:
-    for (Entry<Integer, long[]> entry : sortedMapEntry) {
-      long[] resourceRequired = entry.getValue();
-      for (PartitionInfo r : pif) {
-        if (r.msgRateLeft > resourceRequired[0] && r.byteRateLeft > resourceRequired[1]) {
-          // found first fit operator partition that has enough resource for this consumer
-          // add consumer to the operator partition
-          r.kpids.add(entry.getKey());
-          // update the resource left in this partition
-          r.msgRateLeft -= r.msgRateLeft == Long.MAX_VALUE ? 0 : resourceRequired[0];
-          r.byteRateLeft -= r.byteRateLeft == Long.MAX_VALUE ? 0 : resourceRequired[1];
-          continue outer;
-        }
-      }
-      // didn't find the existing "operator" to assign this consumer
-      PartitionInfo nr = new PartitionInfo();
-      nr.kpids = Sets.newHashSet(entry.getKey());
-      nr.msgRateLeft = msgRateUpperBound == Long.MAX_VALUE ? msgRateUpperBound : msgRateUpperBound - resourceRequired[0];
-      nr.byteRateLeft = byteRateUpperBound == Long.MAX_VALUE ? byteRateUpperBound : byteRateUpperBound - resourceRequired[1];
-      pif.add(nr);
-    }
-
-    return pif;
-  }
-
-  @Override
-  public Response processStats(BatchedOperatorStats stats)
-  {
-
-    Response resp = new Response();
-    List<KafkaMeterStats> kstats = extractKafkaStats(stats);
-    resp.repartitionRequired = needPartition(stats.getOperatorId(), kstats);
-    return resp;
-  }
-
-  private void updateOffsets(List<KafkaMeterStats> kstats)
-  {
-    //In every partition check interval, call offsetmanager to update the offsets
-    if (offsetManager != null) {
-      offsetManager.updateOffsets(getOffsetsForPartitions(kstats));
-    }
-  }
-
-  private List<KafkaMeterStats> extractKafkaStats(BatchedOperatorStats stats)
-  {
-  //preprocess the stats
-    List<KafkaMeterStats> kmsList = new LinkedList<KafkaConsumer.KafkaMeterStats>();
-    for (OperatorStats os : stats.getLastWindowedStats()) {
-      if (os != null && os.counters instanceof KafkaMeterStats) {
-        kmsList.add((KafkaMeterStats) os.counters);
-      }
-    }
-    return kmsList;
-  }
-
-  /**
-   *
-   * Check whether the operator needs repartition based on reported stats
-   *
-   * @param stats
-   * @return true if repartition is required
-   * false if repartition is not required
-   */
-  private boolean needPartition(int opid, List<KafkaMeterStats> kstats)
-  {
-
-    long t = System.currentTimeMillis();
-
-    if (t - lastCheckTime < repartitionCheckInterval) {
-      // return false if it's within repartitionCheckInterval since last time it check the stats
-      return false;
-    }
-
-    logger.debug("Use OffsetManager to update offsets");
-    updateOffsets(kstats);
-
-
-    if(repartitionInterval < 0){
-      // if repartition is disabled
-      return false;
-    }
-
-    if(t - lastRepartitionTime < repartitionInterval) {
-      // return false if it's still within repartitionInterval since last (re)partition
-      return false;
-    }
-
-
-    kafkaStatsHolder.put(opid, kstats);
-
-    if (kafkaStatsHolder.size() != currentPartitionInfo.size() || currentPartitionInfo.size() == 0) {
-      // skip checking if the operator hasn't collected all the stats from all the current partitions
-      return false;
-    }
-
-    try {
-
-      // monitor if new kafka partition added
-      {
-        Set<Integer> existingIds = new HashSet<Integer>();
-        for (PartitionInfo pio : currentPartitionInfo) {
-          existingIds.addAll(pio.kpids);
-        }
-
-        for (PartitionMetadata metadata : KafkaMetadataUtil.getPartitionsForTopic(consumer.brokerSet, consumer.getTopic())) {
-          if (!existingIds.contains(metadata.partitionId())) {
-            newWaitingPartition.add(metadata.partitionId());
-          }
-        }
-        if (newWaitingPartition.size() != 0) {
-          // found new kafka partition
-          lastRepartitionTime = t;
-          return true;
-        }
-      }
-
-      if (strategy == PartitionStrategy.ONE_TO_ONE) {
-        return false;
-      }
-
-      // This is expensive part and only every repartitionCheckInterval it will check existing the overall partitions
-      // and see if there is more optimal solution
-      // The decision is made by 2 constraint
-      // Hard constraint which is upper bound overall msgs/s or bytes/s
-      // Soft constraint which is more optimal solution
-
-      boolean b = breakHardConstraint(kstats) || breakSoftConstraint();
-      if (b) {
-        currentPartitionInfo.clear();
-        kafkaStatsHolder.clear();
-      }
-      return b;
-    } finally {
-      // update last  check time
-      lastCheckTime = System.currentTimeMillis();
-    }
-  }
-
-  /**
-   * Check to see if there is other more optimal(less partition) partition assignment based on current statistics
-   *
-   * @return True if all windowed stats indicate different partition size we need to adjust the partition.
-   */
-  private boolean breakSoftConstraint()
-  {
-    if (kafkaStatsHolder.size() != currentPartitionInfo.size()) {
-      return false;
-    }
-    int length = kafkaStatsHolder.get(kafkaStatsHolder.keySet().iterator().next()).size();
-    for (int j = 0; j < length; j++) {
-      Map<Integer, long[]> kPIntakeRate = new HashMap<Integer, long[]>();
-      for (Integer pid : kafkaStatsHolder.keySet()) {
-        kPIntakeRate.putAll(get_1minMovingAvgParMap(kafkaStatsHolder.get(pid).get(j)));
-      }
-      if (kPIntakeRate.size() == 0) {
-        return false;
-      }
-      List<PartitionInfo> partitionInfo = firstFitDecreasingAlgo(kPIntakeRate);
-      if (partitionInfo.size() == 0 || partitionInfo.size() == currentPartitionInfo.size()) {
-        return false;
-      }
-    }
-    // if all windowed stats indicate different partition size we need to adjust the partition
-    return true;
-  }
-
-  /**
-   * Check if all the statistics within the windows break the upper bound hard limit in msgs/s or bytes/s
-   *
-   * @param kmss
-   * @return True if all the statistics within the windows break the upper bound hard limit in msgs/s or bytes/s.
-   */
-  private boolean breakHardConstraint(List<KafkaMeterStats> kmss)
-  {
-    // Only care about the KafkaMeterStats
-
-    // if there is no kafka meter stats at all, don't repartition
-    if (kmss == null || kmss.size() == 0) {
-      return false;
-    }
-    // if all the stats within the window have msgs/s above the upper bound threshold (hard limit)
-    boolean needRP = Iterators.all(kmss.iterator(), new Predicate<KafkaMeterStats>()
-    {
-      @Override
-      public boolean apply(KafkaMeterStats kms)
-      {
-        // If there are more than 1 kafka partition and the total msg/s reach the limit
-        return kms.partitionStats.size() > 1 && kms.totalMsgPerSec > msgRateUpperBound;
-      }
-    });
-
-    // or all the stats within the window have bytes/s above the upper bound threshold (hard limit)
-    needRP = needRP || Iterators.all(kmss.iterator(), new Predicate<KafkaMeterStats>()
-    {
-      @Override
-      public boolean apply(KafkaMeterStats kms)
-      {
-        //If there are more than 1 kafka partition and the total bytes/s reach the limit
-        return kms.partitionStats.size() > 1 && kms.totalBytesPerSec > byteRateUpperBound;
-      }
-    });
-
-    return needRP;
-
-  }
-
-  private final AbstractPartitionableKafkaInputOperator _cloneOperator()
-  {
-    AbstractPartitionableKafkaInputOperator newOp = cloneOperator();
-    newOp.msgRateUpperBound = this.msgRateUpperBound;
-    newOp.byteRateUpperBound = this.byteRateUpperBound;
-    newOp.strategy = this.strategy;
-    newOp.setMaxTuplesPerWindow(getMaxTuplesPerWindow());
-    return newOp;
-  }
-
-  /**
-   * Implement this method to initialize new operator instance for new partition.
-   * Please carefully include all the properties you want to keep in new instance
-   *
-   * @return
-   */
-  protected abstract AbstractPartitionableKafkaInputOperator cloneOperator();
-
-  @Override
-  public void setup(OperatorContext context)
-  {
-    super.setup(context);
-    this.context = context;
-  }
-
-  @Override
-  public void endWindow()
-  {
-
-    super.endWindow();
-
-    if (strategy == PartitionStrategy.ONE_TO_MANY) {
-      //send the stats to AppMaster and let the AppMaster decide if it wants to repartition
-      context.setCounters(getConsumer().getConsumerStats());
-    }
-  }
-
-  public static enum PartitionStrategy
-  {
-    /**
-     * Each operator partition connect to only one kafka partition
-     */
-    ONE_TO_ONE,
-    /**
-     * Each operator consumes from several kafka partitions with overall input rate under some certain hard limit in msgs/s or bytes/s
-     * For now it <b>only</b> support <b>simple kafka consumer</b>
-     */
-    ONE_TO_MANY,
-    /**
-     * 1 to N partition based on the heuristic function
-     * <b>NOT</b> implemented yet
-     * TODO implement this later
-     */
-    ONE_TO_MANY_HEURISTIC
-  }
-
-  public void setInitialPartitionCount(int partitionCount)
-  {
-    this.initialPartitionCount = partitionCount;
-  }
-
-  public int getInitialPartitionCount()
-  {
-    return initialPartitionCount;
-  }
-
-  public long getMsgRateUpperBound()
-  {
-    return msgRateUpperBound;
-  }
-
-  public void setMsgRateUpperBound(long msgRateUpperBound)
-  {
-    this.msgRateUpperBound = msgRateUpperBound;
-  }
-
-  public long getByteRateUpperBound()
-  {
-    return byteRateUpperBound;
-  }
-
-  public void setByteRateUpperBound(long byteRateUpperBound)
-  {
-    this.byteRateUpperBound = byteRateUpperBound;
-  }
-
-  public void setInitialOffset(String initialOffset)
-  {
-    this.consumer.initialOffset = initialOffset;
-  }
-
-  public void setOffsetManager(OffsetManager offsetManager)
-  {
-    this.offsetManager = offsetManager;
-  }
-
-  public void setRepartitionCheckInterval(long repartitionCheckInterval)
-  {
-    this.repartitionCheckInterval = repartitionCheckInterval;
-  }
-
-  public long getRepartitionCheckInterval()
-  {
-    return repartitionCheckInterval;
-  }
-
-  public void setRepartitionInterval(long repartitionInterval)
-  {
-    this.repartitionInterval = repartitionInterval;
-  }
-
-  public long getRepartitionInterval()
-  {
-    return repartitionInterval;
-  }
-
-  //@Pattern(regexp="ONE_TO_ONE|ONE_TO_MANY|ONE_TO_MANY_HEURISTIC", flags={Flag.CASE_INSENSITIVE})
-  public void setStrategy(String policy)
-  {
-    this.strategy = PartitionStrategy.valueOf(policy.toUpperCase());
-  }
-
-  static class PartitionInfo
-  {
-    Set<Integer> kpids;
-    long msgRateLeft;
-    long byteRateLeft;
-  }
-
-}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractPartitionableKafkaSinglePortInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractPartitionableKafkaSinglePortInputOperator.java
deleted file mode 100644
index cc7914d..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractPartitionableKafkaSinglePortInputOperator.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.kafka;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import kafka.message.Message;
-
-/**
- * This is the base implementation of Kafka input operator, with a single output port, which consumes data from Kafka message bus.&nbsp;
- * It will be dynamically partitioned based on the upstream Kafka partition.&nbsp;
- * Subclasses should implement the methods which convert Kafka messages to tuples.
- * <p></p>
- * @displayName Abstract Partitionable Kafka Single Port Input
- * @category Messaging
- * @tags input operator
- *
- * @since 0.9.0
- */
-@OperatorAnnotation(partitionable = true)
-public abstract class AbstractPartitionableKafkaSinglePortInputOperator<T> extends AbstractPartitionableKafkaInputOperator
-{
-  /**
-   * This output port emits tuples extracted from Kafka messages.
-   */
-  public final transient DefaultOutputPort<T> outputPort = new DefaultOutputPort<T>();
-
-  /**
-   * Any concrete class derived from AbstractPartitionableKafkaSinglePortInputOperator has to implement this method
-   * so that it knows what type of message it is going to send to Malhar.
-   * It converts a ByteBuffer message into a Tuple. A Tuple can be of any type (derived from Java Object) that
-   * operator user intends to.
-   *
-   * @param msg
-   */
-  public abstract T getTuple(Message msg);
-
-  /**
-   * Implement abstract method.
-   */
-  @Override
-  public void emitTuple(Message msg)
-  {
-    outputPort.emit(getTuple(msg));
-  }
-}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
index 6e2092f..d84a145 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
@@ -18,166 +18,215 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import javax.validation.constraints.Min;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Joiner;
+
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
+import kafka.javaapi.PartitionMetadata;
 import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.Message;
 import kafka.message.MessageAndMetadata;
 
 /**
- * High level kafka consumer adapter used for kafka input operator
- * Properties:<br>
+ * High level kafka consumer adapter used for kafka input operator Properties:<br>
  * <b>consumerConfig</b>: Used for create the high-level kafka consumer<br>
  * <b>numStream</b>: num of threads to consume the topic in parallel <br>
- * <li> (-1): create #partition thread and consume the topic in parallel threads</li>
- * <br>
+ * <li>(-1): create #partition thread and consume the topic in parallel threads</li> <br>
  * <br>
  *
  * Load balance: <br>
- * Build-in kafka load balancing strategy, Consumers with different consumer.id and same group.id will distribute the reads from different partition<br>
- * There are at most #partition per topic could consuming in parallel
- * For more information see {@link http://kafka.apache.org/documentation.html#distributionimpl} <br>
- * <br><br>
+ * Build-in kafka load balancing strategy, Consumers with different consumer.id and same group.id will distribute the
+ * reads from different partition<br>
+ * There are at most #partition per topic could consuming in parallel For more information see {@link http
+ * ://kafka.apache.org/documentation.html#distributionimpl} <br>
+ * <br>
+ * <br>
  * Kafka broker failover: <br>
  * Build-in failover strategy, the consumer will pickup the next available synchronized broker to consume data <br>
- * For more information see {@link http://kafka.apache.org/documentation.html#distributionimpl} <br>
+ * For more information see {@link http ://kafka.apache.org/documentation.html#distributionimpl} <br>
  *
  * @since 0.9.0
  */
 public class HighlevelKafkaConsumer extends KafkaConsumer
 {
   private static final Logger logger = LoggerFactory.getLogger(HighlevelKafkaConsumer.class);
-  
+
   private Properties consumerConfig = null;
 
-  private transient ConsumerConnector standardConsumer = null;
-  
-  private transient ExecutorService consumerThreadExecutor = null;
-  
   /**
-   * -1   Dynamically create number of stream according to the partitions
-   * < #{kafka partition} each stream could receive any message from any partition, order is not guaranteed among the partitions
-   * > #{kafka partition} each stream consume message from one partition, some stream might not get any data
+   * Consumer client for topic on each cluster
    */
-  @Min(value = -1)
-  private int numStream = 1;
-  
+  private transient Map<String, ConsumerConnector> standardConsumer = null;
+
+  private transient ExecutorService consumerThreadExecutor = null;
+
+  /**
+   * number of stream for topic on each cluster null/empty: create same # streams to # partitions of the topic on each
+   * cluster
+   */
+  private Map<String, Integer> numStream;
+
   public HighlevelKafkaConsumer()
   {
+    numStream = new HashMap<String, Integer>();
   }
-  
+
   public HighlevelKafkaConsumer(Properties consumerConfig)
   {
     super();
     this.consumerConfig = consumerConfig;
   }
-  
+
   @Override
   public void create()
   {
     super.create();
-    // This is important to let kafka know how to distribute the reads among different consumers in same consumer group
-    // Don't reuse any id for recovery to avoid rebalancing error because there is some delay for zookeeper to 
-    // find out the old consumer is dead and delete the entry even new consumer is back online
+    if (standardConsumer == null) {
+      standardConsumer = new HashMap<String, ConsumerConnector>();
+    }
+
+    // This is important to let kafka know how to distribute the reads among
+    // different consumers in same consumer group
+    // Don't reuse any id for recovery to avoid rebalancing error because
+    // there is some delay for zookeeper to
+    // find out the old consumer is dead and delete the entry even new
+    // consumer is back online
     consumerConfig.put("consumer.id", "consumer" + System.currentTimeMillis());
-    if(initialOffset.equalsIgnoreCase("earliest")){
+    if (initialOffset.equalsIgnoreCase("earliest")) {
       consumerConfig.put("auto.offset.reset", "smallest");
     } else {
       consumerConfig.put("auto.offset.reset", "largest");
     }
-    standardConsumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerConfig));
+
   }
 
   @Override
   public void start()
   {
     super.start();
+    //Share other properties among all connectors but set zookeepers respectively cause different cluster would use different zookeepers
+    for (String cluster : zookeeper.keySet()) {
+      // create high level consumer for every cluster
+      Properties config = new Properties();
+      config.putAll(consumerConfig);
+      config.setProperty("zookeeper.connect", Joiner.on(',').join(zookeeper.get(cluster)));
+      // create consumer connector will start a daemon thread to monitor the metadata change 
+      // we want to start this thread until the operator is activated 
+      standardConsumer.put(cluster, kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(config)));
+    }
+    
     Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
-    int realNumStream = numStream;
-    if (numStream == -1) {
-      realNumStream = KafkaMetadataUtil.getPartitionsForTopic(getBrokerSet(), getTopic()).size();
-    }
-    topicCountMap.put(topic, new Integer(realNumStream));
-    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = standardConsumer.createMessageStreams(topicCountMap);
 
-    // start $numStream anonymous threads to consume the data
-    consumerThreadExecutor = Executors.newFixedThreadPool(realNumStream);
-    for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(topic)) {
-      consumerThreadExecutor.submit(new Runnable() {
-        public void run()
-        {
-          ConsumerIterator<byte[], byte[]> itr = stream.iterator();
-          logger.debug("Thread " + Thread.currentThread().getName() + " start consuming message...");
-          while (itr.hasNext() && isAlive) {
-            MessageAndMetadata<byte[], byte[]> mam = itr.next();
-            try {
-              putMessage(mam.partition(), new Message(mam.message()));
-            } catch (InterruptedException e) {
-              logger.error("Message Enqueue has been interrupted", e);
-            }
-          }
-          logger.debug("Thread " + Thread.currentThread().getName() + " stop consuming message...");
-        }
-      });
+    if (numStream == null || numStream.size() == 0) {
+      if (numStream == null) {
+        numStream = new HashMap<String, Integer>();
+      }
+      // get metadata from kafka and initialize streams accordingly
+      for (Entry<String, List<PartitionMetadata>> e : KafkaMetadataUtil.getPartitionsForTopic(brokers, topic).entrySet()) {
+        numStream.put(e.getKey(), e.getValue().size());
+      }
     }
+
+    int totalNumStream = 0;
+    for (int delta : numStream.values()) {
+      totalNumStream += delta;
+    }
+    // start $totalNumStream anonymous threads to consume the data from all clusters
+    if (totalNumStream <= 0) {
+      logger.warn("No more job needed to consume data ");
+      return;
+    }
+    consumerThreadExecutor = Executors.newFixedThreadPool(totalNumStream);
+
+    for (final Entry<String, Integer> e : numStream.entrySet()) {
+
+      int realNumStream = e.getValue();
+      topicCountMap.put(topic, new Integer(realNumStream));
+      Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = standardConsumer.get(e.getKey()).createMessageStreams(topicCountMap);
+
+      for (final KafkaStream<byte[], byte[]> stream : consumerMap.get(topic)) {
+        consumerThreadExecutor.submit(new Runnable() {
+
+          KafkaPartition kp = new KafkaPartition(e.getKey(), topic, -1);
+
+          public void run()
+          {
+            ConsumerIterator<byte[], byte[]> itr = stream.iterator();
+            logger.debug("Thread {} starts consuming message...", Thread.currentThread().getName());
+            while (itr.hasNext() && isAlive) {
+              MessageAndMetadata<byte[], byte[]> mam = itr.next();
+              try {
+                kp.setPartitionId(mam.partition());
+                putMessage(kp, new Message(mam.message()), mam.offset());
+              } catch (InterruptedException e) {
+                logger.error("Message Enqueue has been interrupted", e);
+              }
+            }
+            logger.debug("Thread {} stops consuming message...", Thread.currentThread().getName());
+          }
+        });
+      }
+
+    }
+
   }
 
   @Override
   public void close()
   {
-    if(standardConsumer!=null)
-      standardConsumer.shutdown();
-    if(consumerThreadExecutor!=null){
+    if (standardConsumer != null && standardConsumer.values() != null) {
+      for (ConsumerConnector consumerConnector : standardConsumer.values()) {
+        consumerConnector.shutdown();
+      }
+    }
+    if (consumerThreadExecutor != null) {
       consumerThreadExecutor.shutdown();
     }
   }
 
-  
   public void setConsumerConfig(Properties consumerConfig)
   {
     this.consumerConfig = consumerConfig;
   }
 
   @Override
-  protected KafkaConsumer cloneConsumer(Set<Integer> partitionIds)
+  protected void resetPartitionsAndOffset(Set<KafkaPartition> partitionIds, Map<KafkaPartition, Long> startOffset)
   {
-    return cloneConsumer(partitionIds, null);
-  }
-  
-  @Override
-  protected KafkaConsumer cloneConsumer(Set<Integer> partitionIds, Map<Integer, Long> startOffset)
-  {
-    Properties newProp = new Properties();
-    // Copy most properties from the template consumer. For example the "group.id" should be set to same value 
-    newProp.putAll(consumerConfig);
-    HighlevelKafkaConsumer newConsumer = new HighlevelKafkaConsumer(newProp);
-    newConsumer.setBrokerSet(this.brokerSet);
-    newConsumer.setTopic(this.topic);
-    newConsumer.numStream = partitionIds.size();
-    newConsumer.initialOffset = initialOffset;
-    return newConsumer;
+    this.numStream = new HashMap<String, Integer>();
+    for (KafkaPartition kafkaPartition : partitionIds) {
+      if (this.numStream.get(kafkaPartition.getClusterId()) == null) {
+        this.numStream.put(kafkaPartition.getClusterId(), 0);
+      }
+      this.numStream.put(kafkaPartition.getClusterId(), this.numStream.get(kafkaPartition.getClusterId()) + 1);
+    }
   }
 
   @Override
   protected void commitOffset()
   {
-    // commit the offsets at checkpoint so that high-level consumer don't have to receive too many duplicate messages
-    standardConsumer.commitOffsets();
+    // commit the offsets at checkpoint so that high-level consumer don't
+    // have to receive too many duplicate messages
+    if (standardConsumer != null && standardConsumer.values() != null) {
+      for (ConsumerConnector consumerConnector : standardConsumer.values()) {
+        consumerConnector.commitOffsets();
+      }
+    }
   }
 
   @Override
-  protected Map<Integer, Long> getCurrentOffsets()
+  protected Map<KafkaPartition, Long> getCurrentOffsets()
   {
     // offset is not useful for high-level kafka consumer
-    // TODO 
     throw new UnsupportedOperationException("Offset request is currently not supported for high-level consumer");
   }
   
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
index bf4e672..21615ef 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
@@ -28,6 +28,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+
 import kafka.message.Message;
 
 import javax.validation.constraints.NotNull;
@@ -38,7 +39,11 @@
 import org.apache.commons.lang3.StringUtils;
 
 import com.datatorrent.api.Context;
+import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Maps;
+import com.google.common.collect.SetMultimap;
 
 /**
  * Base Kafka Consumer class used by kafka input operator
@@ -62,17 +67,17 @@
     this.topic = topic;
   }
 
-  public KafkaConsumer(Set<String> brokerSet, String topic)
+  public KafkaConsumer(SetMultimap<String, String> zks, String topic)
   {
     this.topic = topic;
-    this.brokerSet = brokerSet;
+    this.zookeeper = zks;
   }
 
   private int cacheSize = 1024;
 
   protected transient boolean isAlive = false;
 
-  private transient ArrayBlockingQueue<Message> holdingBuffer;
+  private transient ArrayBlockingQueue<KafkaMessage> holdingBuffer;
 
   /**
    * The topic that this consumer consumes
@@ -81,12 +86,14 @@
   protected String topic = "default_topic";
 
   /**
-   * A broker list to retrieve the metadata for the consumer
-   * This property could be null
-   * But it's mandatory for dynamic partition and fail-over
+   * A zookeeper map keyed by cluster id
+   * It's mandatory field
    */
   @NotNull
-  protected Set<String> brokerSet;
+  @Bind(JavaSerializer.class)
+  protected SetMultimap<String, String> zookeeper;
+  
+  protected transient SetMultimap<String, String> brokers;
 
 
   /**
@@ -108,16 +115,32 @@
    * This method is called in setup method of the operator
    */
   public void create(){
-    holdingBuffer = new ArrayBlockingQueue<Message>(cacheSize);
-  };
+    initBrokers();
+    holdingBuffer = new ArrayBlockingQueue<KafkaMessage>(cacheSize);
+  }
+  
+  public void initBrokers()
+  {
+    if(brokers!=null){
+      return ;
+    }
+    if(zookeeper!=null){
+      brokers = HashMultimap.create();
+      for (String clusterId: zookeeper.keySet()) {
+        brokers.putAll(clusterId, KafkaMetadataUtil.getBrokers(zookeeper.get(clusterId)));
+      }
+    }
+    
+  }
 
   /**
    * This method is called in the activate method of the operator
    */
-  public void start(){
+  public void start()
+  {
     isAlive = true;
     statsSnapShot.start();
-  };
+  }
 
   /**
    * The method is called in the deactivate method of the operator
@@ -157,7 +180,7 @@
     return topic;
   }
 
-  public Message pollMessage()
+  public KafkaMessage pollMessage()
   {
     return holdingBuffer.poll();
   }
@@ -167,14 +190,14 @@
     return holdingBuffer.size();
   }
 
-  public void setBrokerSet(Set<String> brokerSet)
+  public void setZookeeper(SetMultimap<String, String> zks)
   {
-    this.brokerSet = brokerSet;
+    this.zookeeper = zks;
   }
 
-  public Set<String> getBrokerSet()
+  public SetMultimap<String, String> getZookeeper()
   {
-    return brokerSet;
+    return zookeeper;
   }
 
   public void setInitialOffset(String initialOffset)
@@ -198,20 +221,15 @@
   }
 
 
-  final protected void putMessage(int partition, Message msg) throws InterruptedException{
+  final protected void putMessage(KafkaPartition partition, Message msg, long offset) throws InterruptedException{
     // block from receiving more message
-    holdingBuffer.put(msg);
+    holdingBuffer.put(new KafkaMessage(partition, msg, offset));
     statsSnapShot.mark(partition, msg.payloadSize());
-  };
-
-
-  protected abstract KafkaConsumer cloneConsumer(Set<Integer> partitionIds);
-
-  protected abstract KafkaConsumer cloneConsumer(Set<Integer> partitionIds, Map<Integer, Long> startOffset);
+  }
 
   protected abstract void commitOffset();
 
-  protected abstract Map<Integer, Long> getCurrentOffsets();
+  protected abstract Map<KafkaPartition, Long> getCurrentOffsets();
 
   public KafkaMeterStats getConsumerStats()
   {
@@ -219,6 +237,7 @@
     return stats;
   }
 
+  protected abstract void resetPartitionsAndOffset(Set<KafkaPartition> partitionIds, Map<KafkaPartition, Long> startOffset);
   /**
    * Counter class which gives the statistic value from the consumer
    */
@@ -230,7 +249,7 @@
     /**
      * A compact partition counter. The data collected for each partition is 4bytes brokerId + 1byte connected + 8bytes msg/s + 8bytes bytes/s + 8bytes offset
      */
-    public ConcurrentHashMap<Integer, PartitionStats> partitionStats = new ConcurrentHashMap<Integer, PartitionStats>();
+    public ConcurrentHashMap<KafkaPartition, PartitionStats> partitionStats = new ConcurrentHashMap<KafkaPartition, PartitionStats>();
 
 
     /**
@@ -246,9 +265,9 @@
 
     }
 
-    public void set_1minMovingAvgPerPartition(int pid, long[] _1minAvgPar)
+    public void set_1minMovingAvgPerPartition(KafkaPartition kp, long[] _1minAvgPar)
     {
-      PartitionStats ps = putPartitionStatsIfNotPresent(pid);
+      PartitionStats ps = putPartitionStatsIfNotPresent(kp);
       ps.msgsPerSec = _1minAvgPar[0];
       ps.bytesPerSec = _1minAvgPar[1];
     }
@@ -259,8 +278,8 @@
       totalBytesPerSec = _1minAvg[1];
     }
 
-    public void updateOffsets(Map<Integer, Long> offsets){
-      for (Entry<Integer, Long> os : offsets.entrySet()) {
+    public void updateOffsets(Map<KafkaPartition, Long> offsets){
+      for (Entry<KafkaPartition, Long> os : offsets.entrySet()) {
         PartitionStats ps = putPartitionStatsIfNotPresent(os.getKey());
         ps.offset = os.getValue();
       }
@@ -277,40 +296,55 @@
       return r;
     }
 
-    public void updatePartitionStats(int partitionId,int brokerId, String host)
+    public void updatePartitionStats(KafkaPartition kp,int brokerId, String host)
     {
-      PartitionStats ps = putPartitionStatsIfNotPresent(partitionId);
+      PartitionStats ps = putPartitionStatsIfNotPresent(kp);
       ps.brokerHost = host;
       ps.brokerId = brokerId;
     }
+    
+    private synchronized PartitionStats putPartitionStatsIfNotPresent(KafkaPartition kp){
+      PartitionStats ps = partitionStats.get(kp);
 
-    private synchronized PartitionStats putPartitionStatsIfNotPresent(int pid){
-      PartitionStats ps = partitionStats.get(pid);
       if (ps == null) {
         ps = new PartitionStats();
-        partitionStats.put(pid, ps);
+        partitionStats.put(kp, ps);
       }
       return ps;
     }
   }
 
+  public static class KafkaMessage
+  {
+    KafkaPartition kafkaPart;
+    Message msg;
+    long offSet;
+    public KafkaMessage(KafkaPartition kafkaPart, Message msg, long offset)
+    {
+      this.kafkaPart = kafkaPart;
+      this.msg = msg;
+      this.offSet = offset;
+    }
+
+  }
+
   public static class KafkaMeterStatsUtil {
 
-    public static Map<Integer, Long> getOffsetsForPartitions(List<KafkaMeterStats> kafkaMeterStats)
+    public static Map<KafkaPartition, Long> getOffsetsForPartitions(List<KafkaMeterStats> kafkaMeterStats)
     {
-      Map<Integer, Long> result = Maps.newHashMap();
+      Map<KafkaPartition, Long> result = Maps.newHashMap();
       for (KafkaMeterStats kms : kafkaMeterStats) {
-        for (Entry<Integer, PartitionStats> item : kms.partitionStats.entrySet()) {
+        for (Entry<KafkaPartition, PartitionStats> item : kms.partitionStats.entrySet()) {
           result.put(item.getKey(), item.getValue().offset);
         }
       }
       return result;
     }
 
-    public static Map<Integer, long[]> get_1minMovingAvgParMap(KafkaMeterStats kafkaMeterStats)
+    public static Map<KafkaPartition, long[]> get_1minMovingAvgParMap(KafkaMeterStats kafkaMeterStats)
     {
-      Map<Integer, long[]> result = Maps.newHashMap();
-      for (Entry<Integer, PartitionStats> item : kafkaMeterStats.partitionStats.entrySet()) {
+      Map<KafkaPartition, long[]> result = Maps.newHashMap();
+      for (Entry<KafkaPartition, PartitionStats> item : kafkaMeterStats.partitionStats.entrySet()) {
         result.put(item.getKey(), new long[]{item.getValue().msgsPerSec, item.getValue().bytesPerSec});
       }
       return result;
@@ -374,12 +408,12 @@
     /**
      * 1 min total msg number for each partition
      */
-    private final Map<Integer, long[]> _1_min_msg_sum_par = new HashMap<Integer, long[]>();
+    private final Map<KafkaPartition, long[]> _1_min_msg_sum_par = new HashMap<KafkaPartition, long[]>();
 
     /**
      * 1 min total byte number for each partition
      */
-    private final Map<Integer, long[]> _1_min_byte_sum_par = new HashMap<Integer, long[]>();
+    private final Map<KafkaPartition, long[]> _1_min_byte_sum_par = new HashMap<KafkaPartition, long[]>();
 
     private static int cursor = 0;
 
@@ -404,7 +438,7 @@
       bytesSec[60] -= bytesSec[cursor];
       msgSec[cursor] = 0;
       bytesSec[cursor] = 0;
-      for (Entry<Integer, long[]> item : _1_min_msg_sum_par.entrySet()) {
+      for (Entry<KafkaPartition, long[]> item : _1_min_msg_sum_par.entrySet()) {
         long[] msgv = item.getValue();
         long[] bytesv = _1_min_byte_sum_par.get(item.getKey());
         msgv[60] -= msgv[cursor];
@@ -437,7 +471,7 @@
       }
     }
 
-    public synchronized void mark(int partition, long bytes){
+    public synchronized void mark(KafkaPartition partition, long bytes){
       msgSec[cursor]++;
       msgSec[60]++;
       bytesSec[cursor] += bytes;
@@ -454,11 +488,11 @@
       msgv[60]++;
       bytev[cursor] += bytes;
       bytev[60] += bytes;
-    };
+    }
 
     public synchronized void setupStats(KafkaMeterStats stat){
       long[] _1minAvg = {msgSec[60]/last, bytesSec[60]/last};
-      for (Entry<Integer, long[]> item : _1_min_msg_sum_par.entrySet()) {
+      for (Entry<KafkaPartition, long[]> item : _1_min_msg_sum_par.entrySet()) {
         long[] msgv =item.getValue();
         long[] bytev = _1_min_byte_sum_par.get(item.getKey());
         long[] _1minAvgPar = {msgv[60]/last, bytev[60]/last};
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
index 35b6f2a..960ad3b 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
@@ -16,14 +16,27 @@
 package com.datatorrent.contrib.kafka;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import scala.collection.JavaConversions;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Maps.EntryTransformer;
+import com.google.common.collect.SetMultimap;
+
 import kafka.api.PartitionOffsetRequestInfo;
+import kafka.cluster.Broker;
 import kafka.common.TopicAndPartition;
 import kafka.javaapi.OffsetRequest;
 import kafka.javaapi.OffsetResponse;
@@ -31,6 +44,8 @@
 import kafka.javaapi.TopicMetadata;
 import kafka.javaapi.TopicMetadataResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
 
 /**
  * A util class used to retrieve all the metadatas for partitions/topics
@@ -49,7 +64,7 @@
   private static Logger logger = LoggerFactory.getLogger(KafkaMetadataUtil.class);
 
   // A temporary client used to retrieve the metadata of topic/partition etc
-  private static final String mdClientId = "Kafka_Broker_Lookup_Client";
+  private static final String mdClientId = "Kafka_Metadata_Lookup_Client";
 
   private static final int timeout=10000;
 
@@ -57,7 +72,7 @@
   private static final int bufferSize = 128 * 1024;
 
   /**
-   * @param brokerList
+   * @param brokerList brokers in same cluster
    * @param topic
    * @return Get the partition metadata list for the specific topic via the brokerList <br>
    * null if topic is not found
@@ -70,6 +85,34 @@
     }
     return tmd.partitionsMetadata();
   }
+  
+  /**
+   * @param brokers in multiple clusters, keyed by cluster id
+   * @param topic
+   * @return Get the partition metadata list for the specific topic via the brokers
+   * null if topic is not found
+   */
+  public static Map<String, List<PartitionMetadata>> getPartitionsForTopic(SetMultimap<String, String> brokers, final String topic)
+  {
+    return Maps.transformEntries(brokers.asMap(), new EntryTransformer<String, Collection<String>, List<PartitionMetadata>>(){
+      @Override
+      public List<PartitionMetadata> transformEntry(String key, Collection<String> bs)
+      {
+        return getPartitionsForTopic(new HashSet<String>(bs), topic);
+      }});
+  }
+  
+  
+  public static Set<String> getBrokers(Set<String> zkHost){
+    
+    ZkClient zkclient = new ZkClient(StringUtils.join(zkHost, ',') ,30000, 30000, ZKStringSerializer$.MODULE$);
+    Set<String> brokerHosts = new HashSet<String>();
+    for (Broker b : JavaConversions.asJavaIterable(ZkUtils.getAllBrokersInCluster(zkclient))) {
+      brokerHosts.add(b.getConnectionString());
+    }
+    zkclient.close();
+    return brokerHosts;
+  }
 
 
   /**
@@ -125,7 +168,7 @@
         } catch (NumberFormatException e) {
           throw new IllegalArgumentException("Wrong format for broker url, should be \"broker1:port1\"");
         } catch (Exception e) {
-          logger.error("Highly possible some broker(s) for topic {} are dead", topic, e);
+          logger.warn("Broker {} is unavailable or in bad state!", broker);
           // skip and try next broker
         }
       }
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaPartition.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaPartition.java
new file mode 100644
index 0000000..3a5652b
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaPartition.java
@@ -0,0 +1,113 @@
+package com.datatorrent.contrib.kafka;
+
+import java.io.Serializable;
+
+public class KafkaPartition implements Serializable
+{
+  protected static final String DEFAULT_CLUSTERID = "com.datatorrent.contrib.kafka.defaultcluster";
+  
+  @SuppressWarnings("unused")
+  private KafkaPartition()
+  {
+  }
+
+  public KafkaPartition(String topic, int partitionId)
+  {
+    this(DEFAULT_CLUSTERID, topic, partitionId);
+  }
+
+  public KafkaPartition(String clusterId, String topic, int partitionId)
+  {
+    super();
+    this.clusterId = clusterId;
+    this.partitionId = partitionId;
+    this.topic = topic;
+  }
+
+  /**
+   * 
+   */
+  private static final long serialVersionUID = 7556802229202221546L;
+  
+
+  private String clusterId;
+  
+  private int partitionId;
+  
+  private String topic;
+
+  public String getClusterId()
+  {
+    return clusterId;
+  }
+
+  public void setClusterId(String clusterId)
+  {
+    this.clusterId = clusterId;
+  }
+
+  public int getPartitionId()
+  {
+    return partitionId;
+  }
+
+  public void setPartitionId(int partitionId)
+  {
+    this.partitionId = partitionId;
+  }
+
+  public String getTopic()
+  {
+    return topic;
+  }
+
+  public void setTopic(String topic)
+  {
+    this.topic = topic;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((clusterId == null) ? 0 : clusterId.hashCode());
+    result = prime * result + partitionId;
+    result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj)
+  {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    KafkaPartition other = (KafkaPartition) obj;
+    if (clusterId == null) {
+      if (other.clusterId != null)
+        return false;
+    } else if (!clusterId.equals(other.clusterId))
+      return false;
+    if (partitionId != other.partitionId)
+      return false;
+    if (topic == null) {
+      if (other.topic != null)
+        return false;
+    } else if (!topic.equals(other.topic))
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "KafkaPartition [clusterId=" + clusterId + ", partitionId=" + partitionId + ", topic=" + topic + "]";
+  }
+  
+  
+  
+}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java
index d1ce932..f0e9ba4 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/OffsetManager.java
@@ -18,7 +18,7 @@
 import java.util.Map;
 
 /**
- * An offset manager interface used by  {@link AbstractPartitionableKafkaInputOperator} to define the customized initial offsets and periodically update the current offsets of all the operators
+ * An offset manager interface used by  {@link AbstractKafkaInputOperator} to define the customized initial offsets and periodically update the current offsets of all the operators
  * <br>
  * <br>
  * Ex. you could write offset to hdfs and load it back when restart the application
@@ -35,16 +35,16 @@
    * <br>
    * The method is called at the first attempt of creating partitions and the return value is used as initial offset for simple consumer
    * 
-   * @return Map of Kafka partition id as key and offset as value
+   * @return Map of Kafka KafkaPartition as key and long offset as value
    */
-  public Map<Integer, Long> loadInitialOffsets();
+  public Map<KafkaPartition, Long> loadInitialOffsets();
 
 
   /**
    * @param offsetsOfPartitions offsets for specified partitions, it is reported by individual operator instances
    * <br>
-   * The method is called every {@link AbstractPartitionableKafkaInputOperator#getRepartitionCheckInterval()} to update the current offset
+   * The method is called every {@link AbstractKafkaInputOperator#getRepartitionCheckInterval()} to update the current offset
    */
-  public void updateOffsets(Map<Integer, Long> offsetsOfPartitions);
+  public void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions);
 
 }
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/PartitionableKafkaSinglePortStringInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/PartitionableKafkaSinglePortStringInputOperator.java
deleted file mode 100644
index cb7c98a..0000000
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/PartitionableKafkaSinglePortStringInputOperator.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.datatorrent.contrib.kafka;
-
-import com.datatorrent.api.annotation.OperatorAnnotation;
-import kafka.message.Message;
-
-import java.nio.ByteBuffer;
-
-/**
- * Kafka input adapter operator with a single output port, which consumes String data from the Kafka message bus.
- * <p></p>
- *
- * @displayName Partitionable Kafka Single Port String Input
- * @category Messaging
- * @tags input operator, string
- *
- * @since 0.9.0
- */
-@OperatorAnnotation(partitionable = true)
-public class PartitionableKafkaSinglePortStringInputOperator extends AbstractPartitionableKafkaSinglePortInputOperator<String>
-{
-  /**
-   * Implement abstract method of AbstractPartitionableKafkaSinglePortInputOperator
-   * Just parse the kafka message as a string
-   */
-  @Override
-  public String getTuple(Message message)
-  {
-    String data = "";
-    try {
-      ByteBuffer buffer = message.payload();
-      byte[] bytes = new byte[buffer.remaining()];
-      buffer.get(bytes);
-      data = new String(bytes);
-//      System.out.println(data);
-//      logger.debug("Consuming {}", data);
-    }
-    catch (Exception ex) {
-      return data;
-    }
-    return data;
-  }
-
-  @Override
-  protected AbstractPartitionableKafkaInputOperator cloneOperator()
-  {
-    return new PartitionableKafkaSinglePortStringInputOperator();
-  }
-
-
-}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
index 5186486..3ad9f37 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
@@ -15,22 +15,31 @@
  */
 package com.datatorrent.contrib.kafka;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
+
 import javax.validation.constraints.NotNull;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
 import kafka.api.OffsetRequest;
@@ -42,37 +51,164 @@
 import kafka.message.MessageAndOffset;
 
 /**
- * Simple kafka consumer adaptor used by kafka input operator
- * Properties:<br>
+ * Simple kafka consumer adaptor used by kafka input operator Properties:<br>
  * <b>timeout</b>: Timeout for connection and ping <br>
  * <b>bufferSize</b>: buffer size of the consumer <br>
  * <b>clientId</b>: client id of the consumer <br>
  * <b>partitionIds</b>: partition id that the consumer want to consume <br>
- * <li> (-1): create #partition threads and consumers to read the topic from different partitions in parallel</li>
- * <br>
+ * <li>(-1): create #partition threads and consumers to read the topic from different partitions in parallel</li> <br>
  * <b>metadataRefreshInterval</b>: The interval that the monitor thread use to monitor the broker leadership change <br>
  * <b>metadataRetrievalRetry</b>: Maximum retry times for metadata retrieval failures<br>
  * default value is 3 <br>
- * -1: always retry <br>
+ * -1: unlimited retry <br>
  * <br>
  *
  * Load balance: <br>
- * <li>Every consumer only connects to leader broker for particular partition once it's created</li>
- * <li>Once leadership change detected(leader broker failure, or server-side reassignment), it switches to the new leader broker</li>
- * <li>For server-side leadership change, see kafka-preferred-replica-election.sh and kafka-reassign-partitions.sh</li>
- * <li>There is ONE separate thread to monitor the leadership for all the partitions of the topic at every #metadataRefreshInterval milliseconds</li>
- * <br>
+ * <li>The consumer create several data-consuming threads to consume the data from broker(s)</li> 
+ * <li>Each thread has only ONE kafka client connecting to ONE broker to consume data from for multiple partitions </li>
+ * <li>
+ * There is ONE separate thread to monitor the leadership for all the partitions of the topic at every
+ * #metadataRefreshInterval milliseconds</li>
+ * <li>Once leadership
+ * change detected(leader broker failure, or server-side reassignment), it switches to the new leader broker</li> <li>
+ * For server-side leadership change, see kafka-preferred-replica-election.sh and kafka-reassign-partitions.sh</li> <br>
  * <br>
  * Kafka broker failover: <br>
- * <li>Once broker failure is detected, it waits #metadataRefreshInterval to reconnect to the new leader broker </li>
- * <li>If there are consecutive #metadataRetrievalRetry failures to retrieve the metadata for the topic. It will stop consuming the partition</li>
- * <br>
+ * <li>Once broker failure is detected, it waits #metadataRefreshInterval to reconnect to the new leader broker</li> <li>
+ * If there are consecutive #metadataRetrievalRetry failures to retrieve the metadata for the topic. It will stop
+ * consuming the partition</li> <br>
  *
  * @since 0.9.0
  */
 public class SimpleKafkaConsumer extends KafkaConsumer
 {
 
+  /**
+   * The data-consuming thread that use one simple kafka client to connect to one broker which is the leader of the partition(s) that this consumer is interested 
+   */
+  static final class ConsumerThread implements Runnable
+  {
+    private final Broker broker;
+    private final String clientName;
+    // kafka simple consumer object
+    private SimpleConsumer ksc;
+    // The SimpleKafkaConsumer which holds this thread
+    private SimpleKafkaConsumer consumer;
+    // partitions consumed in this thread
+    private final Set<KafkaPartition> kpS;
+    @SuppressWarnings("rawtypes")
+    private Future threadItSelf;
+
+    private ConsumerThread(Broker broker, Set<KafkaPartition> kpl, SimpleKafkaConsumer consumer)
+    {
+      this.broker = broker;
+      this.clientName = consumer.getClientName(broker.host() + "_" + broker.port());
+      this.consumer = consumer;
+      this.kpS = Collections.newSetFromMap(new ConcurrentHashMap<KafkaPartition, Boolean>());
+      this.kpS.addAll(kpl);
+    }
+
+    @Override
+    public void run()
+    {
+
+      try {
+        logger.info("Connecting to broker {} [ timeout:{}, buffersize:{}, clientId: {}]", broker, consumer.timeout, consumer.bufferSize, clientName);
+        ksc = new SimpleConsumer(broker.host(), broker.port(), consumer.timeout, consumer.bufferSize, clientName);
+        // Initialize all start offsets for all kafka partitions read from this consumer
+        // read either from beginning of the broker or last offset committed by the operator
+        for (KafkaPartition kpForConsumer : kpS) {
+          logger.info("Start consuming data of topic {} ", kpForConsumer);
+          if (consumer.offsetTrack.get(kpForConsumer) != null) {
+            // start from recovery
+            // offsets.put(kpForConsumer, offsetTrack.get(kpForConsumer));
+            logger.info("Partition {} initial offset {}", kpForConsumer, consumer.offsetTrack.get(kpForConsumer));
+          } else {
+            long startOffsetReq = consumer.initialOffset.equalsIgnoreCase("earliest") ? OffsetRequest.EarliestTime() : OffsetRequest.LatestTime();
+            logger.info("Partition {} initial offset {} {}", kpForConsumer.getPartitionId(), startOffsetReq, consumer.initialOffset);
+            consumer.offsetTrack.put(kpForConsumer, KafkaMetadataUtil.getLastOffset(ksc, consumer.topic, kpForConsumer.getPartitionId(), startOffsetReq, clientName));
+          }
+        }
+
+        // stop consuming only when the consumer container is stopped or the metadata can not be refreshed
+        while (consumer.isAlive && (consumer.metadataRefreshRetryLimit == -1 || consumer.retryCounter.get() < consumer.metadataRefreshRetryLimit)) {
+
+            if (kpS == null || kpS.isEmpty()) {
+              return;
+            }
+
+            FetchRequestBuilder frb = new FetchRequestBuilder().clientId(clientName);
+            // add all partition request in one Fretch request together
+            for (KafkaPartition kpForConsumer : kpS) {
+              frb.addFetch(consumer.topic, kpForConsumer.getPartitionId(), consumer.offsetTrack.get(kpForConsumer), consumer.bufferSize);
+            }
+
+            FetchRequest req = frb.build();
+            if (ksc == null) {
+              if (consumer.metadataRefreshInterval > 0) {
+                Thread.sleep(consumer.metadataRefreshInterval + 1000);
+              } else {
+                Thread.sleep(100);
+              }
+            }
+            FetchResponse fetchResponse = ksc.fetch(req);
+            for (Iterator<KafkaPartition> iterator = kpS.iterator(); iterator.hasNext();) {
+              KafkaPartition kafkaPartition = iterator.next();
+              if (fetchResponse.hasError() && fetchResponse.errorCode(consumer.topic, kafkaPartition.getPartitionId()) != ErrorMapping.NoError()) {
+                // Kick off partition(s) which has error when fetch from this broker temporarily 
+                // Monitor will find out which broker it goes in monitor thread
+                logger.warn("Error when consuming topic {} from broker {} with error code {} ", kafkaPartition, broker,  fetchResponse.errorCode(consumer.topic, kafkaPartition.getPartitionId()));
+                iterator.remove();
+                consumer.partitionToBroker.remove(kafkaPartition);
+                consumer.stats.updatePartitionStats(kafkaPartition, -1, "");
+                continue;
+              } 
+              // If the fetchResponse either has no error or the no error for $kafkaPartition get the data
+              long offset = -1l;
+              for (MessageAndOffset msg : fetchResponse.messageSet(consumer.topic, kafkaPartition.getPartitionId())) {
+                offset = msg.nextOffset();
+                consumer.putMessage(kafkaPartition, msg.message(), msg.offset());
+              }
+              if (offset != -1) {
+                consumer.offsetTrack.put(kafkaPartition, offset);
+              }
+
+            }
+        }
+      } catch (Exception e){
+        logger.error("The consumer encounters an unrecoverable exception. Close the connection to broker {} \n Caused by {}", broker, e);
+      } finally {
+        if (ksc != null) {
+          ksc.close();
+        }
+        for (KafkaPartition kpForConsumer : kpS) {
+          // Update consumer that these partitions are currently stop being consumed because of some unrecoverable exception
+          consumer.partitionToBroker.remove(kpForConsumer);
+        }
+        
+        logger.info("Exit the consumer thread for broker {} ", broker);
+      }
+    }
+
+    public void addPartitions(Set<KafkaPartition> newKps)
+    {
+      // Add the partition(s) to this existing consumer thread they are assigned to this broker
+      kpS.addAll(newKps);
+
+    }
+
+    @SuppressWarnings("rawtypes")
+    public Future getThreadItSelf()
+    {
+      return threadItSelf;
+    }
+
+    public void setThreadItSelf(@SuppressWarnings("rawtypes") Future threadItSelf)
+    {
+      this.threadItSelf = threadItSelf;
+    }
+  }
+
   public SimpleKafkaConsumer()
   {
     super();
@@ -83,30 +219,32 @@
     this(topic, timeout, bufferSize, clientId, null);
   }
 
-  public SimpleKafkaConsumer(String topic, int timeout, int bufferSize, String clientId, Set<Integer> partitionIds)
+  public SimpleKafkaConsumer(String topic, int timeout, int bufferSize, String clientId, Set<KafkaPartition> partitionIds)
   {
     super(topic);
     this.timeout = timeout;
     this.bufferSize = bufferSize;
     this.clientId = clientId;
-    this.partitionIds = partitionIds;
+    this.kps = partitionIds;
   }
 
-  public SimpleKafkaConsumer(Set<String> brokerList, String topic, int timeout, int bufferSize, String clientId, Set<Integer> partitionIds)
+  public SimpleKafkaConsumer(SetMultimap<String, String> zks, String topic, int timeout, int bufferSize, String clientId, Set<KafkaPartition> partitionIds)
   {
-    super(brokerList, topic);
+    super(zks, topic);
     this.timeout = timeout;
     this.bufferSize = bufferSize;
     this.clientId = clientId;
-    this.partitionIds = partitionIds;
+    this.kps = partitionIds;
   }
 
   private static final Logger logger = LoggerFactory.getLogger(SimpleKafkaConsumer.class);
 
   /**
-   * Track thread for each partition, clean the resource if necessary
+   * Track consumers connected to each broker, topics and partitions hosted on same broker are consumed by same
+   * consumer. Clean the resource if necessary. Key is the kafka broker object.
    */
-  private final transient HashMap<Integer, AtomicReference<SimpleConsumer>> simpleConsumerThreads = new HashMap<Integer, AtomicReference<SimpleConsumer>>();
+  private final transient Map<Broker, ConsumerThread> simpleConsumerThreads = new HashMap<Broker, ConsumerThread>();
+
 
   private transient ExecutorService kafkaConsumerExecutor;
 
@@ -117,7 +255,6 @@
    */
   private final transient AtomicInteger retryCounter = new AtomicInteger(0);
 
-
   private int timeout = 10000;
 
   /**
@@ -132,49 +269,54 @@
   private String clientId = "Kafka_Simple_Client";
 
   /**
-   * interval in between reconnect if one kafka broker goes down in milliseconds
-   * if metadataRefreshInterval < 0 it will never refresh the metadata
-   * WARN: Turning off the refresh will disable failover to new broker
+   * Interval in between refresh the metadata change(broker change) in milliseconds. Metadata refresh guarantees to
+   * automatically reconnect to new broker that are new elected as broker host Disable metadata refresh by setting this
+   * to -1 WARN: Turning off the refresh will disable auto reconnect to new broker
    */
   private int metadataRefreshInterval = 30000;
 
-
   /**
-   * Maximum brokers' metadata refresh retry limit
-   * -1 means unlimited retry
+   * Maximum brokers' metadata refresh retry limit. -1 means unlimited retry
    */
   private int metadataRefreshRetryLimit = -1;
 
   /**
-   * You can setup your particular partitionID you want to consume with *simple
-   * kafka consumer*. Use this to maximize the distributed performance.
-   * By default it's -1 which means #partitionSize anonymous threads will be
-   * created to consume tuples from different partition
+   * You can setup your particular kafka partitions you want to consume for this consumer client. This can be used to
+   * share client and thread and maximize the overall performance. Null or empty value: consumer will create #
+   * threads&clients same as # brokers that host the all partitions of the topic Each thread consumes 1(+) partitions
+   * from 1 broker
    */
-  private Set<Integer> partitionIds = new HashSet<Integer>();
+  private Set<KafkaPartition> kps = new HashSet<KafkaPartition>();
 
-
+  // This map maintains mapping between kafka partition and it's leader broker in realtime monitored by a thread
+  private transient final ConcurrentHashMap<KafkaPartition, Broker> partitionToBroker = new ConcurrentHashMap<KafkaPartition, Broker>();
+  
   /**
-   * Track offset for each partition, so operator could start from the last serialized state
-   * Use ConcurrentHashMap to avoid ConcurrentModificationException without blocking reads when updating in another thread(hashtable or synchronizedmap)
+   * Track offset for each partition, so operator could start from the last serialized state Use ConcurrentHashMap to
+   * avoid ConcurrentModificationException without blocking reads when updating in another thread(hashtable or
+   * synchronizedmap)
    */
-  private final ConcurrentHashMap<Integer, Long> offsetTrack = new ConcurrentHashMap<Integer, Long>();
+  private final ConcurrentHashMap<KafkaPartition, Long> offsetTrack = new ConcurrentHashMap<KafkaPartition, Long>();
 
   @Override
   public void create()
   {
     super.create();
-    List<PartitionMetadata> partitionMetaList = KafkaMetadataUtil.getPartitionsForTopic(brokerSet, topic);
-    boolean defaultSelect = (partitionIds == null) || (partitionIds.size() == 0);
+    Map<String, List<PartitionMetadata>> partitionMetas = KafkaMetadataUtil.getPartitionsForTopic(brokers, topic);
+    if (kps == null) {
+      kps = new HashSet<KafkaPartition>();
+    }
+    if (kps.size() != 0) {
+      return;
+    }
 
-    // if partition ids are null or not specified , find all the partitions for
-    // the specific topic else create the consumers of specified partition ids
-    for (PartitionMetadata part : partitionMetaList) {
-      final String clientName = getClientName(part.partitionId());
-      if (defaultSelect || partitionIds.contains(part.partitionId())) {
-        logger.info("Connecting to {}:{} [timeout:{}, buffersize:{}, clientId: {}]", part.leader().host(), part.leader().port(), timeout, bufferSize, clientName);
-        simpleConsumerThreads.put(part.partitionId(), new AtomicReference<SimpleConsumer>(new SimpleConsumer(part.leader().host(), part.leader().port(), timeout, bufferSize, clientName)));
-        stats.updatePartitionStats(part.partitionId(), part.leader().id(), part.leader().host() + ":" + part.leader().port());
+    // if partition ids are null or not specified , find all the partition metadata for
+    // the specific topic from broker
+    for (Entry<String, List<PartitionMetadata>> en : partitionMetas.entrySet()) {
+      String clusterId = en.getKey();
+      for (PartitionMetadata part : en.getValue()) {
+        KafkaPartition kp = new KafkaPartition(clusterId, topic, part.partitionId());
+        kps.add(kp);
       }
     }
 
@@ -186,143 +328,89 @@
     super.start();
 
     // thread to consume the kafka data
-    kafkaConsumerExecutor = Executors.newFixedThreadPool(simpleConsumerThreads.size(), new ThreadFactoryBuilder().setNameFormat("kafka-consumer-" + topic + "-%d").build());
+    kafkaConsumerExecutor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("kafka-consumer-" + topic + "-%d").build());
+
+    if(metadataRefreshInterval <= 0) {
+      return;
+    }
 
     // background thread to monitor the kafka metadata change
-    metadataRefreshExecutor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
-    .setNameFormat("kafka-consumer-monitor-" + topic + "-%d").setDaemon(true).build());
+    metadataRefreshExecutor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setNameFormat("kafka-consumer-monitor-" + topic + "-%d").setDaemon(true).build());
 
     // start one monitor thread to monitor the leader broker change and trigger some action
-    if (metadataRefreshInterval > 0) {
-      metadataRefreshExecutor.scheduleAtFixedRate(new Runnable() {
+    final SimpleKafkaConsumer ref = this;
+    metadataRefreshExecutor.scheduleAtFixedRate(new Runnable() {
 
-        @Override
-        public void run()
-        {
-          if (isAlive && (metadataRefreshRetryLimit == -1 || retryCounter.get() < metadataRefreshRetryLimit)) {
-            logger.debug("{}: Update metadata for topic {}", Thread.currentThread().getName(), topic);
-            List<PartitionMetadata> pms = KafkaMetadataUtil.getPartitionsForTopic(brokerSet, topic);
-            if (pms == null) {
-              // retrieve metadata fail
-              retryCounter.getAndAdd(1);
-            }
-            Set<String> newBrokerSet = new HashSet<String>();
-            for (PartitionMetadata pm : pms) {
-              for (Broker b : pm.isr()) {
-                newBrokerSet.add(b.host() + ":" + b.port());
-              }
-              int pid = pm.partitionId();
-              if (simpleConsumerThreads.containsKey(pid)) {
-                SimpleConsumer sc = simpleConsumerThreads.get(pid).get();
-                if (sc != null && sc.host().equals(pm.leader().host()) && sc.port() == pm.leader().port()) {
-                  continue;
-                }
-                // clean the consumer to reestablish the new connection
-                logger.info("Detected leader broker change, reconnecting to {}:{} for partition {}", pm.leader().host(), pm.leader().port(), pid);
-                cleanAndSet(pid, new SimpleConsumer(pm.leader().host(), pm.leader().port(), timeout, bufferSize, getClientName(pid)));
-                stats.updatePartitionStats(pid, pm.leader().id(), pm.leader().host() + ":" + pm.leader().port());
-              }
-            }
-            brokerSet = newBrokerSet;
-            // reset to 0 if it reconnect to the broker which has current broker metadata
-            retryCounter.set(0);
+      private transient final SetMultimap<Broker, KafkaPartition> deltaPositive = HashMultimap.create();
+
+      @Override
+      public void run()
+      {
+        if (isAlive && (metadataRefreshRetryLimit == -1 || retryCounter.get() < metadataRefreshRetryLimit)) {
+          logger.debug("{}: Update metadata for topic {}", Thread.currentThread().getName(), topic);
+          Map<String, List<PartitionMetadata>> pms = KafkaMetadataUtil.getPartitionsForTopic(brokers, topic);
+          if (pms == null) {
+            // retrieve metadata fail add retry count and return
+            retryCounter.getAndAdd(1);
+            return;
           }
+
+          for (Entry<String, List<PartitionMetadata>> pmLEntry : pms.entrySet()) {
+            for (PartitionMetadata pm : pmLEntry.getValue()) {
+              KafkaPartition kp = new KafkaPartition(pmLEntry.getKey(), topic, pm.partitionId());
+              if (!kps.contains(kp)) {
+                // Out of this consumer's scope
+                continue;
+              }
+              Broker b = pm.leader();
+              Broker oldB = partitionToBroker.put(kp, b);
+              if(b.equals(oldB)) {
+                continue;
+              }
+              // add to positive
+              deltaPositive.put(b,kp);
+
+              // always update the latest connection information
+              stats.updatePartitionStats(kp, pm.leader().id(), pm.leader().host() + ":" + pm.leader().port());
+            }
+          }
+
+          // remove from map if the thread is done (partitions on this broker has all been reassigned to others(or temporarily not available) for
+          // example)
+          for (Iterator<Entry<Broker, ConsumerThread>> iterator = simpleConsumerThreads.entrySet().iterator(); iterator.hasNext();) {
+            Entry<Broker, ConsumerThread> item = iterator.next();
+            if (item.getValue().getThreadItSelf().isDone()) {
+              iterator.remove();
+            }
+          }
+
+          for (Broker b : deltaPositive.keySet()) {
+            if (!simpleConsumerThreads.containsKey(b)) {
+              // start thread for new broker
+              ConsumerThread ct = new ConsumerThread(b, deltaPositive.get(b), ref);
+              ct.setThreadItSelf(kafkaConsumerExecutor.submit(ct));
+              simpleConsumerThreads.put(b, ct);
+
+            } else {
+              simpleConsumerThreads.get(b).addPartitions(deltaPositive.get(b));
+            }
+          }
+
+          deltaPositive.clear();
+
+          // reset to 0 if it reconnect to the broker which has current broker metadata
+          retryCounter.set(0);
         }
-      }, 0, metadataRefreshInterval, TimeUnit.MILLISECONDS);
-    }
-
-
-
-    for (final Integer pid : simpleConsumerThreads.keySet()) {
-      //  initialize the stats snapshot for this partition
-      statsSnapShot.mark(pid, 0);
-      final String clientName = getClientName(pid);
-      kafkaConsumerExecutor.submit(new Runnable() {
-
-        AtomicReference<SimpleConsumer> csInThreadRef = simpleConsumerThreads.get(pid);
-
-        @Override
-        public void run()
-        {
-          // read either from beginning of the broker or last offset committed by the operator
-          long offset = 0L;
-          if(offsetTrack.get(pid)!=null){
-            //start from recovery
-            offset = offsetTrack.get(pid);
-            logger.debug("Partition {} offset {}", pid, offset);
-          } else {
-            long startOffsetReq = initialOffset.equalsIgnoreCase("earliest")? OffsetRequest.EarliestTime() : OffsetRequest.LatestTime();
-            logger.debug("Partition {} initial offset {} {}", pid, startOffsetReq, initialOffset);
-            offset = KafkaMetadataUtil.getLastOffset(csInThreadRef.get(), topic, pid, startOffsetReq, clientName);
-          }
-          try {
-            // stop consuming only when the consumer container is stopped or the metadata can not be refreshed
-            while (isAlive && (metadataRefreshRetryLimit == -1 || retryCounter.get() < metadataRefreshRetryLimit)) {
-
-              try {
-                FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, pid, offset, bufferSize).build();
-                SimpleConsumer sc = csInThreadRef.get();
-                if (sc == null) {
-                  if (metadataRefreshInterval > 0) {
-                    Thread.sleep(metadataRefreshInterval + 1000);
-                  } else {
-                    Thread.sleep(100);
-                  }
-                }
-                FetchResponse fetchResponse = csInThreadRef.get().fetch(req);
-
-                if (fetchResponse.hasError() && fetchResponse.errorCode(topic, pid) == ErrorMapping.OffsetOutOfRangeCode()) {
-                  // If OffsetOutOfRangeCode happen, it means all msgs have been consumed, clean the consumer and return
-                  cleanAndSet(pid, null);
-                  stats.updatePartitionStats(pid, -1, "");
-                  return;
-                } else if (fetchResponse.hasError()) {
-                  // If error happen, assume
-                  throw new Exception("Fetch message error, try to reconnect to broker");
-                }
-
-                for (MessageAndOffset msg : fetchResponse.messageSet(topic, pid)) {
-                  offset = msg.nextOffset();
-                  putMessage(pid, msg.message());
-                }
-                offsetTrack.put(pid, offset);
-
-              } catch (Exception e) {
-                logger.error("The consumer encounters an exception. Close the connection to partition {} ", pid, e);
-                cleanAndSet(pid, null);
-                stats.updatePartitionStats(pid, -1, "");
-              }
-            }
-          } finally {
-            // close the consumer
-            if(csInThreadRef.get()!=null){
-              csInThreadRef.get().close();
-            }
-            logger.info("Exit the consumer thread for partition {} ", pid);
-          }
-        }
-
-      });
-    }
-  }
-
-  private void cleanAndSet(Integer pid, SimpleConsumer newConsumer)
-  {
-    SimpleConsumer sc = simpleConsumerThreads.get(pid).getAndSet(newConsumer);
-    if (sc != null) {
-      logger.info("Close old connection to partition {}", pid);
-      sc.close();
-    }
+      }
+    }, 0, metadataRefreshInterval, TimeUnit.MILLISECONDS);
   }
 
   @Override
   public void close()
   {
     logger.info("Stop all consumer threads");
-    for(AtomicReference<SimpleConsumer> simConsumerRef : simpleConsumerThreads.values()){
-      if(simConsumerRef.get()!=null) {
-        simConsumerRef.get().close();
-      }
+    for (ConsumerThread ct : simpleConsumerThreads.values()) {
+      ct.getThreadItSelf().cancel(true);
     }
     simpleConsumerThreads.clear();
     metadataRefreshExecutor.shutdownNow();
@@ -380,64 +468,50 @@
   }
 
   @Override
-  protected KafkaConsumer cloneConsumer(Set<Integer> partitionIds, Map<Integer, Long> startOffset)
-  {
-    // create different client for same partition
-    SimpleKafkaConsumer  skc = new SimpleKafkaConsumer(brokerSet, topic, timeout, bufferSize, clientId, partitionIds);
-    skc.setCacheSize(getCacheSize());
-    skc.setMetadataRefreshInterval(getMetadataRefreshInterval());
-    skc.setMetadataRefreshRetryLimit(getMetadataRefreshRetryLimit());
-    skc.initialOffset = this.initialOffset;
-    skc.resetOffset(startOffset);
-    skc.setCacheSize(getCacheSize());
-    return skc;
-  }
-
-  @Override
-  protected KafkaConsumer cloneConsumer(Set<Integer> partitionIds){
-    return cloneConsumer(partitionIds, null);
-  }
-
-  @Override
   protected void commitOffset()
   {
     // the simple consumer offset is kept in the offsetTrack
-    // It's better to do server registry for client in the future. Wait for kafka community come up with more sophisticated offset management
-    //TODO https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management#
+    // It's better to do server registry for client in the future. Wait for kafka community come up with more
+    // sophisticated offset management
+    // TODO https://cwiki.apache.org/confluence/display/KAFKA/Inbuilt+Consumer+Offset+Management#
   }
 
-
-  private String getClientName(int pid){
-    return clientId + SIMPLE_CONSUMER_ID_SUFFIX + pid;
+  private String getClientName(String brokerName)
+  {
+    return clientId + SIMPLE_CONSUMER_ID_SUFFIX + brokerName;
   }
 
   @Override
-  protected Map<Integer, Long> getCurrentOffsets()
+  protected Map<KafkaPartition, Long> getCurrentOffsets()
   {
     return offsetTrack;
   }
 
-  private void resetOffset(Map<Integer, Long> overrideOffset){
-
-    if(overrideOffset == null){
+  public void resetOffset(Map<KafkaPartition, Long> overrideOffset)
+  {
+    if (overrideOffset == null) {
       return;
     }
     offsetTrack.clear();
     // set offset of the partitions assigned to this consumer
-    for (Integer pid: partitionIds) {
-      Long offsetForPar = overrideOffset.get(pid);
+    for (KafkaPartition kp : kps) {
+      Long offsetForPar = overrideOffset.get(kp);
       if (offsetForPar != null) {
-        offsetTrack.put(pid, offsetForPar);
+        offsetTrack.put(kp, offsetForPar);
       }
     }
   }
 
-  @Override
-  public KafkaMeterStats getConsumerStats()
+  public KafkaMeterStats getConsumerStats(Map<KafkaPartition, Long> offsetStats)
   {
-    stats.updateOffsets(offsetTrack);
+    stats.updateOffsets(offsetStats);
     return super.getConsumerStats();
   }
 
-
+  @Override
+  protected void resetPartitionsAndOffset(Set<KafkaPartition> partitionIds, Map<KafkaPartition, Long> startOffset)
+  {
+    this.kps = partitionIds;
+    resetOffset(startOffset);
+  }
 } // End of SimpleKafkaConsumer
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
index 190af78..657676c 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
@@ -15,29 +15,41 @@
  */
 package com.datatorrent.contrib.kafka;
 
-import java.util.*;
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.BaseOperator;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.Operator;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.io.IdempotentStorageManager;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import java.io.File;
+import java.io.IOException;
+import java.util.Properties;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-
+import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
 import org.slf4j.LoggerFactory;
 
-import com.datatorrent.api.*;
-import com.datatorrent.api.DAG.Locality;
-
 public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 {
   static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
-  static HashMap<String, List<?>> collections = new HashMap<String, List<?>>();
   static AtomicInteger tupleCount = new AtomicInteger();
   static CountDownLatch latch;
 
   /**
    * Test Operator to collect tuples from KafkaSingleInputStringOperator.
-   * 
+   *
    * @param <T>
    */
   public static class CollectorModule<T> extends BaseOperator
@@ -47,13 +59,10 @@
 
   public static class CollectorInputPort<T> extends DefaultInputPort<T>
   {
-    ArrayList<T> list;
-    final String id;
 
     public CollectorInputPort(String id, Operator module)
     {
       super();
-      this.id = id;
     }
 
     @Override
@@ -65,16 +74,13 @@
         }
         return;
       }
-      list.add(tuple);
       tupleCount.incrementAndGet();
     }
 
     @Override
     public void setConnected(boolean flag)
     {
-      if (flag) {
-        collections.put(id, list = new ArrayList<T>());
-      }
+      tupleCount.set(0);
     }
   }
 
@@ -83,20 +89,20 @@
    * Kafka, aka consumer). This module receives data from an outside test
    * generator through Kafka message bus and feed that data into Malhar
    * streaming platform.
-   * 
+   *
    * [Generate message and send that to Kafka message bus] ==> [Receive that
    * message through Kafka input adapter(i.e. consumer) and send using
    * emitTuples() interface on output port during onMessage call]
-   * 
-   * 
+   *
+   *
    * @throws Exception
    */
-  public void testKafkaInputOperator(int sleepTime, final int totalCount, KafkaConsumer consumer, boolean isValid) throws Exception
+  public void testKafkaInputOperator(int sleepTime, final int totalCount, KafkaConsumer consumer, boolean isValid, boolean idempotent) throws Exception
   {
     // initial the latch for this test
     latch = new CountDownLatch(1);
-    
-    
+
+
  // Start producer
     KafkaTestProducer p = new KafkaTestProducer(TEST_TOPIC);
     p.setSendCount(totalCount);
@@ -110,14 +116,15 @@
 
     // Create KafkaSinglePortStringInputOperator
     KafkaSinglePortStringInputOperator node = dag.addOperator("Kafka message consumer", KafkaSinglePortStringInputOperator.class);
+    node.setIdempotent(idempotent);
     consumer.setTopic(TEST_TOPIC);
-    if (isValid) {
-      Set<String> brokerSet = new HashSet<String>();
-      brokerSet.add("localhost:9092");
-      consumer.setBrokerSet(brokerSet);
-    }
+
     node.setConsumer(consumer);
-    
+
+    if (isValid) {
+      node.setZookeeper("localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
+    }
+
     // Create Test tuple collector
     CollectorModule<String> collector = dag.addOperator("TestMessageCollector", new CollectorModule<String>());
 
@@ -129,51 +136,57 @@
     lc.setHeartbeatMonitoringEnabled(false);
 
     lc.runAsync();
-    
+
     // Wait 30s for consumer finish consuming all the messages
-    Assert.assertTrue("TIMEOUT: 30s ", latch.await(30000, TimeUnit.MILLISECONDS));
-    
+    Assert.assertTrue("TIMEOUT: 30s ", latch.await(300000, TimeUnit.MILLISECONDS));
+
     // Check results
-    Assert.assertEquals("Collections size", 1, collections.size());
-    Assert.assertEquals("Tuple count", totalCount, collections.get(collector.inputPort.id).size());
-    logger.debug(String.format("Number of emitted tuples: %d", collections.get(collector.inputPort.id).size()));
-    
+    Assert.assertEquals("Tuple count", totalCount, tupleCount.intValue());
+    logger.debug(String.format("Number of emitted tuples: %d", tupleCount.intValue()));
+
     p.close();
     lc.shutdown();
   }
 
   @Test
-  public void testKafkaInputOperator_Highleverl() throws Exception
+  public void testKafkaInputOperator_Highlevel() throws Exception
   {
     int totalCount = 10000;
     Properties props = new Properties();
-    props.put("zookeeper.connect", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT);
     props.put("group.id", "group1");
-    props.put("consumer.id", "default_consumer");
     // This damn property waste me 2 days! It's a 0.8 new property. "smallest" means
     // reset the consumer to the beginning of the message that is not consumed yet
     // otherwise it wont get any of those the produced before!
     KafkaConsumer k = new HighlevelKafkaConsumer(props);
     k.setInitialOffset("earliest");
-    testKafkaInputOperator(1000, totalCount, k, true);
+    testKafkaInputOperator(1000, totalCount, k, true, false);
   }
-  
+
   @Test
   public void testKafkaInputOperator_Simple() throws Exception
   {
     int totalCount = 10000;
     KafkaConsumer k = new SimpleKafkaConsumer();
     k.setInitialOffset("earliest");
-    testKafkaInputOperator(1000, totalCount, k, true);
+    testKafkaInputOperator(1000, totalCount, k, true, false);
   }
-  
+
+  @Test
+  public void testKafkaInputOperator_Simple_Idempotent() throws Exception
+  {
+    int totalCount = 10000;
+    KafkaConsumer k = new SimpleKafkaConsumer();
+    k.setInitialOffset("earliest");
+    testKafkaInputOperator(1000, totalCount, k, true, true);
+  }
+
   @Test
   public void testKafkaInputOperator_Invalid() throws Exception
   {
     int totalCount = 10000;
     SimpleKafkaConsumer consumer = new SimpleKafkaConsumer();
     try{
-      testKafkaInputOperator(1000, totalCount,consumer, false);
+      testKafkaInputOperator(1000, totalCount,consumer, false, false);
     }catch(Exception e){
       // invalid host setup expect to fail here
       Assert.assertEquals("Error creating local cluster", e.getMessage());
@@ -184,7 +197,119 @@
   @After
   public void afterTest()
   {
-    collections.clear();
+    tupleCount.set(0);
     super.afterTest();
   }
+
+  public static class TestMeta extends TestWatcher
+  {
+    String baseDir;
+    String recoveryDir;
+    KafkaSinglePortStringInputOperator operator;
+    CollectorTestSink<Object> sink;
+    Context.OperatorContext context;
+
+    @Override
+    protected void starting(Description description)
+    {
+      String methodName = description.getMethodName();
+      String className = description.getClassName();
+      baseDir = "target/" + className + "/" + methodName;
+      recoveryDir = baseDir + "/" + "recovery";
+      try {
+        FileUtils.deleteDirectory(new File(recoveryDir));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testRecoveryAndIdempotency() throws Exception
+  {
+    int totalCount = 1500;
+
+    // initial the latch for this test
+    latch = new CountDownLatch(50);
+
+    // Start producer
+    KafkaTestProducer p = new KafkaTestProducer(TEST_TOPIC);
+    p.setSendCount(totalCount);
+    new Thread(p).start();
+
+    Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(Context.OperatorContext.SPIN_MILLIS, 500);
+
+    testMeta.context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributeMap);
+    testMeta.operator = new KafkaSinglePortStringInputOperator();
+    ((IdempotentStorageManager.FSIdempotentStorageManager) testMeta.operator.getIdempotentStorageManager()).setRecoveryPath(testMeta.recoveryDir);
+
+    KafkaConsumer consumer = new SimpleKafkaConsumer();
+    consumer.setTopic(TEST_TOPIC);
+    consumer.setInitialOffset("earliest");
+
+    testMeta.operator.setConsumer(consumer);
+    testMeta.operator.setZookeeper("localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
+    testMeta.operator.setMaxTuplesPerWindow(500);
+    testMeta.sink = new CollectorTestSink<Object>();
+    testMeta.operator.outputPort.setSink(testMeta.sink);
+
+    testMeta.operator.setup(testMeta.context);
+    testMeta.operator.activate(testMeta.context);
+    latch.await(4000, TimeUnit.MILLISECONDS);
+    testMeta.operator.beginWindow(1);
+    testMeta.operator.emitTuples();
+    testMeta.operator.endWindow();
+    testMeta.operator.beginWindow(2);
+    testMeta.operator.emitTuples();
+    testMeta.operator.endWindow();
+
+    //failure and then re-deployment of operator
+    testMeta.sink.collectedTuples.clear();
+    testMeta.operator.teardown();
+    testMeta.operator.setup(testMeta.context);
+
+    Assert.assertEquals("largest recovery window", 2, testMeta.operator.getIdempotentStorageManager().getLargestRecoveryWindow());
+
+    testMeta.operator.beginWindow(1);
+    testMeta.operator.emitTuples();
+    testMeta.operator.endWindow();
+    testMeta.operator.beginWindow(2);
+    testMeta.operator.emitTuples();
+    testMeta.operator.endWindow();
+    latch.await(3000, TimeUnit.MILLISECONDS);
+    // Emiting data after all recovery windows are replayed
+    testMeta.operator.beginWindow(3);
+    testMeta.operator.emitTuples();
+    testMeta.operator.endWindow();
+
+    Assert.assertEquals("Total messages collected ", totalCount, testMeta.sink.collectedTuples.size());
+    testMeta.sink.collectedTuples.clear();
+  }
+
+  @Test
+  public void testZookeeper() throws Exception
+  {
+    // initial the latch for this test
+    latch = new CountDownLatch(50);
+
+    testMeta.operator = new KafkaSinglePortStringInputOperator();
+
+    KafkaConsumer consumer = new SimpleKafkaConsumer();
+    consumer.setTopic(TEST_TOPIC);
+
+    testMeta.operator.setConsumer(consumer);
+    testMeta.operator.setZookeeper("cluster1::node0,node1,node2:2181,node3:2182;cluster2::node4:2181");
+    latch.await(500, TimeUnit.MILLISECONDS);
+
+    Assert.assertEquals("Total size of clusters ", 5, testMeta.operator.getConsumer().getZookeeper().size());
+    Assert.assertEquals("Number of nodes in cluster1 ", 4, testMeta.operator.getConsumer().getZookeeper().get("cluster1").size());
+    Assert.assertEquals("Nodes in cluster1 ", "[node0:2181, node2:2181, node3:2182, node1:2181]", testMeta.operator.getConsumer().getZookeeper().get("cluster1").toString());
+    Assert.assertEquals("Number of nodes in cluster2 ", 1, testMeta.operator.getConsumer().getZookeeper().get("cluster2").size());
+    Assert.assertEquals("Nodes in cluster2 ", "[node4:2181]", testMeta.operator.getConsumer().getZookeeper().get("cluster2").toString());
+  }
+
 }
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
index 2f7e485..2a96562 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOperatorTestBase.java
@@ -26,107 +26,127 @@
 import kafka.utils.Utils;
 
 import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.junit.After;
 import org.junit.Before;
 import org.slf4j.LoggerFactory;
 
 /**
- * This is a base class setup/clean Kafka testing environment for all the input/output test
- * If it's a multipartition test, this class creates 2 kafka partitions
+ * This is a base class setup/clean Kafka testing environment for all the input/output test If it's a multipartition
+ * test, this class creates 2 kafka partitions
  */
 public class KafkaOperatorTestBase
 {
 
   public static final String END_TUPLE = "END_TUPLE";
-  public static final int TEST_ZOOKEEPER_PORT =  2182;
-  public static final int TEST_KAFKA_BROKER1_PORT =  9092;
-  public static final int TEST_KAFKA_BROKER2_PORT =  9093;
+  public static final int[] TEST_ZOOKEEPER_PORT = new int[] { 2182, 2183 };
+  public static final int[][] TEST_KAFKA_BROKER_PORT = new int[][] { new int[] { 9092, 9093 }, new int[] { 9094, 9095 } };
   public static final String TEST_TOPIC = "test_topic";
 
   static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaOperatorTestBase.class);
   // since Kafka 0.8 use KafkaServerStatble instead of KafkaServer
-  private KafkaServerStartable kserver;
-  // it wont be initialized unless hasMultiPartition is set to true
-  private KafkaServerStartable kserver2;
-  private NIOServerCnxnFactory standaloneServerFactory;
+
+  // multiple brokers in multiple cluster
+  private final KafkaServerStartable[][] broker = new KafkaServerStartable[2][2];
+
+  // multiple cluster
+  private final ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2];
 
   public String baseDir = "/tmp";
 
-  private final String zkdir = "zookeeper-server-data";
-  private final String kafkadir1 = "kafka-server-data1";
-  private final String kafkadir2 = "kafka-server-data2";
+  private final String zkBaseDir = "zookeeper-server-data";
+  private final String kafkaBaseDir = "kafka-server-data";
+  private final String[] zkdir = new String[] { "zookeeper-server-data/1", "zookeeper-server-data/2" };
+  private final String[][] kafkadir = new String[][] { new String[] { "kafka-server-data/1/1", "kafka-server-data/1/2" }, new String[] { "kafka-server-data/2/1", "kafka-server-data/2/2" } };
   protected boolean hasMultiPartition = false;
+  protected boolean hasMultiCluster = false;
 
-
-  public void startZookeeper()
+  public void startZookeeper(final int clusterId)
   {
 
     try {
-      int clientPort = TEST_ZOOKEEPER_PORT;
+
+      int clientPort = TEST_ZOOKEEPER_PORT[clusterId];
       int numConnections = 10;
       int tickTime = 2000;
-      File dir = new File(baseDir, zkdir);
+      File dir = new File(baseDir, zkdir[clusterId]);
 
-      ZooKeeperServer kserver = new ZooKeeperServer(dir, dir, tickTime);
-      standaloneServerFactory = new NIOServerCnxnFactory();
-      standaloneServerFactory.configure(new InetSocketAddress(clientPort), numConnections);
-      standaloneServerFactory.startup(kserver); // start the zookeeper server.
-      //kserver.startup();
-    }
-    catch (InterruptedException ex) {
-      logger.debug(ex.getLocalizedMessage());
-    }
-    catch (IOException ex) {
+      TestZookeeperServer kserver = new TestZookeeperServer(dir, dir, tickTime);
+      zkFactory[clusterId] = new NIOServerCnxnFactory();
+      zkFactory[clusterId].configure(new InetSocketAddress(clientPort), numConnections);
+
+      zkFactory[clusterId].startup(kserver); // start the zookeeper server.
+      Thread.sleep(2000);
+      kserver.startup();
+    } catch (Exception ex) {
       logger.debug(ex.getLocalizedMessage());
     }
   }
 
   public void stopZookeeper()
   {
-    standaloneServerFactory.shutdown();
-    Utils.rm(new File(baseDir, zkdir));
+    for (ServerCnxnFactory zkf : zkFactory) {
+      if (zkf != null) {
+        zkf.shutdown();
+      }
+    }
+    Utils.rm(new File(baseDir, zkBaseDir));
+  }
+
+  public void startKafkaServer(int clusterid, int brokerid, int defaultPartitions)
+  {
+    Properties props = new Properties();
+    props.setProperty("broker.id", "" + brokerid);
+    props.setProperty("log.dirs", new File(baseDir, kafkadir[clusterid][brokerid]).toString());
+    props.setProperty("zookeeper.connect", "localhost:" + TEST_ZOOKEEPER_PORT[clusterid]);
+    props.setProperty("port", "" + TEST_KAFKA_BROKER_PORT[clusterid][brokerid]);
+    props.setProperty("default.replication.factor", "1");
+    // set this to 50000 to boost the performance so most test data are in memory before flush to disk
+    props.setProperty("log.flush.interval.messages", "50000");
+    if (hasMultiPartition) {
+      props.setProperty("num.partitions", "2");
+    } else {
+      props.setProperty("num.partitions", "1");
+    }
+
+    broker[clusterid][brokerid] = new KafkaServerStartable(new KafkaConfig(props));
+    broker[clusterid][brokerid].startup();
+
   }
 
   public void startKafkaServer()
   {
-    Properties props = new Properties();
-    props.setProperty("broker.id", "0");
-    props.setProperty("log.dirs", new File(baseDir, kafkadir1).toString());
-    props.setProperty("zookeeper.connect", "localhost:"+TEST_ZOOKEEPER_PORT);
-    props.setProperty("port", ""+TEST_KAFKA_BROKER1_PORT);
-    if(hasMultiPartition){
-      props.setProperty("num.partitions", "2");
-      props.setProperty("default.replication.factor", "2");
-    } else {
-      props.setProperty("num.partitions", "1");
+    boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition }, new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } };
+    for (int i = 0; i < startable.length; i++) {
+      for (int j = 0; j < startable[i].length; j++) {
+        if (startable[i][j])
+          startKafkaServer(i, j, hasMultiPartition ? 2 : 1);
+      }
     }
-    // set this to 50000 to boost the performance so most test data are in memory before flush to disk
-    props.setProperty("log.flush.interval.messages", "50000");
-    kserver = new KafkaServerStartable(new KafkaConfig(props));
-    kserver.startup();
-    if(hasMultiPartition){
-      props.setProperty("broker.id", "1");
-      props.setProperty("log.dirs", new File(baseDir, kafkadir2).toString());
-      props.setProperty("port", "" + TEST_KAFKA_BROKER2_PORT);
-      props.setProperty("num.partitions", "2");
-      props.setProperty("default.replication.factor", "2");
-      kserver2 = new KafkaServerStartable(new KafkaConfig(props));
-      kserver2.startup();
+
+    // startup is asynch operation. wait 2 sec for server to startup
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
     }
 
   }
 
   public void stopKafkaServer()
   {
-    if(hasMultiPartition){
-      kserver2.shutdown();
-      kserver2.awaitShutdown();
-      Utils.rm(new File(baseDir, kafkadir2));
+    for (int i = 0; i < broker.length; i++) {
+      for (int j = 0; j < broker[i].length; j++) {
+        if (broker[i][j] != null) {
+          broker[i][j].shutdown();
+          broker[i][j].awaitShutdown();
+        }
+      }
     }
-    kserver.shutdown();
-    kserver.awaitShutdown();
-    Utils.rm(new File(baseDir, kafkadir1));
+    Utils.rm(new File(baseDir, kafkaBaseDir));
   }
 
   @Before
@@ -135,22 +155,32 @@
     try {
       startZookeeper();
       startKafkaServer();
-      createTopic(TEST_TOPIC);
-    }
-    catch (java.nio.channels.CancelledKeyException ex) {
+      createTopic(0, TEST_TOPIC);
+      if (hasMultiCluster) {
+        createTopic(1, TEST_TOPIC);
+      }
+    } catch (java.nio.channels.CancelledKeyException ex) {
       logger.debug("LSHIL {}", ex.getLocalizedMessage());
     }
   }
 
-  public void createTopic(String topicName)
+  public void startZookeeper()
+  {
+    startZookeeper(0);
+    if (hasMultiCluster) {
+      startZookeeper(1);
+    }
+  }
+
+  public void createTopic(int clusterid, String topicName)
   {
     String[] args = new String[9];
     args[0] = "--zookeeper";
-    args[1] = "localhost:" + TEST_ZOOKEEPER_PORT;
+    args[1] = "localhost:" + TEST_ZOOKEEPER_PORT[clusterid];
     args[2] = "--replication-factor";
     args[3] = "1";
     args[4] = "--partitions";
-    if(hasMultiPartition){
+    if (hasMultiPartition) {
       args[5] = "2";
     } else {
       args[5] = "1";
@@ -160,10 +190,11 @@
     args[8] = "--create";
 
     TopicCommand.main(args);
-    //Right now, there is no programmatic synchronized way to create the topic. have to wait 2 sec to make sure the topic is created
+    // Right now, there is no programmatic synchronized way to create the topic. have to wait 2 sec to make sure the
+    // topic is created
     // So the tests will not hit any bizarre failure
     try {
-      Thread.sleep(2000);
+      Thread.sleep(3000);
     } catch (InterruptedException e) {
       e.printStackTrace();
     }
@@ -175,8 +206,7 @@
     try {
       stopKafkaServer();
       stopZookeeper();
-    }
-    catch (java.nio.channels.CancelledKeyException ex) {
+    } catch (Exception ex) {
       logger.debug("LSHIL {}", ex.getLocalizedMessage());
     }
   }
@@ -185,5 +215,54 @@
   {
     this.hasMultiPartition = hasMultiPartition;
   }
-}
 
+  public void setHasMultiCluster(boolean hasMultiCluster)
+  {
+    this.hasMultiCluster = hasMultiCluster;
+  }
+
+  public static class TestZookeeperServer extends ZooKeeperServer
+  {
+
+    public TestZookeeperServer()
+    {
+      super();
+      // TODO Auto-generated constructor stub
+    }
+
+    public TestZookeeperServer(File snapDir, File logDir, int tickTime) throws IOException
+    {
+      super(snapDir, logDir, tickTime);
+      // TODO Auto-generated constructor stub
+    }
+
+    public TestZookeeperServer(FileTxnSnapLog txnLogFactory, DataTreeBuilder treeBuilder) throws IOException
+    {
+      super(txnLogFactory, treeBuilder);
+      // TODO Auto-generated constructor stub
+    }
+
+    public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder) throws IOException
+    {
+      super(txnLogFactory, tickTime, treeBuilder);
+      // TODO Auto-generated constructor stub
+    }
+
+    public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb)
+    {
+      super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, treeBuilder, zkDb);
+      // TODO Auto-generated constructor stub
+    }
+
+    @Override
+    protected void registerJMX()
+    {
+    }
+
+    @Override
+    protected void unregisterJMX()
+    {
+    }
+
+  }
+}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
index 83734e9..dc06b7f 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaOutputOperatorTest.java
@@ -76,7 +76,6 @@
       dataGeneratorThread = new Thread("String Generator")
       {
         @Override
-        @SuppressWarnings("SleepWhileInLoop")
         public void run()
         {
           try {
@@ -119,7 +118,7 @@
    * @throws Exception
    */
   @Test
-  @SuppressWarnings({"SleepWhileInLoop", "empty-statement", "rawtypes"})
+  @SuppressWarnings({"rawtypes", "unchecked"})
   public void testKafkaOutputOperator() throws Exception
   {
     //initialize the latch to synchronize the threads
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaPartitionableInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaPartitionableInputOperatorTest.java
index ebb8cce..34c0199 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaPartitionableInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaPartitionableInputOperatorTest.java
@@ -22,22 +22,41 @@
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.*;
 import com.datatorrent.api.DAG.Locality;
 
 /**
- * A test to verify the input operator will be automatically partitioned per kafka partition
- * This test is launching its own Kafka cluster.
+ * A test to verify the input operator will be automatically partitioned per kafka partition This test is launching its
+ * own Kafka cluster.
  */
+@RunWith(Parameterized.class)
 public class KafkaPartitionableInputOperatorTest extends KafkaOperatorTestBase
 {
 
-  public KafkaPartitionableInputOperatorTest()
+  private int totalBrokers = 0;
+
+  @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}")
+  public static Collection<Boolean[]> testScenario()
+  {
+    return Arrays.asList(new Boolean[][] { { true, false }, // multi cluster with single partition
+        { true, true }, // multi cluster with multi partitions
+        { false, true }, // single cluster with multi partitions
+        { false, false } // single cluster with single partition
+        });
+  }
+
+  public KafkaPartitionableInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition)
   {
     // This class want to initialize several kafka brokers for multiple partitions
-    hasMultiPartition = true;
+    this.hasMultiCluster = hasMultiCluster;
+    this.hasMultiPartition = hasMultiPartition;
+    int cluster = 1 + (hasMultiCluster ? 1 : 0);
+    totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster;
+
   }
 
   static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaPartitionableInputOperatorTest.class);
@@ -89,14 +108,11 @@
   }
 
   /**
-   * Test AbstractKafkaSinglePortInputOperator (i.e. an input adapter for
-   * Kafka, aka consumer). This module receives data from an outside test
-   * generator through Kafka message bus and feed that data into Malhar
-   * streaming platform.
+   * Test AbstractKafkaSinglePortInputOperator (i.e. an input adapter for Kafka, aka consumer). This module receives
+   * data from an outside test generator through Kafka message bus and feed that data into Malhar streaming platform.
    *
-   * [Generate message and send that to Kafka message bus] ==> [Receive that
-   * message through Kafka input adapter(i.e. consumer) and send using
-   * emitTuples() interface on output port during onMessage call]
+   * [Generate message and send that to Kafka message bus] ==> [Receive that message through Kafka input adapter(i.e.
+   * consumer) and send using emitTuples() interface on output port during onMessage call]
    *
    *
    * @throws Exception
@@ -114,21 +130,21 @@
   {
     // Create template high-level consumer
     Properties props = new Properties();
-    props.put("zookeeper.connect", "localhost:2182");
     props.put("group.id", "main_group");
     HighlevelKafkaConsumer consumer = new HighlevelKafkaConsumer(props);
     testPartitionableInputOperator(consumer);
   }
 
-  public void testPartitionableInputOperator(KafkaConsumer consumer) throws Exception{
+  public void testPartitionableInputOperator(KafkaConsumer consumer) throws Exception
+  {
 
-    // Set to 2 because we want to make sure END_TUPLE from both 2 partitions are received
-    latch = new CountDownLatch(2);
+    // each broker should get a END_TUPLE message
+    latch = new CountDownLatch(totalBrokers);
 
     int totalCount = 10000;
 
     // Start producer
-    KafkaTestProducer p = new KafkaTestProducer(TEST_TOPIC, true);
+    KafkaTestProducer p = new KafkaTestProducer(TEST_TOPIC, hasMultiPartition, hasMultiCluster);
     p.setSendCount(totalCount);
     new Thread(p).start();
 
@@ -137,20 +153,18 @@
     DAG dag = lma.getDAG();
 
     // Create KafkaSinglePortStringInputOperator
-    PartitionableKafkaSinglePortStringInputOperator node = dag.addOperator("Kafka message consumer", PartitionableKafkaSinglePortStringInputOperator.class);
+    KafkaSinglePortStringInputOperator node = dag.addOperator("Kafka message consumer", KafkaSinglePortStringInputOperator.class);
     node.setInitialPartitionCount(1);
 
-    //set topic
+    // set topic
     consumer.setTopic(TEST_TOPIC);
-    //set the brokerlist used to initialize the partition
-    Set<String> brokerSet =  new HashSet<String>();
-    brokerSet.add("localhost:9092");
-    brokerSet.add("localhost:9093");
-    consumer.setBrokerSet(brokerSet);
     consumer.setInitialOffset("earliest");
 
     node.setConsumer(consumer);
 
+    String clusterString = "cluster1::localhost:" + TEST_ZOOKEEPER_PORT[0] + (hasMultiCluster ? ";cluster2::localhost:" + TEST_ZOOKEEPER_PORT[1] : "");
+    node.setZookeeper(clusterString);
+
     // Create Test tuple collector
     CollectorModule<String> collector = dag.addOperator("TestMessageCollector", new CollectorModule<String>());
 
@@ -164,7 +178,7 @@
     lc.runAsync();
 
     // Wait 30s for consumer finish consuming all the messages
-    Assert.assertTrue("TIMEOUT: 30s ", latch.await(30000, TimeUnit.MILLISECONDS));
+    Assert.assertTrue("TIMEOUT: 40s ", latch.await(40000, TimeUnit.MILLISECONDS));
 
     // Check results
     Assert.assertEquals("Collections size", 1, collections.size());
@@ -173,6 +187,9 @@
 
     p.close();
     lc.shutdown();
+    // kafka has a bug shutdown connector you have to make sure kafka client resource has been cleaned before clean the broker 
+    Thread.sleep(5000);
   }
+  
 
 }
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestProducer.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestProducer.java
index 810b653..c0337f3 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestProducer.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTestProducer.java
@@ -29,11 +29,13 @@
 {
 //  private static final Logger logger = LoggerFactory.getLogger(KafkaTestProducer.class);
   private final kafka.javaapi.producer.Producer<String, String> producer;
+  private final kafka.javaapi.producer.Producer<String, String> producer1;
   private final String topic;
   private int sendCount = 20;
   // to generate a random int as a key for partition
   private final Random rand = new Random();
   private boolean hasPartition = false;
+  private boolean hasMultiCluster = false;
   private List<String> messages;
   
   private String producerType = "async";
@@ -52,17 +54,17 @@
     this.messages = messages;
   }
 
-  private ProducerConfig createProducerConfig()
+  private ProducerConfig createProducerConfig(int cid)
   {
     Properties props = new Properties();
     props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
     props.setProperty("key.serializer.class", "kafka.serializer.StringEncoder");
 //    props.put("metadata.broker.list", "localhost:"+KafkaOperatorTestBase.TEST_KAFKA_BROKER1_PORT );
     if(hasPartition){
-      props.put("metadata.broker.list", "localhost:"+KafkaOperatorTestBase.TEST_KAFKA_BROKER1_PORT + ",localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER2_PORT);
+      props.put("metadata.broker.list", "localhost:"+KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][0] + ",localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][1]);
       props.setProperty("partitioner.class", KafkaTestPartitioner.class.getCanonicalName());
     } else {
-      props.put("metadata.broker.list", "localhost:"+KafkaOperatorTestBase.TEST_KAFKA_BROKER1_PORT );
+      props.put("metadata.broker.list", "localhost:"+KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][0] );
     }
     props.setProperty("topic.metadata.refresh.interval.ms", "20000");
 
@@ -76,13 +78,23 @@
     this(topic, false);
   }
 
-  public KafkaTestProducer(String topic, boolean hasPartition)
+  public KafkaTestProducer(String topic, boolean hasPartition, boolean hasMultiCluster)
   {
     // Use random partitioner. Don't need the key type. Just set it to Integer.
     // The message is of type String.
     this.topic = topic;
     this.hasPartition = hasPartition;
-    producer = new Producer<String, String>(createProducerConfig());
+    this.hasMultiCluster = hasMultiCluster;
+    producer = new Producer<String, String>(createProducerConfig(0));
+    if(hasMultiCluster){
+      producer1 = new Producer<String, String>(createProducerConfig(1));
+    } else {
+      producer1 = null;
+    }
+  }
+  
+  public KafkaTestProducer(String topic, boolean hasPartition) {
+    this(topic, hasPartition, false);
   }
 
   private void generateMessages()
@@ -92,15 +104,25 @@
     while (messageNo <= sendCount) {
       String messageStr = "Message_" + messageNo;
       int k = rand.nextInt(100);
-      producer.send(new KeyedMessage<String, String>(topic, "" + k, messageStr));
+      producer.send(new KeyedMessage<String, String>(topic, "" + k, "c1" + messageStr));
+      if(hasMultiCluster){
+        messageNo++;
+        producer1.send(new KeyedMessage<String, String>(topic, "" + k, "c2" + messageStr));
+      }
       messageNo++;
      // logger.debug(String.format("Producing %s", messageStr));
     }
     // produce the end tuple to let the test input operator know it's done produce messages
     producer.send(new KeyedMessage<String, String>(topic, "" + 0, KafkaOperatorTestBase.END_TUPLE));
+    if(hasMultiCluster) {
+      producer1.send(new KeyedMessage<String, String>(topic, "" + 0, KafkaOperatorTestBase.END_TUPLE));
+    }
     if(hasPartition){
       // Send end_tuple to other partition if it exist
       producer.send(new KeyedMessage<String, String>(topic, "" + 1, KafkaOperatorTestBase.END_TUPLE));
+      if(hasMultiCluster) {
+        producer1.send(new KeyedMessage<String, String>(topic, "" + 1, KafkaOperatorTestBase.END_TUPLE));
+      }
     }
   }
 
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
index fa7b3a1..43e40d0 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
@@ -18,12 +18,10 @@
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
@@ -44,7 +42,9 @@
 import com.datatorrent.api.LocalMode;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.contrib.kafka.AbstractPartitionableKafkaInputOperator.PartitionStrategy;
+import com.datatorrent.contrib.kafka.AbstractKafkaInputOperator.PartitionStrategy;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
 
 public class OffsetManagerTest extends KafkaOperatorTestBase
 {
@@ -64,7 +64,7 @@
 
   public static class TestOffsetManager implements OffsetManager{
 
-    private final transient Map<Integer, Long> offsets = Collections.synchronizedMap(new HashMap<Integer, Long>());
+    private final transient Map<KafkaPartition, Long> offsets = Collections.synchronizedMap(new HashMap<KafkaPartition, Long>());
 
     private String filename = null;
 
@@ -78,15 +78,17 @@
     }
 
     @Override
-    public Map<Integer, Long> loadInitialOffsets()
+    public Map<KafkaPartition, Long> loadInitialOffsets()
     {
-      offsets.put(0, 10l);
-      offsets.put(1, 10l);
+      KafkaPartition kp0 = new KafkaPartition(TEST_TOPIC, 0);
+      KafkaPartition kp1 = new KafkaPartition(TEST_TOPIC, 1);
+      offsets.put(kp0, 10l);
+      offsets.put(kp1, 10l);
       return offsets;
     }
 
     @Override
-    public void updateOffsets(Map<Integer, Long> offsetsOfPartitions)
+    public void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions)
     {
 
       offsets.putAll(offsetsOfPartitions);
@@ -95,7 +97,7 @@
         Path tmpFile = new Path(filename + ".tmp");
         Path dataFile = new Path(filename);
         FSDataOutputStream out = fs.create(tmpFile, true);
-        for (Entry<Integer, Long> e : offsets.entrySet()) {
+        for (Entry<KafkaPartition, Long> e : offsets.entrySet()) {
           out.writeBytes(e.getKey() +", " + e.getValue() + "\n");
         }
         out.close();
@@ -112,8 +114,8 @@
       if (latch.getCount() == 1) {
         // when latch is 1, it means consumer has consumed all the messages
         int count = 0;
-        for (Entry<Integer, Long> entry : offsets.entrySet()) {
-          count += entry.getValue();
+        for (long entry : offsets.values()) {
+          count += entry;
         }
         if (count == totalCount + 2) {
           // wait until all offsets add up to totalCount messages + 2 control END_TUPLE
@@ -220,7 +222,7 @@
     DAG dag = lma.getDAG();
 
     // Create KafkaSinglePortStringInputOperator
-    PartitionableKafkaSinglePortStringInputOperator node = dag.addOperator("Kafka message consumer", PartitionableKafkaSinglePortStringInputOperator.class);
+    KafkaSinglePortStringInputOperator node = dag.addOperator("Kafka message consumer", KafkaSinglePortStringInputOperator.class);
 
 
     TestOffsetManager tfm = new TestOffsetManager();
@@ -234,11 +236,10 @@
 
     //set topic
     consumer.setTopic(TEST_TOPIC);
-    //set the brokerlist used to initialize the partition
-    Set<String> brokerSet =  new HashSet<String>();
-    brokerSet.add("localhost:9092");
-    brokerSet.add("localhost:9093");
-    consumer.setBrokerSet(brokerSet);
+    //set the zookeeper list used to initialize the partition
+    SetMultimap<String, String> zookeeper = HashMultimap.create();
+    zookeeper.put(KafkaPartition.DEFAULT_CLUSTERID, "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
+    consumer.setZookeeper(zookeeper);
     consumer.setInitialOffset("earliest");
 
     node.setConsumer(consumer);
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java
index 509a12c..c8ad7f6 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/SimpleKakfaConsumerTest.java
@@ -1,16 +1,15 @@
 package com.datatorrent.contrib.kafka;
 
+import com.datatorrent.lib.util.TestUtils;
+import com.esotericsoftware.kryo.Kryo;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.HashMap;
-import java.util.HashSet;
-
 public class SimpleKakfaConsumerTest
 {
 
   @Test
-  public void cloneTest()
+  public void cloneTest() throws Exception
   {
     SimpleKafkaConsumer kc = new SimpleKafkaConsumer();
     int bufferSize = 1000;
@@ -22,7 +21,7 @@
     kc.setTopic("test_topic");
     kc.setClientId("test_clientid");
 
-    SimpleKafkaConsumer kcClone = (SimpleKafkaConsumer) kc.cloneConsumer(new HashSet(), new HashMap());
+    SimpleKafkaConsumer kcClone = TestUtils.clone(new Kryo(), kc);
     Assert.assertEquals("Buffer size is " + bufferSize, bufferSize, kcClone.getBufferSize());
     Assert.assertEquals("Cache size is " + cacheSize, cacheSize, kcClone.getCacheSize());
     Assert.assertEquals("Clint id is same", kc.getClientId(), kcClone.getClientId());
diff --git a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsApplication.java b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsApplication.java
index 87e2dcb..ad5d45d 100644
--- a/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsApplication.java
+++ b/demos/dimensions/src/main/java/com/datatorrent/demos/dimensions/generic/GenericDimensionsApplication.java
@@ -16,11 +16,13 @@
 package com.datatorrent.demos.dimensions.generic;
 
 import com.datatorrent.api.Context;
+
 import org.apache.hadoop.conf.Configuration;
 
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StreamingApplication;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.contrib.kafka.KafkaJsonEncoder;
 import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator;
 import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
 
@@ -110,6 +112,7 @@
     DimensionStoreOperator store = dag.addOperator("Store", DimensionStoreOperator.class);
     KafkaSinglePortStringInputOperator queries = dag.addOperator("Query", new KafkaSinglePortStringInputOperator());
     KafkaSinglePortOutputOperator<Object, Object> queryResult = dag.addOperator("QueryResult", new KafkaSinglePortOutputOperator<Object, Object>());
+    queryResult.getConfigProperties().put("serializer.class", KafkaJsonEncoder.class.getName());
 
     dag.setInputPortAttribute(converter.input, Context.PortContext.PARTITION_PARALLEL, true);
     dag.setInputPortAttribute(dimensions.data, Context.PortContext.PARTITION_PARALLEL, true);
diff --git a/demos/dimensions/src/main/resources/META-INF/properties.xml b/demos/dimensions/src/main/resources/META-INF/properties.xml
index 087e010..41fc5c8 100644
--- a/demos/dimensions/src/main/resources/META-INF/properties.xml
+++ b/demos/dimensions/src/main/resources/META-INF/properties.xml
@@ -70,7 +70,7 @@
 
   <!-- Ensure that Kafka connect address is set locally - deployment specific configuration -->
   <!--<property>-->
-    <!--<name>dt.operator.Query.brokerSet</name>-->
+    <!--<name>dt.operator.Query.zookeeper</name>-->
     <!--&lt;!&ndash; <value>hostname:port</value> &ndash;&gt;-->
   <!--</property>-->
   <!--<property>-->
diff --git a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/ads/HDHTApplicationTest.java b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/ads/HDHTApplicationTest.java
index 4881df3..f51c74e 100644
--- a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/ads/HDHTApplicationTest.java
+++ b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/ads/HDHTApplicationTest.java
@@ -49,8 +49,8 @@
     FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
     kafkaLauncher.startZookeeper();
     kafkaLauncher.startKafkaServer();
-    kafkaLauncher.createTopic(kafkaQueryTopic);
-    kafkaLauncher.createTopic(kafkaQueryResultTopic);
+    kafkaLauncher.createTopic(0, kafkaQueryTopic);
+    kafkaLauncher.createTopic(0, kafkaQueryResultTopic);
   }
 
   @After
@@ -71,7 +71,7 @@
 
     conf.set("dt.operator.Store.fileStore.basePath", "target/HDSApplicationTestStore");
 
-    conf.set("dt.operator.Query.brokerSet", "localhost:9092");
+    conf.set("dt.operator.Query.zookeeper", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
     conf.set("dt.operator.Query.topic", kafkaQueryTopic);
     conf.set("dt.operator.QueryResult.topic", kafkaQueryResultTopic);
 
diff --git a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/ads/KafkaApplicationTest.java b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/ads/KafkaApplicationTest.java
index b4d2ee9..353b267 100644
--- a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/ads/KafkaApplicationTest.java
+++ b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/ads/KafkaApplicationTest.java
@@ -45,7 +45,7 @@
     FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
     kafkaLauncher.startZookeeper();
     kafkaLauncher.startKafkaServer();
-    kafkaLauncher.createTopic(kafkaTopic);
+    kafkaLauncher.createTopic(0, kafkaTopic);
   }
 
   @After
diff --git a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericAppTest.java b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericAppTest.java
index 41ff992..c5a6ffe 100644
--- a/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericAppTest.java
+++ b/demos/dimensions/src/test/java/com/datatorrent/demos/dimensions/generic/GenericAppTest.java
@@ -50,8 +50,8 @@
     FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
     kafkaLauncher.startZookeeper();
     kafkaLauncher.startKafkaServer();
-    kafkaLauncher.createTopic(kafkaQueryTopic);
-    kafkaLauncher.createTopic(kafkaQueryResultTopic);
+    kafkaLauncher.createTopic(0, kafkaQueryTopic);
+    kafkaLauncher.createTopic(0, kafkaQueryResultTopic);
   }
 
   @After
@@ -65,11 +65,12 @@
   {
     LocalMode lma = LocalMode.newInstance();
     Configuration conf = new Configuration(false);
-    conf.addResource("META-INF/properties.xml");
+    conf.addResource("META-INF/"
+        + ".xml");
     conf.set("dt.operator.DimensionsComputation.attr.APPLICATION_WINDOW_COUNT", "1");
     conf.set("dt.operator.QueryResult.prop.configProperties(metadata.broker.list)", "localhost:9092");
     conf.set("dt.operator.DimensionsStore.fileStore.basePath", "target/HDSApplicationTestStore");
-    conf.set("dt.operator.Query.brokerSet", "localhost:9092");
+    conf.set("dt.operator.Query.zookeeper", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
     conf.set("dt.operator.Query.topic", kafkaQueryTopic);
     conf.set("dt.operator.QueryResult.topic", kafkaQueryResultTopic);
     conf.set("dt.operator.DimensionsComputation.attr.APPLICATION_WINDOW_COUNT", "2");