Merge branch 'mqchanges' of https://github.com/ishark/Malhar into mqchanges
diff --git a/contrib/src/main/java/com/datatorrent/contrib/couchbase/CouchbasePOJOSetOperator.java b/contrib/src/main/java/com/datatorrent/contrib/couchbase/CouchbasePOJOSetOperator.java
index d9e954e..075f4eb 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/couchbase/CouchbasePOJOSetOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/couchbase/CouchbasePOJOSetOperator.java
@@ -59,7 +59,7 @@
 
   /*
    * An ArrayList of Java expressions that will yield the field value from the POJO.
-   * Each expression corresponds to one column in the Cassandra table.
+   * Each expression corresponds to one column in the Couchbase table.
    */
   public ArrayList<String> getExpressions()
   {
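
Note: the expressions above are plain Java getter expressions evaluated through PojoUtils, the same helper this change set uses for the CouchDB, Hive, and JDBC POJO operators below. A minimal, self-contained sketch of how one such expression resolves against a POJO (the MyPojo class is hypothetical):

    import com.datatorrent.lib.util.PojoUtils;
    import com.datatorrent.lib.util.PojoUtils.Getter;

    public class ExpressionSketch
    {
      public static class MyPojo
      {
        public String getName() { return "doc-1"; }
      }

      public static void main(String[] args)
      {
        // "getName()" is evaluated against the POJO class at runtime
        Getter<Object, String> getter = PojoUtils.createGetter(MyPojo.class, "getName()", String.class);
        System.out.println(getter.get(new MyPojo())); // prints doc-1
      }
    }
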
diff --git a/contrib/src/main/java/com/datatorrent/contrib/couchdb/AbstractCouchDBOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/couchdb/AbstractCouchDBOutputOperator.java
index 529188c..470874d 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/couchdb/AbstractCouchDBOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/couchdb/AbstractCouchDBOutputOperator.java
@@ -16,12 +16,13 @@
 package com.datatorrent.contrib.couchdb;
 
 import com.datatorrent.lib.db.AbstractStoreOutputOperator;
+import java.util.Map;
 
 /**
  * Generic base output adaptor which saves tuples in the CouchDb.&nbsp; Subclasses should provide implementation for getting Document Id. <br/>
  * <p>
  * An {@link AbstractStoreOutputOperator} saving tuples in the CouchDb.
- * Sub-classes provide the implementation of parsing document id from the tuple.
+ * Sub-classes provide implementations for parsing the document id from the tuple and for converting the tuple to a map.
  * @displayName Abstract CouchDB Output
  * @category Database
  * @tags output operator
@@ -49,4 +50,5 @@
    * @return document id.
    */
   public abstract String getDocumentId(T tuple);
+
 }
diff --git a/contrib/src/main/java/com/datatorrent/contrib/couchdb/CouchDbPOJOOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/couchdb/CouchDbPOJOOutputOperator.java
new file mode 100644
index 0000000..04ce0f7
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/couchdb/CouchDbPOJOOutputOperator.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.couchdb;
+
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+import javax.validation.constraints.NotNull;
+
+/**
+ * Implementation of {@link AbstractCouchDBOutputOperator} that saves a POJO in the couch database. <br/>
+ * <p>
+ * @displayName POJO Based CouchDb Output Operator
+ * @category Database
+ * @tags output operator
+ * @since 0.3.5
+ */
+public class CouchDbPOJOOutputOperator extends AbstractCouchDBOutputOperator<Object>
+{
+  private transient Getter<Object, String> getterDocId;
+
+  @NotNull
+  private String expressionForDocId;
+
+  /*
+   * A Java expression that extracts the document Id value from the input POJO.
+   */
+  public String getExpressionForDocId()
+  {
+    return expressionForDocId;
+  }
+
+  public void setExpressionForDocId(String expressionForDocId)
+  {
+    this.expressionForDocId = expressionForDocId;
+  }
+
+  @Override
+  public String getDocumentId(Object tuple)
+  {
+    if (getterDocId == null) {
+      getterDocId = PojoUtils.createGetter(tuple.getClass(), expressionForDocId, String.class);
+    }
+    String docId = getterDocId.get(tuple);
+    return docId;
+  }
+
+}
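
Note: a hedged wiring sketch for the new operator, mirroring the test added later in this diff (the database name is illustrative; the POJO must expose a String getter matching the expression):

    CouchDbPOJOOutputOperator oper = new CouchDbPOJOOutputOperator();
    CouchDbStore store = new CouchDbStore();
    store.setDbName("mydb");                 // illustrative database name
    oper.setStore(store);
    oper.setExpressionForDocId("getId()");   // document id taken from pojo.getId()

The getter is compiled lazily on the first tuple, so the operator needs no compile-time knowledge of the POJO class.
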
diff --git a/contrib/src/main/java/com/datatorrent/contrib/couchdb/MapBasedCouchDbOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/couchdb/MapBasedCouchDbOutputOperator.java
index e5c4a2f..b2054cd 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/couchdb/MapBasedCouchDbOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/couchdb/MapBasedCouchDbOutputOperator.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
diff --git a/contrib/src/main/java/com/datatorrent/contrib/hive/FSPojoToHiveOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hive/FSPojoToHiveOperator.java
new file mode 100644
index 0000000..b04fd97
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/hive/FSPojoToHiveOperator.java
@@ -0,0 +1,278 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.hive;
+
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
+import com.datatorrent.lib.util.PojoUtils.GetterChar;
+import com.datatorrent.lib.util.PojoUtils.GetterDouble;
+import com.datatorrent.lib.util.PojoUtils.GetterFloat;
+import com.datatorrent.lib.util.PojoUtils.GetterInt;
+import com.datatorrent.lib.util.PojoUtils.GetterLong;
+import com.datatorrent.lib.util.PojoUtils.GetterShort;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.sql.Date;
+
+/*
+ * An implementation of AbstractFSRollingOutputOperator which takes any POJO as input, serializes the POJO as Hive delimiter-separated values,
+ * writes them as text files to HDFS, and inserts them into Hive on the committed-window callback. This operator can write to multiple output
+ * files when the output file depends on the tuple.
+ * @displayName: FSPojoToHiveOperator
+ */
+public class FSPojoToHiveOperator extends AbstractFSRollingOutputOperator<Object>
+{
+  private static final long serialVersionUID = 1L;
+  private ArrayList<String> hivePartitionColumns;
+  private ArrayList<String> hiveColumns;
+  private ArrayList<FIELD_TYPE> hiveColumnDataTypes;
+  private ArrayList<FIELD_TYPE> hivePartitionColumnDataTypes;
+  private transient ArrayList<Object> getters;
+  private ArrayList<String> expressionsForHiveColumns;
+  private ArrayList<String> expressionsForHivePartitionColumns;
+
+  public ArrayList<String> getExpressionsForHivePartitionColumns()
+  {
+    return expressionsForHivePartitionColumns;
+  }
+
+  public void setExpressionsForHivePartitionColumns(ArrayList<String> expressionsForHivePartitionColumns)
+  {
+    this.expressionsForHivePartitionColumns = expressionsForHivePartitionColumns;
+  }
+
+
+  /*
+   * A list of Java expressions, each of which yields the value of one Hive table column (or partition column) from the input POJO.
+   */
+  public ArrayList<String> getExpressionsForHiveColumns()
+  {
+    return expressionsForHiveColumns;
+  }
+
+  public void setExpressionsForHiveColumns(ArrayList<String> expressions)
+  {
+    this.expressionsForHiveColumns = expressions;
+  }
+
+  public FSPojoToHiveOperator()
+  {
+    super();
+    getters = new ArrayList<Object>();
+  }
+
+  @SuppressWarnings("unchecked")
+  private void getValue(Object tuple, int index, FIELD_TYPE type, StringBuilder value)
+  {
+    switch (type) {
+      case CHARACTER:
+        value.append(((GetterChar<Object>)getters.get(index)).get(tuple));
+        break;
+      case STRING:
+        value.append(((Getter<Object, String>)getters.get(index)).get(tuple));
+        break;
+      case BOOLEAN:
+        value.append(((GetterBoolean<Object>)getters.get(index)).get(tuple));
+        break;
+      case SHORT:
+        value.append(((GetterShort<Object>)getters.get(index)).get(tuple));
+        break;
+      case INTEGER:
+        value.append(((GetterInt<Object>)getters.get(index)).get(tuple));
+        break;
+      case LONG:
+        value.append(((GetterLong<Object>)getters.get(index)).get(tuple));
+        break;
+      case FLOAT:
+        value.append(((GetterFloat<Object>)getters.get(index)).get(tuple));
+        break;
+      case DOUBLE:
+        value.append(((GetterDouble<Object>)getters.get(index)).get(tuple));
+        break;
+      case DATE:
+        value.append(((Getter<Object, Date>)getters.get(index)).get(tuple));
+        break;
+      case TIMESTAMP:
+        value.append(((Getter<Object, Timestamp>)getters.get(index)).get(tuple));
+        break;
+      case OTHER:
+        value.append(((Getter<Object, Object>)getters.get(index)).get(tuple));
+        break;
+      default:
+        throw new RuntimeException("unsupported data type " + type);
+    }
+  }
+
+  public enum FIELD_TYPE
+  {
+    BOOLEAN, DOUBLE, INTEGER, FLOAT, LONG, SHORT, CHARACTER, STRING, DATE, TIMESTAMP, OTHER
+  };
+
+  /*
+   * Columns in Hive table.
+   */
+  public ArrayList<String> getHiveColumns()
+  {
+    return hiveColumns;
+  }
+
+  public void setHiveColumns(ArrayList<String> hiveColumns)
+  {
+    this.hiveColumns = hiveColumns;
+  }
+
+  /*
+   * Partition columns in the Hive table. Example: dt for date, ts for timestamp.
+   */
+  public ArrayList<String> getHivePartitionColumns()
+  {
+    return hivePartitionColumns;
+  }
+
+  public void setHivePartitionColumns(ArrayList<String> hivePartitionColumns)
+  {
+    this.hivePartitionColumns = hivePartitionColumns;
+  }
+
+  /*
+   * Data types of the Hive table data columns.
+   * Example: if the Hive table has two columns of types int and float,
+   * then hiveColumnDataTypes = {INTEGER, FLOAT}.
+   * Each entry must be one of the values of the FIELD_TYPE enum.
+   */
+  public ArrayList<FIELD_TYPE> getHiveColumnDataTypes()
+  {
+    return hiveColumnDataTypes;
+  }
+
+  public void setHiveColumnDataTypes(ArrayList<FIELD_TYPE> hiveColumnDataTypes)
+  {
+    this.hiveColumnDataTypes = hiveColumnDataTypes;
+  }
+
+  /*
+   * Data types of the Hive table partition columns.
+   * Example: if the Hive table has two data columns of types int and float and is partitioned by a date column of type string,
+   * then hivePartitionColumnDataTypes = {STRING}.
+   * Each entry must be one of the values of the FIELD_TYPE enum.
+   */
+  public ArrayList<FIELD_TYPE> getHivePartitionColumnDataTypes()
+  {
+    return hivePartitionColumnDataTypes;
+  }
+
+  public void setHivePartitionColumnDataTypes(ArrayList<FIELD_TYPE> hivePartitionColumnDataTypes)
+  {
+    this.hivePartitionColumnDataTypes = hivePartitionColumnDataTypes;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public ArrayList<String> getHivePartition(Object tuple)
+  {
+    int sizeOfColumns = hiveColumns.size();
+    int sizeOfPartitionColumns = hivePartitionColumns.size();
+    int size = sizeOfColumns + sizeOfPartitionColumns;
+    ArrayList<String> hivePartitionColumnValues = new ArrayList<String>();
+    String partitionColumnValue;
+    for (int i = sizeOfColumns; i < size; i++) {
+      partitionColumnValue = ((Getter<Object, String>)getters.get(i)).get(tuple);
+      hivePartitionColumnValues.add(partitionColumnValue);
+    }
+    return hivePartitionColumnValues;
+  }
+
+  @Override
+  public void processTuple(Object tuple)
+  {
+    if (getters.isEmpty()) {
+      processFirstTuple(tuple);
+    }
+    super.processTuple(tuple);
+  }
+
+  @SuppressWarnings("unchecked")
+  public void processFirstTuple(Object tuple)
+  {
+    Class<?> fqcn = tuple.getClass();
+    createGetters(fqcn, hiveColumns.size(), expressionsForHiveColumns, hiveColumnDataTypes);
+    createGetters(fqcn, hivePartitionColumns.size(), expressionsForHivePartitionColumns, hivePartitionColumnDataTypes);
+  }
+
+  protected void createGetters(Class<?> fqcn, int size, ArrayList<String> expressions, ArrayList<FIELD_TYPE> columnDataTypes)
+  {
+    for (int i = 0; i < size; i++) {
+      FIELD_TYPE type = columnDataTypes.get(i);
+      final Object getter;
+      final String getterExpression = expressions.get(i);
+      switch (type) {
+        case CHARACTER:
+          getter = PojoUtils.createGetterChar(fqcn, getterExpression);
+          break;
+        case STRING:
+          getter = PojoUtils.createGetter(fqcn, getterExpression, String.class);
+          break;
+        case BOOLEAN:
+          getter = PojoUtils.createGetterBoolean(fqcn, getterExpression);
+          break;
+        case SHORT:
+          getter = PojoUtils.createGetterShort(fqcn, getterExpression);
+          break;
+        case INTEGER:
+          getter = PojoUtils.createGetterInt(fqcn, getterExpression);
+          break;
+        case LONG:
+          getter = PojoUtils.createGetterLong(fqcn, getterExpression);
+          break;
+        case FLOAT:
+          getter = PojoUtils.createGetterFloat(fqcn, getterExpression);
+          break;
+        case DOUBLE:
+          getter = PojoUtils.createGetterDouble(fqcn, getterExpression);
+          break;
+        case DATE:
+          getter = PojoUtils.createGetter(fqcn, getterExpression, Date.class);
+          break;
+        case TIMESTAMP:
+          getter = PojoUtils.createGetter(fqcn, getterExpression, Timestamp.class);
+          break;
+        default:
+          getter = PojoUtils.createGetter(fqcn, getterExpression, Object.class);
+      }
+      getters.add(getter);
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected byte[] getBytesForTuple(Object tuple)
+  {
+    int size = hiveColumns.size();
+    StringBuilder result = new StringBuilder();
+    for (int i = 0; i < size; i++) {
+      FIELD_TYPE type = hiveColumnDataTypes.get(i);
+      getValue(tuple, i, type, result);
+      result.append("\n");
+    }
+    return (result.toString()).getBytes();
+  }
+
+}
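
Note: configuring the new operator means keeping parallel lists aligned by index: column names, column types, and one getter expression per column, for both data and partition columns. A sketch based on the testInsertPOJO test added later in this diff:

    FSPojoToHiveOperator oper = new FSPojoToHiveOperator();

    ArrayList<String> columns = new ArrayList<String>();
    columns.add("col1");                                  // Hive data column
    ArrayList<FSPojoToHiveOperator.FIELD_TYPE> columnTypes = new ArrayList<FSPojoToHiveOperator.FIELD_TYPE>();
    columnTypes.add(FSPojoToHiveOperator.FIELD_TYPE.INTEGER);
    ArrayList<String> columnExprs = new ArrayList<String>();
    columnExprs.add("getId()");                           // yields col1 from the POJO

    ArrayList<String> partitionColumns = new ArrayList<String>();
    partitionColumns.add("dt");                           // Hive partition column
    ArrayList<FSPojoToHiveOperator.FIELD_TYPE> partitionTypes = new ArrayList<FSPojoToHiveOperator.FIELD_TYPE>();
    partitionTypes.add(FSPojoToHiveOperator.FIELD_TYPE.STRING);
    ArrayList<String> partitionExprs = new ArrayList<String>();
    partitionExprs.add("getDate()");                      // yields dt from the POJO

    oper.setHiveColumns(columns);
    oper.setHiveColumnDataTypes(columnTypes);
    oper.setExpressionsForHiveColumns(columnExprs);
    oper.setHivePartitionColumns(partitionColumns);
    oper.setHivePartitionColumnDataTypes(partitionTypes);
    oper.setExpressionsForHivePartitionColumns(partitionExprs);

Note that processFirstTuple appends the partition-column getters after the data-column getters, which is why getHivePartition reads getters starting at hiveColumns.size().
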
diff --git a/contrib/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java b/contrib/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java
index 8da7ee2..e82cecb 100755
--- a/contrib/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/hive/HiveOperator.java
@@ -131,6 +131,7 @@
   public void processTuple(FilePartitionMapping tuple)
   {
     String command = processHiveFile(tuple);
+    logger.debug("commands is {}",command);
     if (command != null) {
       Statement stmt;
       try {
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
index b654b54..6984a96 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaInputOperator.java
@@ -387,7 +387,11 @@
     return consumer;
   }
 
-  // add topic as operator property
+  /**
+   * Set the topic.
+   * @omitFromUI
+   */
+  @Deprecated
   public void setTopic(String topic)
   {
     this.consumer.setTopic(topic);
@@ -396,29 +400,12 @@
   /**
    * Set the ZooKeeper quorum of the Kafka cluster(s) you want to consume data from.
    * The operator will discover the brokers that it needs to consume messages from.
+   * @omitFromUI
    */
+  @Deprecated
   public void setZookeeper(String zookeeperString)
   {
-    SetMultimap<String, String> theClusters = HashMultimap.create();
-    for (String zk : zookeeperString.split(";")) {
-      String[] parts = zk.split("::");
-      String clusterId = parts.length == 1 ? KafkaPartition.DEFAULT_CLUSTERID : parts[0];
-      String[] hostNames = parts.length == 1 ? parts[0].split(",") : parts[1].split(",");
-      String portId = "";
-      for (int idx = hostNames.length - 1; idx >= 0; idx--) {
-        String[] zkParts = hostNames[idx].split(":");
-        if (zkParts.length == 2) {
-          portId = zkParts[1];
-        }
-        if (!portId.isEmpty() && portId != "") {
-          theClusters.put(clusterId, zkParts[0] + ":" + portId);
-        } else {
-          throw new IllegalArgumentException("Wrong zookeeper string: " + zookeeperString + "\n"
-              + " Expected format should be cluster1::zookeeper1,zookeeper2:port1;cluster2::zookeeper3:port2 or zookeeper1:port1,zookeeper:port2");
-        }
-      }
-    }
-    this.consumer.setZookeeper(theClusters);
+    this.consumer.setZookeeper(zookeeperString);
   }
 
   @Override
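
Note: with the parsing moved down into KafkaConsumer (next file), the operator-level setters are now thin deprecated delegates. A short sketch of the two equivalent spellings (operator stands for any already-constructed concrete subclass instance):

    // Deprecated, kept for backward compatibility:
    operator.setZookeeper("node0:2181,node1:2181");
    // Preferred: configure the consumer directly
    operator.getConsumer().setZookeeper("node0:2181,node1:2181");
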
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
index d84a145..7e89e34 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java
@@ -115,11 +115,11 @@
   {
     super.start();
     //Share other properties among all connectors but set zookeepers respectively cause different cluster would use different zookeepers
-    for (String cluster : zookeeper.keySet()) {
+    for (String cluster : zookeeperMap.keySet()) {
       // create high level consumer for every cluster
       Properties config = new Properties();
       config.putAll(consumerConfig);
-      config.setProperty("zookeeper.connect", Joiner.on(',').join(zookeeper.get(cluster)));
+      config.setProperty("zookeeper.connect", Joiner.on(',').join(zookeeperMap.get(cluster)));
       // create consumer connector will start a daemon thread to monitor the metadata change 
       // we want to start this thread until the operator is activated 
       standardConsumer.put(cluster, kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(config)));
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
index fed982f..e0fdb21 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java
@@ -55,7 +55,7 @@
   protected final static String HIGHLEVEL_CONSUMER_ID_SUFFIX = "_stream_";
 
   protected final static String SIMPLE_CONSUMER_ID_SUFFIX = "_partition_";
-
+  private String zookeeper;
 
   public KafkaConsumer()
   {
@@ -67,10 +67,10 @@
     this.topic = topic;
   }
 
-  public KafkaConsumer(SetMultimap<String, String> zks, String topic)
+  public KafkaConsumer(String zks, String topic)
   {
     this.topic = topic;
-    this.zookeeper = zks;
+    setZookeeper(zks);
   }
 
   private int cacheSize = 1024;
@@ -91,7 +91,7 @@
    */
   @NotNull
   @Bind(JavaSerializer.class)
-  protected SetMultimap<String, String> zookeeper;
+  SetMultimap<String, String> zookeeperMap;
 
   protected transient SetMultimap<String, String> brokers;
 
@@ -124,14 +124,14 @@
     if(brokers!=null){
       return ;
     }
-    if(zookeeper!=null){
+    if (zookeeperMap != null) {
       brokers = HashMultimap.create();
-      for (String clusterId: zookeeper.keySet()) {
+      for (String clusterId: zookeeperMap.keySet()) {
         try {
-          brokers.putAll(clusterId, KafkaMetadataUtil.getBrokers(zookeeper.get(clusterId)));
+          brokers.putAll(clusterId, KafkaMetadataUtil.getBrokers(zookeeperMap.get(clusterId)));
         } catch (Exception e) {
           // let the user know where we tried to connect to
-          throw new RuntimeException("Error resolving brokers for cluster " + clusterId + " " + zookeeper.get(clusterId), e);
+          throw new RuntimeException("Error resolving brokers for cluster " + clusterId + " " + zookeeperMap.get(clusterId), e);
         }
       }
     }
@@ -175,6 +175,9 @@
     this.isAlive = isAlive;
   }
 
+  /**
+   * Set the topic.
+   */
   public void setTopic(String topic)
   {
     this.topic = topic;
@@ -195,16 +198,6 @@
     return holdingBuffer.size();
   }
 
-  public void setZookeeper(SetMultimap<String, String> zks)
-  {
-    this.zookeeper = zks;
-  }
-
-  public SetMultimap<String, String> getZookeeper()
-  {
-    return zookeeper;
-  }
-
   public void setInitialOffset(String initialOffset)
   {
     this.initialOffset = initialOffset;
@@ -243,6 +236,22 @@
   }
 
   protected abstract void resetPartitionsAndOffset(Set<KafkaPartition> partitionIds, Map<KafkaPartition, Long> startOffset);
+
+  /**
+   * Set the ZooKeeper quorum of the Kafka cluster(s) you want to consume data from.
+   * The operator will discover the brokers that it needs to consume messages from.
+   */
+  public void setZookeeper(String zookeeper)
+  {
+    this.zookeeper = zookeeper;
+    this.zookeeperMap = parseZookeeperStr(zookeeper);
+  }
+
+  public String getZookeeper()
+  {
+    return zookeeper;
+  }
+
   /**
    * Counter class which gives the statistic value from the consumer
    */
@@ -507,4 +516,27 @@
     }
   }
 
+  private SetMultimap<String, String> parseZookeeperStr(String zookeeper)
+  {
+    SetMultimap<String, String> theClusters = HashMultimap.create();
+    for (String zk : zookeeper.split(";")) {
+      String[] parts = zk.split("::");
+      String clusterId = parts.length == 1 ? KafkaPartition.DEFAULT_CLUSTERID : parts[0];
+      String[] hostNames = parts.length == 1 ? parts[0].split(",") : parts[1].split(",");
+      String portId = "";
+      for (int idx = hostNames.length - 1; idx >= 0; idx--) {
+        String[] zkParts = hostNames[idx].split(":");
+        if (zkParts.length == 2) {
+          portId = zkParts[1];
+        }
+        if (!portId.isEmpty()) {
+          theClusters.put(clusterId, zkParts[0] + ":" + portId);
+        } else {
+          throw new IllegalArgumentException("Wrong zookeeper string: " + zookeeper + "\n"
+              + " Expected format should be cluster1::zookeeper1,zookeeper2:port1;cluster2::zookeeper3:port2 or zookeeper1:port1,zookeeper:port2");
+        }
+      }
+    }
+    return theClusters;
+  }
 }
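
Note: parseZookeeperStr accepts both the single-cluster and the multi-cluster form; because it walks each host list from right to left, a host written without a port inherits the port of the nearest host to its right. A sketch of the mapping, matching the assertions in KafkaInputOperatorTest below (consumer stands for any concrete KafkaConsumer):

    // Single cluster, default cluster id:
    consumer.setZookeeper("node0:2181,node1:2181");
    //   zookeeperMap: {DEFAULT_CLUSTERID=[node0:2181, node1:2181]}

    // Multiple clusters, separated by ';', each prefixed with "clusterId::":
    consumer.setZookeeper("cluster1::node0,node1,node2:2181,node3:2182;cluster2::node4:2181");
    //   zookeeperMap: {cluster1=[node0:2181, node1:2181, node2:2181, node3:2182],
    //                  cluster2=[node4:2181]}
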
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
index 720d7a1..2f5c412 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/SimpleKafkaConsumer.java
@@ -230,7 +230,7 @@
     this.kps = partitionIds;
   }
 
-  public SimpleKafkaConsumer(SetMultimap<String, String> zks, String topic, int timeout, int bufferSize, String clientId, Set<KafkaPartition> partitionIds)
+  public SimpleKafkaConsumer(String zks, String topic, int timeout, int bufferSize, String clientId, Set<KafkaPartition> partitionIds)
   {
     super(zks, topic);
     this.timeout = timeout;
diff --git a/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java
index 6d24496..b770884 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java
@@ -234,33 +234,34 @@
   protected void setStatementParameters(PreparedStatement statement, Object tuple) throws SQLException
   {
     final int size = columnDataTypes.size();
-    for (int i = 0; i < size; ) {
+    for (int i = 0; i < size; i++) {
       final int type = columnDataTypes.get(i);
       switch (type) {
         case (Types.CHAR):
           // TODO: verify that memsql driver handles char as int
-          statement.setInt(++i, ((GetterChar<Object>) getters.get(i)).get(tuple));
+          statement.setInt(i+1, ((GetterChar<Object>) getters.get(i)).get(tuple));
           break;
         case (Types.VARCHAR):
-          statement.setString(++i, ((Getter<Object, String>) getters.get(i)).get(tuple));
+          statement.setString(i+1, ((Getter<Object, String>) getters.get(i)).get(tuple));
           break;
         case (Types.BOOLEAN):
-          statement.setBoolean(++i, ((GetterBoolean<Object>) getters.get(i)).get(tuple));
+        case (Types.TINYINT):
+          statement.setBoolean(i+1, ((GetterBoolean<Object>) getters.get(i)).get(tuple));
           break;
         case (Types.SMALLINT):
-          statement.setShort(++i, ((GetterShort<Object>) getters.get(i)).get(tuple));
+          statement.setShort(i+1, ((GetterShort<Object>) getters.get(i)).get(tuple));
           break;
         case (Types.INTEGER):
-          statement.setInt(++i, ((GetterInt<Object>) getters.get(i)).get(tuple));
+          statement.setInt(i+1, ((GetterInt<Object>) getters.get(i)).get(tuple));
           break;
         case (Types.BIGINT):
-          statement.setLong (++i, ((GetterLong<Object>) getters.get(i)).get(tuple));
+          statement.setLong (i+1, ((GetterLong<Object>) getters.get(i)).get(tuple));
           break;
         case (Types.FLOAT):
-          statement.setFloat(++i, ((GetterFloat<Object>) getters.get(i)).get(tuple));
+          statement.setFloat(i+1, ((GetterFloat<Object>) getters.get(i)).get(tuple));
           break;
         case (Types.DOUBLE):
-          statement.setDouble(++i, ((GetterDouble<Object>) getters.get(i)).get(tuple));
+          statement.setDouble(i+1, ((GetterDouble<Object>) getters.get(i)).get(tuple));
           break;
         default:
           /*
@@ -270,7 +271,7 @@
             Types.ARRAY
             Types.OTHER
            */
-          statement.setObject(++i, ((Getter<Object, Object>)getters.get(i)).get(tuple));
+          statement.setObject(i+1, ((Getter<Object, Object>)getters.get(i)).get(tuple));
           break;
       }
     }
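
Note: the loop rewrite above fixes an off-by-one. With no increment in the for header, ++i did double duty, and because it executed before getters.get(i), each branch paired the JDBC parameter for one column with the getter of the next. Using i + 1 keeps the 0-based getter index and the 1-based JDBC parameter index aligned. A tiny self-contained illustration of the pitfall (not the operator code itself):

    import java.util.Arrays;
    import java.util.List;

    public class IncrementPitfall
    {
      public static void main(String[] args)
      {
        List<String> getters = Arrays.asList("g0", "g1", "g2");
        int i = 0;
        int param = ++i;                 // old pattern: param = 1 ...
        String getter = getters.get(i);  // ... but this reads g1, skipping g0
        System.out.println(param + " <- " + getter); // 1 <- g1 (misaligned)

        i = 0;                           // fixed pattern: increment in the loop header
        System.out.println((i + 1) + " <- " + getters.get(i)); // 1 <- g0 (aligned)
      }
    }
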
diff --git a/contrib/src/main/java/com/datatorrent/contrib/solr/CloudSolrServerConnector.java b/contrib/src/main/java/com/datatorrent/contrib/solr/CloudSolrServerConnector.java
index f13500b..0822261 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/solr/CloudSolrServerConnector.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/solr/CloudSolrServerConnector.java
@@ -28,23 +28,29 @@
     solrServer = new CloudSolrServer(zookeeperHost, updateToLeader);
   }
 
-  // set this property in dt-site.xml
   public void setSolrZookeeperHost(String solrServerURL)
   {
     this.zookeeperHost = solrServerURL;
   }
 
+  /*
+   * Gets the zookeeper host, i.e. the client endpoint of the zookeeper quorum
+   * containing the cloud state, in the form HOST:PORT.
+   */
   public String getSolrZookeeperHost()
   {
     return zookeeperHost;
   }
 
-  // set this property in dt-site.xml
   public void setUpdateToLeader(boolean updateToLeader)
   {
     this.updateToLeader = updateToLeader;
   }
 
+  /*
+   * Gets the updateToLeader flag; when true (the default), updates are sent only to shard leaders.
+   */
   public boolean getUpdateToLeader()
   {
     return updateToLeader;
diff --git a/contrib/src/main/java/com/datatorrent/contrib/solr/ConcurrentUpdateSolrServerConnector.java b/contrib/src/main/java/com/datatorrent/contrib/solr/ConcurrentUpdateSolrServerConnector.java
index b55bd1a..a5891bf 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/solr/ConcurrentUpdateSolrServerConnector.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/solr/ConcurrentUpdateSolrServerConnector.java
@@ -44,34 +44,43 @@
 
   }
 
-  // set this property in dt-site.xml
   public void setSolrServerURL(String solrServerURL)
   {
     this.solrServerURL = solrServerURL;
   }
 
+  /*
+   * Gets the Solr server URL.
+   */
   public String getSolrServerURL()
   {
     return solrServerURL;
   }
 
-  // set this property in dt-site.xml
   public void setQueueSize(int queueSize)
   {
     this.queueSize = queueSize;
   }
 
+  /*
+   * Gets the queue size, i.e. the number of documents to buffer before they are sent to the server.
+   */
   public int getQueueSize()
   {
     return queueSize;
   }
 
-  // set this property in dt-site.xml
   public void setThreadCount(int threadCount)
   {
     this.threadCount = threadCount;
   }
 
+  /*
+   * Gets the number of background threads used to empty the queue.
+   */
   public int getThreadCount()
   {
     return threadCount;
@@ -82,12 +91,15 @@
     this.httpClient = httpClient;
   }
 
+  /*
+   * Gets the HttpClient instance.
+   */
   public HttpClient getHttpClient()
   {
     return httpClient;
   }
 
-  // set this property in dt-site.xml
   public void setStreamDeletes(boolean streamDeletes)
   {
     this.streamDeletes = streamDeletes;
diff --git a/contrib/src/main/java/com/datatorrent/contrib/solr/HttpSolrServerConnector.java b/contrib/src/main/java/com/datatorrent/contrib/solr/HttpSolrServerConnector.java
index d76448f..6786a72 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/solr/HttpSolrServerConnector.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/solr/HttpSolrServerConnector.java
@@ -23,12 +23,15 @@
     solrServer = new HttpSolrServer(solrServerURL);
   }
 
-  // set this property in dt-site.xml
   public void setSolrServerURL(String solrServerURL)
   {
     this.solrServerURL = solrServerURL;
   }
 
+  /*
+   * Gets the URL of the Solr server.
+   */
   public String getSolrServerURL()
   {
     return solrServerURL;
diff --git a/contrib/src/main/java/com/datatorrent/contrib/solr/LBHttpSolrServerConnector.java b/contrib/src/main/java/com/datatorrent/contrib/solr/LBHttpSolrServerConnector.java
index a10b69c..82ae990 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/solr/LBHttpSolrServerConnector.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/solr/LBHttpSolrServerConnector.java
@@ -37,12 +37,15 @@
     }
   }
 
-  // set this property in dt-site.xml
   public void setSolrServerUrls(String[] solrServerUrls)
   {
     this.solrServerUrls = solrServerUrls;
   }
 
+  /*
+   * Gets the URLs of the Solr servers used by the load-balancing client.
+   */
   public String[] getSolrServerUrls()
   {
     return solrServerUrls;
@@ -53,6 +56,10 @@
     this.httpClient = httpClient;
   }
 
+  /*
+   * Gets the HttpClient instance.
+   */
   public HttpClient getHttpClient()
   {
     return httpClient;
diff --git a/contrib/src/main/java/com/datatorrent/contrib/solr/SolrInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/solr/SolrInputOperator.java
index 0e96b88..9631dcc 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/solr/SolrInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/solr/SolrInputOperator.java
@@ -37,12 +37,15 @@
     return solrParams;
   }
 
+  /*
+   * Gets the Solr search query.
+   */
   public String getSolrQuery()
   {
     return solrQuery;
   }
 
-  // set this property in dt-site.xml
   public void setSolrQuery(String solrQuery)
   {
     this.solrQuery = solrQuery;
diff --git a/contrib/src/test/java/com/datatorrent/contrib/couchdb/CouchDBOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/couchdb/CouchDBOutputOperatorTest.java
index 1b62793..093f08d 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/couchdb/CouchDBOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/couchdb/CouchDBOutputOperatorTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2013 DataTorrent, Inc. ALL Rights Reserved.
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -69,6 +69,145 @@
     Assert.assertEquals("output-type", "map", docNode.get("output-type").getTextValue());
   }
 
+  @Test
+  public void testCouchDBPOJOOutputOperator()
+  {
+    String testDocumentId = "test2";
+    TestPOJO tuple = new TestPOJO();
+    tuple.setId(testDocumentId);
+    tuple.setName("TD2");
+    tuple.setType("test2");
+    tuple.setOutput_type("pojo");
+    Address address = new Address();
+    address.setCity("chandigarh");
+    address.setHousenumber(123);
+    tuple.setAddress(address);
+    CouchDbPOJOOutputOperator dbOutputOper = new CouchDbPOJOOutputOperator();
+    CouchDbStore store = new CouchDbStore();
+    store.setDbName(CouchDBTestHelper.TEST_DB);
+    dbOutputOper.setStore(store);
+    String expression = "getId()";
+    dbOutputOper.setExpressionForDocId(expression);
+    dbOutputOper.setup(new OperatorContextTestHelper.TestIdOperatorContext(1));
+    dbOutputOper.beginWindow(0);
+    dbOutputOper.input.process(tuple);
+    dbOutputOper.endWindow();
+
+    //Test if the document was persisted
+    JsonNode docNode = CouchDBTestHelper.fetchDocument(testDocumentId);
+    Assert.assertNotNull("Document saved ", docNode);
+    Assert.assertEquals("name of document ", "TD2", docNode.get("name").getTextValue());
+    Assert.assertEquals("type of document ", "test2", docNode.get("type").getTextValue());
+    Assert.assertEquals("output-type", "pojo", docNode.get("output_type").getTextValue());
+    Assert.assertEquals("Housenumber is ",123,docNode.get("address").get("housenumber").getIntValue());
+    Assert.assertEquals("City is ","chandigarh",docNode.get("address").get("city").getTextValue());
+  }
+
+  public class TestPOJO
+  {
+    private String _id;
+    private String output_type;
+    private String revision;
+    private Address address;
+    private String name;
+    private String type;
+
+    public TestPOJO()
+    {
+    }
+
+    public Address getAddress()
+    {
+      return address;
+    }
+
+    public void setAddress(Address address)
+    {
+      this.address = address;
+    }
+
+    public String getRevision()
+    {
+      return revision;
+    }
+
+    public void setRevision(String rev)
+    {
+      this.revision = rev;
+    }
+
+    public String getOutput_type()
+    {
+      return output_type;
+    }
+
+    public void setOutput_type(String output_type)
+    {
+      this.output_type = output_type;
+    }
+
+    public String getId()
+    {
+      return _id;
+    }
+
+    public void setId(String id)
+    {
+      this._id = id;
+    }
+
+    public String getName()
+    {
+      return name;
+    }
+
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+    public String getType()
+    {
+      return type;
+    }
+
+    public void setType(String type)
+    {
+      this.type = type;
+    }
+
+  }
+
+  private class Address
+  {
+    private int housenumber;
+    private String city;
+
+    public int getHousenumber()
+    {
+      return housenumber;
+    }
+
+    public void setHousenumber(int housenumber)
+    {
+      this.housenumber = housenumber;
+    }
+
+    public String getCity()
+    {
+      return city;
+    }
+
+    public void setCity(String city)
+    {
+      this.city = city;
+    }
+  }
+
   @BeforeClass
   public static void setup()
   {
diff --git a/contrib/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java b/contrib/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java
index d1c90e3..80e8ea3 100755
--- a/contrib/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/hive/HiveMockTest.java
@@ -33,6 +33,7 @@
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.Operator.ProcessingMode;
+import com.datatorrent.contrib.hive.FSPojoToHiveOperator.FIELD_TYPE;
 import com.datatorrent.lib.util.TestUtils.TestInfo;
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.serializers.FieldSerializer;
@@ -56,6 +57,7 @@
   public static final int BLAST_SIZE = 10;
   public static final int DATABASE_SIZE = NUM_WINDOWS * BLAST_SIZE;
   public static final String tablename = "temp";
+  public static final String tablepojo = "temppojo";
   public static final String tablemap = "tempmap";
   public static String delimiterMap = ":";
   public static final String HOST = "localhost";
@@ -137,7 +139,9 @@
       LOG.debug("tables are {}", res.getString(1));
     }
 
-    stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename + " (col1 string) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n'  \n"
+    stmt.execute("DROP TABLE " + tablename);
+
+    stmt.execute("CREATE TABLE IF NOT EXISTS " + tablename + " (col1 String) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n'  \n"
             + "STORED AS TEXTFILE ");
     /*ResultSet res = stmt.execute("CREATE TABLE IF NOT EXISTS temp4 (col1 map<string,int>,col2 map<string,int>,col3  map<string,int>,col4 map<String,timestamp>, col5 map<string,double>,col6 map<string,double>,col7 map<string,int>,col8 map<string,int>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','  \n"
      + "COLLECTION ITEMS TERMINATED BY '\n'  \n"
@@ -148,6 +152,25 @@
     hiveStore.disconnect();
   }
 
+  public static void hiveInitializePOJODatabase(HiveStore hiveStore) throws SQLException
+  {
+    hiveStore.connect();
+    Statement stmt = hiveStore.getConnection().createStatement();
+    // show tables
+    String sql = "show tables";
+
+    LOG.debug(sql);
+    ResultSet res = stmt.executeQuery(sql);
+    if (res.next()) {
+      LOG.debug("tables are {}", res.getString(1));
+    }
+    stmt.execute("DROP TABLE " + tablepojo);
+
+    stmt.execute("CREATE TABLE " + tablepojo + " (col1 int) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n'  \n"
+            + "STORED AS TEXTFILE ");
+    hiveStore.disconnect();
+  }
+
   public static void hiveInitializeMapDatabase(HiveStore hiveStore) throws SQLException
   {
     hiveStore.connect();
@@ -161,6 +184,8 @@
       LOG.debug(res.getString(1));
     }
 
+    stmt.execute("DROP TABLE " + tablemap);
+
     stmt.execute("CREATE TABLE IF NOT EXISTS " + tablemap + " (col1 map<string,int>) PARTITIONED BY(dt STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\n'  \n"
             + "MAP KEYS TERMINATED BY '" + delimiterMap + "' \n"
             + "STORED AS TEXTFILE ");
@@ -257,6 +282,113 @@
   }
 
   @Test
+  public void testInsertPOJO() throws Exception
+  {
+    HiveStore hiveStore = createStore(null);
+    hiveStore.setFilepath(testMeta.getDir());
+    ArrayList<String> hivePartitionColumns = new ArrayList<String>();
+    hivePartitionColumns.add("dt");
+    ArrayList<String> hiveColumns = new ArrayList<String>();
+    hiveColumns.add("col1");
+    hiveInitializePOJODatabase(createStore(null));
+    HiveOperator hiveOperator = new HiveOperator();
+    hiveOperator.setHivestore(hiveStore);
+    hiveOperator.setTablename(tablepojo);
+    hiveOperator.setHivePartitionColumns(hivePartitionColumns);
+
+    FSPojoToHiveOperator fsRolling = new FSPojoToHiveOperator();
+    fsRolling.setFilePath(testMeta.getDir());
+    fsRolling.setHiveColumns(hiveColumns);
+    ArrayList<FIELD_TYPE> fieldtypes = new ArrayList<FIELD_TYPE>();
+    ArrayList<FIELD_TYPE> partitiontypes = new ArrayList<FIELD_TYPE>();
+    fieldtypes.add(FIELD_TYPE.INTEGER);
+    partitiontypes.add(FIELD_TYPE.STRING);
+    fsRolling.setHiveColumnDataTypes(fieldtypes);
+    fsRolling.setHivePartitionColumnDataTypes(partitiontypes);
+    //ArrayList<FIELD_TYPE> partitionColumnType = new ArrayList<FIELD_TYPE>();
+    //partitionColumnType.add(FIELD_TYPE.STRING);
+    fsRolling.setHivePartitionColumns(hivePartitionColumns);
+    // fsRolling.setHivePartitionColumnsDataTypes(partitionColumnType);
+    ArrayList<String> expressions = new ArrayList<String>();
+    expressions.add("getId()");
+    ArrayList<String> expressionsPartitions = new ArrayList<String>();
+
+    expressionsPartitions.add("getDate()");
+    short permission = 511;
+    fsRolling.setFilePermission(permission);
+    fsRolling.setMaxLength(128);
+    fsRolling.setExpressionsForHiveColumns(expressions);
+    fsRolling.setExpressionsForHivePartitionColumns(expressionsPartitions);
+    AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
+    attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE);
+    attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L);
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
+
+    fsRolling.setup(context);
+    hiveOperator.setup(context);
+    FilePartitionMapping mapping1 = new FilePartitionMapping();
+    FilePartitionMapping mapping2 = new FilePartitionMapping();
+    mapping1.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-11" + "/" + "0-transaction.out.part.0");
+    ArrayList<String> partitions1 = new ArrayList<String>();
+    partitions1.add("2014-12-11");
+    mapping1.setPartition(partitions1);
+    ArrayList<String> partitions2 = new ArrayList<String>();
+    partitions2.add("2014-12-12");
+    mapping2.setFilename(APP_ID + "/" + OPERATOR_ID + "/" + "2014-12-12" + "/" + "0-transaction.out.part.0");
+    mapping2.setPartition(partitions2);
+    for (int wid = 0, total = 0; wid < NUM_WINDOWS; wid++) {
+      fsRolling.beginWindow(wid);
+      for (int tupleCounter = 1; tupleCounter < BLAST_SIZE && total < DATABASE_SIZE; tupleCounter++, total++) {
+        InnerObj innerObj = new InnerObj();
+        innerObj.setId(tupleCounter);
+        innerObj.setDate("2014-12-1" + tupleCounter);
+        fsRolling.input.process(innerObj);
+      }
+      if (wid == 7) {
+        fsRolling.committed(wid - 1);
+        hiveOperator.processTuple(mapping1);
+        hiveOperator.processTuple(mapping2);
+      }
+
+      fsRolling.endWindow();
+    }
+
+    fsRolling.teardown();
+    hiveStore.connect();
+    client.execute("select * from " + tablepojo + " where dt='2014-12-11'");
+    List<String> recordsInDatePartition1 = client.fetchAll();
+
+    client.execute("select * from " + tablepojo + " where dt='2014-12-12'");
+    List<String> recordsInDatePartition2 = client.fetchAll();
+    client.execute("drop table " + tablepojo);
+    hiveStore.disconnect();
+
+    Assert.assertEquals(7, recordsInDatePartition1.size());
+    for (int i = 0; i < recordsInDatePartition1.size(); i++) {
+      LOG.debug("records in first date partition are {}", recordsInDatePartition1.get(i));
+      /* Each record comes back as an Object[] holding the data columns followed by the partition columns,
+         so cast the record to Object[] before asserting on its elements. */
+      Object record = recordsInDatePartition1.get(i);
+      Object[] records = (Object[])record;
+      Assert.assertEquals(1, records[0]);
+      Assert.assertEquals("2014-12-11", records[1]);
+    }
+    Assert.assertEquals(7, recordsInDatePartition2.size());
+    for (int i = 0; i < recordsInDatePartition2.size(); i++) {
+      LOG.debug("records in second date partition are {}", recordsInDatePartition2.get(i));
+      Object record = recordsInDatePartition2.get(i);
+      Object[] records = (Object[])record;
+      Assert.assertEquals(2, records[0]);
+      Assert.assertEquals("2014-12-12", records[1]);
+    }
+  }
+
+  @Test
   public void testHiveInsertMapOperator() throws SQLException, TException
   {
     HiveStore hiveStore = createStore(null);
@@ -300,7 +432,7 @@
       for (int tupleCounter = 0;
               tupleCounter < BLAST_SIZE;
               tupleCounter++) {
-        map.put(2014-12-10 + "", 2014-12-10);
+        map.put(2014 - 12 - 10 + "", 2014 - 12 - 10);
         fsRolling.input.put(map);
         map.clear();
       }
@@ -458,6 +590,37 @@
 
   }
 
-  private static transient final Logger LOG = LoggerFactory.getLogger(HiveMockTest.class);
+  public class InnerObj
+  {
+    private int id;
+    private String date;
+
+    public InnerObj()
+    {
+    }
+
+    public String getDate()
+    {
+      return date;
+    }
+
+    public void setDate(String date)
+    {
+      this.date = date;
+    }
+
+    public int getId()
+    {
+      return id;
+    }
+
+    public void setId(int id)
+    {
+      this.id = id;
+    }
+
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(HiveMockTest.class);
 
 }
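
Note: one respaced line in testHiveInsertMapOperator deserves a comment: the unquoted 2014-12-10 is integer arithmetic, not a date literal, and the added spaces make that explicit. A minimal equivalent:

    int key = 2014 - 12 - 10;  // evaluates to 1992
    map.put(key + "", key);    // same as map.put("1992", 1992)
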
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
index ea7cde8..12aab03 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java
@@ -323,11 +323,11 @@
     testMeta.operator.setZookeeper("cluster1::node0,node1,node2:2181,node3:2182;cluster2::node4:2181");
     latch.await(500, TimeUnit.MILLISECONDS);
 
-    Assert.assertEquals("Total size of clusters ", 5, testMeta.operator.getConsumer().getZookeeper().size());
-    Assert.assertEquals("Number of nodes in cluster1 ", 4, testMeta.operator.getConsumer().getZookeeper().get("cluster1").size());
-    Assert.assertEquals("Nodes in cluster1 ", "[node0:2181, node2:2181, node3:2182, node1:2181]", testMeta.operator.getConsumer().getZookeeper().get("cluster1").toString());
-    Assert.assertEquals("Number of nodes in cluster2 ", 1, testMeta.operator.getConsumer().getZookeeper().get("cluster2").size());
-    Assert.assertEquals("Nodes in cluster2 ", "[node4:2181]", testMeta.operator.getConsumer().getZookeeper().get("cluster2").toString());
+    Assert.assertEquals("Total size of clusters ", 5, testMeta.operator.getConsumer().zookeeperMap.size());
+    Assert.assertEquals("Number of nodes in cluster1 ", 4, testMeta.operator.getConsumer().zookeeperMap.get("cluster1").size());
+    Assert.assertEquals("Nodes in cluster1 ", "[node0:2181, node2:2181, node3:2182, node1:2181]", testMeta.operator.getConsumer().zookeeperMap.get("cluster1").toString());
+    Assert.assertEquals("Number of nodes in cluster2 ", 1, testMeta.operator.getConsumer().zookeeperMap.get("cluster2").size());
+    Assert.assertEquals("Nodes in cluster2 ", "[node4:2181]", testMeta.operator.getConsumer().zookeeperMap.get("cluster2").toString());
   }
 
 }
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
index 43e40d0..f740923 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/OffsetManagerTest.java
@@ -238,8 +238,8 @@
     consumer.setTopic(TEST_TOPIC);
     //set the zookeeper list used to initialize the partition
     SetMultimap<String, String> zookeeper = HashMultimap.create();
-    zookeeper.put(KafkaPartition.DEFAULT_CLUSTERID, "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
-    consumer.setZookeeper(zookeeper);
+    String zks = KafkaPartition.DEFAULT_CLUSTERID + "::localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0];
+    consumer.setZookeeper(zks);
     consumer.setInitialOffset("earliest");
 
     node.setConsumer(consumer);
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
new file mode 100644
index 0000000..4e1ed7b
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
@@ -0,0 +1,269 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
+import com.datatorrent.lib.util.PojoUtils.GetterChar;
+import com.datatorrent.lib.util.PojoUtils.GetterDouble;
+import com.datatorrent.lib.util.PojoUtils.GetterFloat;
+import com.datatorrent.lib.util.PojoUtils.GetterInt;
+import com.datatorrent.lib.util.PojoUtils.GetterLong;
+import com.datatorrent.lib.util.PojoUtils.GetterShort;
+import java.sql.*;
+import java.util.ArrayList;
+import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * JdbcPOJOOutputOperator class.</p>
+ * A Generic implementation of AbstractJdbcTransactionableOutputOperator which takes in any POJO.
+ *
+ * @displayName Jdbc Output Operator
+ * @category Output
+ * @tags output operator,transactional, POJO
+ * @since 2.1.0
+ */
+public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object>
+{
+  @NotNull
+  private ArrayList<String> dataColumns;
+  //These are extracted from table metadata
+  private ArrayList<Integer> columnDataTypes;
+
+  /*
+   * Gets the list of data column names to be set in the database.
+   */
+  public ArrayList<String> getDataColumns()
+  {
+    return dataColumns;
+  }
+
+  public void setDataColumns(ArrayList<String> dataColumns)
+  {
+    this.dataColumns = dataColumns;
+  }
+
+  @NotNull
+  private String tablename;
+
+  /*
+   * Gets the table name in the database.
+   */
+  public String getTablename()
+  {
+    return tablename;
+  }
+
+  public void setTablename(String tablename)
+  {
+    this.tablename = tablename;
+  }
+
+  /*
+   * An ArrayList of Java expressions that will yield the field value from the POJO.
+   * Each expression corresponds to one column in the database table.
+   */
+  public ArrayList<String> getExpressions()
+  {
+    return expressions;
+  }
+
+  public void setExpressions(ArrayList<String> expressions)
+  {
+    this.expressions = expressions;
+  }
+
+  @NotNull
+  private ArrayList<String> expressions;
+  private transient ArrayList<Object> getters;
+  private String insertStatement;
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    StringBuilder columns = new StringBuilder();
+    StringBuilder values = new StringBuilder();
+    for (int i = 0; i < dataColumns.size(); i++) {
+      columns.append(dataColumns.get(i));
+      values.append("?");
+      if (i < dataColumns.size() - 1) {
+        columns.append(",");
+        values.append(",");
+      }
+    }
+    insertStatement = "INSERT INTO "
+            + tablename
+            + " (" + columns.toString() + ")"
+            + " VALUES (" + values.toString() + ")";
+    LOG.debug("insert statement is {}", insertStatement);
+    super.setup(context);
+    Connection conn = store.getConnection();
+    LOG.debug("Got Connection.");
+    try {
+      Statement st = conn.createStatement();
+      ResultSet rs = st.executeQuery("select * from " + tablename);
+
+      ResultSetMetaData rsMetaData = rs.getMetaData();
+
+      int numberOfColumns = rsMetaData.getColumnCount();
+      LOG.debug("resultSet MetaData column count = {}", numberOfColumns);
+
+      for (int i = 1; i <= numberOfColumns; i++) {
+        // get the designated column's SQL type.
+        int type = rsMetaData.getColumnType(i);
+        LOG.debug("column name {}", rsMetaData.getColumnTypeName(i));
+        columnDataTypes.add(type);
+        LOG.debug("sql column type is " + type);
+      }
+    }
+    catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    }
+
+  }
+
+  public JdbcPOJOOutputOperator()
+  {
+    super();
+    columnDataTypes = new ArrayList<Integer>();
+    getters = new ArrayList<Object>();
+  }
+
+  @Override
+  public void processTuple(Object tuple)
+  {
+    if (getters.isEmpty()) {
+      processFirstTuple(tuple);
+    }
+    super.processTuple(tuple);
+  }
+
+  public void processFirstTuple(Object tuple)
+  {
+    final Class<?> fqcn = tuple.getClass();
+    final int size = columnDataTypes.size();
+    for (int i = 0; i < size; i++) {
+      final int type = columnDataTypes.get(i);
+      final String getterExpression = expressions.get(i);
+      final Object getter;
+      switch (type) {
+        case Types.CHAR:
+          getter = PojoUtils.createGetterChar(fqcn, getterExpression);
+          break;
+        case Types.VARCHAR:
+          getter = PojoUtils.createGetter(fqcn, getterExpression, String.class);
+          break;
+        case Types.BOOLEAN:
+        case Types.TINYINT:
+          getter = PojoUtils.createGetterBoolean(fqcn, getterExpression);
+          break;
+        case Types.SMALLINT:
+          getter = PojoUtils.createGetterShort(fqcn, getterExpression);
+          break;
+        case Types.INTEGER:
+          getter = PojoUtils.createGetterInt(fqcn, getterExpression);
+          break;
+        case Types.BIGINT:
+          getter = PojoUtils.createGetterLong(fqcn, getterExpression);
+          break;
+        case Types.FLOAT:
+          getter = PojoUtils.createGetterFloat(fqcn, getterExpression);
+          break;
+        case Types.DOUBLE:
+          getter = PojoUtils.createGetterDouble(fqcn, getterExpression);
+          break;
+        default:
+          /*
+           Types.DECIMAL
+           Types.DATE
+           Types.TIME
+           Types.ARRAY
+           Types.OTHER
+           */
+          getter = PojoUtils.createGetter(fqcn, getterExpression, Object.class);
+          break;
+      }
+      getters.add(getter);
+    }
+
+  }
+
+  @Override
+  protected String getUpdateCommand()
+  {
+    LOG.debug("insertstatement is {}", insertStatement);
+    return insertStatement;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected void setStatementParameters(PreparedStatement statement, Object tuple) throws SQLException
+  {
+    final int size = columnDataTypes.size();
+    for (int i = 0; i < size; i++) {
+      final int type = columnDataTypes.get(i);
+      switch (type) {
+        case (Types.CHAR):
+          statement.setString(i + 1, String.valueOf(((GetterChar<Object>)getters.get(i)).get(tuple)));
+          break;
+        case (Types.VARCHAR):
+          statement.setString(i + 1, ((Getter<Object, String>)getters.get(i)).get(tuple));
+          break;
+        case (Types.BOOLEAN):
+        case (Types.TINYINT):
+          statement.setBoolean(i + 1, ((GetterBoolean<Object>)getters.get(i)).get(tuple));
+          break;
+        case (Types.SMALLINT):
+          statement.setShort(i + 1, ((GetterShort<Object>)getters.get(i)).get(tuple));
+          break;
+        case (Types.INTEGER):
+          statement.setInt(i + 1, ((GetterInt<Object>)getters.get(i)).get(tuple));
+          break;
+        case (Types.BIGINT):
+          statement.setLong(i + 1, ((GetterLong<Object>)getters.get(i)).get(tuple));
+          break;
+        case (Types.FLOAT):
+          statement.setFloat(i + 1, ((GetterFloat<Object>)getters.get(i)).get(tuple));
+          break;
+        case (Types.DOUBLE):
+          statement.setDouble(i + 1, ((GetterDouble<Object>)getters.get(i)).get(tuple));
+          break;
+        default:
+          /*
+           Types.DECIMAL
+           Types.DATE
+           Types.TIME
+           Types.ARRAY
+           Types.OTHER
+           */
+          statement.setObject(i + 1, ((Getter<Object, Object>)getters.get(i)).get(tuple));
+          break;
+      }
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOOutputOperator.class);
+
+}
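
Note: a wiring sketch for the new JDBC POJO operator, mirroring the testJdbcPOJOOutputOperator test added below (table and store settings are illustrative). Since setup() reads the SQL type of every table column via ResultSetMetaData and pairs types with expressions by index, dataColumns and expressions should cover all table columns in table order:

    JdbcPOJOOutputOperator oper = new JdbcPOJOOutputOperator();
    oper.setTablename("test_pojo_event_table");

    ArrayList<String> dataColumns = new ArrayList<String>();
    dataColumns.add("id");
    dataColumns.add("name");
    oper.setDataColumns(dataColumns);

    ArrayList<String> expressions = new ArrayList<String>();
    expressions.add("getId()");    // paired with the table's first column type
    expressions.add("getName()");  // paired with the second
    oper.setExpressions(expressions);
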
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
index 3e3bbff..851c59a 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
@@ -16,6 +16,7 @@
 import com.datatorrent.common.util.DTThrowable;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.testbench.CollectorTestSink;
+import java.util.ArrayList;
 
 /**
  * Tests for {@link AbstractJdbcTransactionableOutputOperator} and {@link AbstractJdbcInputOperator}
@@ -26,6 +27,7 @@
   public static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
 
   private static final String TABLE_NAME = "test_event_table";
+  private static final String TABLE_POJO_NAME = "test_pojo_event_table";
   private static String APP_ID = "JdbcOperatorTest";
   private static int OPERATOR_ID = 0;
 
@@ -39,6 +41,43 @@
     }
   }
 
+  public static class TestPOJOEvent
+  {
+    private int id;
+    private String name;
+
+    public TestPOJOEvent()
+    {
+    }
+
+    public TestPOJOEvent(int id, String name)
+    {
+      this.id = id;
+      this.name = name;
+    }
+
+    public int getId()
+    {
+      return id;
+    }
+
+    public void setId(int id)
+    {
+      this.id = id;
+    }
+
+    public String getName()
+    {
+      return name;
+    }
+
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+  }
+
   @BeforeClass
   public static void setup()
   {
@@ -48,16 +87,18 @@
       Connection con = DriverManager.getConnection(URL);
       Statement stmt = con.createStatement();
 
-      String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " +
-        JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " +
-        JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " +
-        JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " +
-        "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " +
-        ")";
+      String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+              + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+              + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+              + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+              + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+              + ")";
       stmt.executeUpdate(createMetaTable);
 
       String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (ID INTEGER)";
       stmt.executeUpdate(createTable);
+      String createPOJOTable = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME + "(id INTEGER not NULL,name VARCHAR(255), PRIMARY KEY ( id ))";
+      stmt.executeUpdate(createPOJOTable);
     }
     catch (Throwable e) {
       DTThrowable.rethrow(e);
@@ -121,6 +162,32 @@
     }
   }
 
+  private static class TestPOJOOutputOperator extends JdbcPOJOOutputOperator
+  {
+    TestPOJOOutputOperator()
+    {
+      cleanTable();
+    }
+
+    public int getNumOfEventsInStore()
+    {
+      Connection con;
+      try {
+        con = DriverManager.getConnection(URL);
+        Statement stmt = con.createStatement();
+
+        String countQuery = "SELECT count(*) from " + TABLE_POJO_NAME;
+        ResultSet resultSet = stmt.executeQuery(countQuery);
+        resultSet.next();
+        return resultSet.getInt(1);
+      }
+      catch (SQLException e) {
+        throw new RuntimeException("fetching count", e);
+      }
+    }
+
+  }
+
   private static class TestInputOperator extends AbstractJdbcInputOperator<TestEvent>
   {
 
@@ -195,6 +262,47 @@
     outputOperator.endWindow();
 
     Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore());
+    cleanTable();
+  }
+
+  @Test
+  public void testJdbcPOJOOutputOperator()
+  {
+    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
+    transactionalStore.setDatabaseDriver(DB_DRIVER);
+    transactionalStore.setDatabaseUrl(URL);
+
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
+
+    TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
+    outputOperator.setBatchSize(3);
+    outputOperator.setTablename(TABLE_POJO_NAME);
+    ArrayList<String> dataColumns = new ArrayList<String>();
+    dataColumns.add("id");
+    dataColumns.add("name");
+    outputOperator.setDataColumns(dataColumns);
+    outputOperator.setStore(transactionalStore);
+    ArrayList<String> expressions = new ArrayList<String>();
+    expressions.add("getId()");
+    expressions.add("getName()");
+    outputOperator.setExpressions(expressions);
+
+    outputOperator.setup(context);
+
+    List<TestPOJOEvent> events = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      events.add(new TestPOJOEvent(i, "test" + i));
+    }
+
+    outputOperator.beginWindow(0);
+    for (TestPOJOEvent event: events) {
+      outputOperator.input.process(event);
+    }
+    outputOperator.endWindow();
+
+    Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore());
   }
 
   @Test