POJO implementaion for Cassandra,added a test case and minor changes in memsql.
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/MemsqlInputBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/MemsqlInputBenchmark.java
index 5fd3470..0167694 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/MemsqlInputBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/MemsqlInputBenchmark.java
@@ -18,7 +18,6 @@
 
 import com.datatorrent.api.*;
 import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.memsql.MemsqlInputOperator;
 import com.datatorrent.lib.stream.DevNull;
 import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/MemsqlInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/MemsqlInputOperator.java
new file mode 100644
index 0000000..27878f9
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/MemsqlInputOperator.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.benchmark;
+
+import com.datatorrent.contrib.memsql.*;
+import com.datatorrent.api.Context.OperatorContext;
+import java.sql.Statement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import javax.validation.constraints.NotNull;
+
+public class MemsqlInputOperator extends AbstractMemsqlInputOperator<Object>
+{
+  private int blastSize = 1000;
+  private int currentRow = 1;
+  private int inputSize = 0;
+  @NotNull
+  private String tablename;
+  @NotNull
+  private String primaryKeyColumn;
+
+  /*
+   * Primary Key Column of table.
+   * Gets the primary key column of memsql table.
+   */
+  public String getPrimaryKeyColumn()
+  {
+    return primaryKeyColumn;
+  }
+
+  /*
+   * Primary Key Column of table.
+   * Sets the primary key column of memsql table.
+   */
+  public void setPrimaryKeyCol(String primaryKeyColumn)
+  {
+    this.primaryKeyColumn = primaryKeyColumn;
+  }
+
+   /*
+   * Name of the table in Memsql Database.
+   * Gets the Memsql Tablename.
+   * @return tablename
+   */
+  public String getTablename()
+  {
+    return tablename;
+  }
+
+  public void setTablename(String tablename)
+  {
+    this.tablename = tablename;
+  }
+
+  public MemsqlInputOperator()
+  {
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+
+    try {
+      Statement statement = store.getConnection().createStatement();
+      ResultSet resultSet = statement.executeQuery("select count(*) from " + tablename);
+      resultSet.next();
+      inputSize = resultSet.getInt(1);
+      statement.close();
+    }
+    catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    }
+  }
+
+  @Override
+  public Integer getTuple(ResultSet result)
+  {
+    Integer tuple = null;
+
+    try {
+      tuple = result.getInt(2);
+    }
+    catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    }
+
+    return tuple;
+  }
+
+  @Override
+  public void emitTuples()
+  {
+    if (currentRow >= inputSize) {
+      return;
+    }
+
+    super.emitTuples();
+  }
+
+  @Override
+  public String queryToRetrieveData()
+  {
+    if (currentRow > inputSize) {
+      return null;
+    }
+
+    int endRow = currentRow + blastSize;
+
+    if (endRow > inputSize + 1) {
+      endRow = inputSize + 1;
+    }
+
+    StringBuilder sb = new StringBuilder();
+    sb.append("select * from ");
+    sb.append(tablename);
+    sb.append(" where ");
+    sb.append(primaryKeyColumn);
+    sb.append(" >= ");
+    sb.append(currentRow);
+    sb.append(" and ");
+    sb.append(primaryKeyColumn);
+    sb.append(" < ");
+    sb.append(endRow);
+
+    currentRow = endRow;
+
+    return sb.toString();
+  }
+
+  public void setBlastSize(int blastSize)
+  {
+    this.blastSize = blastSize;
+  }
+
+  /*
+   * Records are read in batches of this size.
+   * Gets the batch size.
+   * @return batchsize
+   */
+  public int getBlastSize()
+  {
+    return blastSize;
+  }
+}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java
index 5b4864f..a51f5a8 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java
@@ -47,7 +47,6 @@
  * @param <T>type of tuple</T>
  * @since 1.0.2
  */
-@Deprecated
 public abstract class AbstractCassandraTransactionableOutputOperatorPS<T> extends AbstractCassandraTransactionableOutputOperator<T>{
 
   private transient PreparedStatement updateCommand;
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
new file mode 100644
index 0000000..b9a3f69
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
@@ -0,0 +1,249 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.exceptions.DriverException;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
+import com.datatorrent.lib.util.PojoUtils.GetterDouble;
+import com.datatorrent.lib.util.PojoUtils.GetterFloat;
+import com.datatorrent.lib.util.PojoUtils.GetterInt;
+import com.datatorrent.lib.util.PojoUtils.GetterLong;
+import com.datatorrent.lib.util.PojoUtils.GetterObject;
+import com.datatorrent.lib.util.PojoUtils.GetterString;
+import java.sql.*;
+import java.util.ArrayList;
+import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * CassandraOutputOperator class.</p>
+ * A Generic implementation of AbstractCassandraTransactionableOutputOperatorPS which takes in any POJO.
+ *
+ * @since 1.0.3
+ */
+public class CassandraOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object>
+{
+  private ArrayList<String> columns;
+  private ArrayList<DataType> columnDataTypes;
+  private ArrayList<String> expressions;
+  private transient ArrayList<Object> getters;
+
+  /*
+   * An ArrayList of Java expressions that will yield the field value from the POJO.
+   * Each expression corresponds to one column in the Cassandra table.
+   */
+  public ArrayList<String> getExpressions()
+  {
+    return expressions;
+  }
+
+  public void setExpressions(ArrayList<String> expressions)
+  {
+    this.expressions = expressions;
+  }
+
+  public ArrayList<String> getColumns()
+  {
+    return columns;
+  }
+
+  public void setColumns(ArrayList<String> columns)
+  {
+    this.columns = columns;
+  }
+
+  @NotNull
+  private String tablename;
+
+
+  /*
+   * Tablename in cassandra.
+   */
+  public String getTablename()
+  {
+    return tablename;
+  }
+
+  public void setTablename(String tablename)
+  {
+    this.tablename = tablename;
+  }
+
+  public CassandraOutputOperator()
+  {
+    super();
+    columnDataTypes = new ArrayList<DataType>();
+    getters = new ArrayList<Object>();
+  }
+
+
+  @Override
+  public void processTuple(Object tuple)
+  {
+    if (getters.isEmpty()) {
+      processFirstTuple(tuple);
+    }
+    super.processTuple(tuple);
+  }
+
+  public void processFirstTuple(Object tuple)
+  {
+    Class<?> fqcn = tuple.getClass();
+    int size = columnDataTypes.size();
+    for (int i = 0; i < size; i++) {
+      DataType type = columnDataTypes.get(i);
+      String getterExpression = expressions.get(i);
+      if (type.equals(DataType.Name.ASCII) || type.equals(DataType.Name.TEXT) || type.equals(DataType.Name.VARCHAR)) {
+        GetterString getVarchar = PojoUtils.createGetterString(fqcn, getterExpression);
+        getters.add(getVarchar);
+      }
+      else if (type.equals(DataType.Name.BOOLEAN)) {
+        GetterBoolean getBoolean = PojoUtils.createGetterBoolean(fqcn, getterExpression);
+        getters.add(getBoolean);
+      }
+      else if (type.equals(DataType.Name.INT)) {
+        GetterInt getInt = PojoUtils.createGetterInt(fqcn, getterExpression);
+        getters.add(getInt);
+      }
+      else if (type.equals(DataType.Name.BIGINT) || type.equals(DataType.Name.COUNTER)) {
+        GetterLong getLong = PojoUtils.createExpressionGetterLong(fqcn, getterExpression);
+        getters.add(getLong);
+      }
+      else if (type.equals(DataType.Name.FLOAT)) {
+        GetterFloat getFloat = PojoUtils.createGetterFloat(fqcn, getterExpression);
+        getters.add(getFloat);
+      }
+      else if (type.equals(DataType.Name.DOUBLE)) {
+        GetterDouble getDouble = PojoUtils.createGetterDouble(fqcn, getterExpression);
+        getters.add(getDouble);
+      }
+      else if (type.equals(DataType.Name.TIMESTAMP)) {
+        GetterObject getObject = PojoUtils.createGetterObject(fqcn, getterExpression);
+        getters.add(getObject);
+      }
+      else if (type.equals(DataType.Name.CUSTOM) || type.equals(DataType.Name.LIST) || type.equals(DataType.Name.MAP) || type.equals(DataType.Name.SET) ) {
+        GetterObject getObject = PojoUtils.createGetterObject(fqcn, getterExpression);
+        getters.add(getObject);
+      }
+
+    }
+
+  }
+
+  @Override
+  protected PreparedStatement getUpdateCommand()
+  {
+      com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace +"."+tablename);
+
+      ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
+
+      int numberOfColumns = 0;
+
+      numberOfColumns = rsMetaData.size();
+
+      LOG.debug("resultSet MetaData column Count=" + numberOfColumns);
+
+      for (int i = 0; i < numberOfColumns; i++) {
+        // get the designated column's data type.
+        DataType type = rsMetaData.getType(i);
+        columnDataTypes.add(type);
+        LOG.debug("column type is " + rsMetaData.getType(i));
+        LOG.debug("sql column type is " + type);
+      }
+
+    StringBuilder queryfields = new StringBuilder("");
+    StringBuilder values = new StringBuilder("");
+    for (int i = 0; i < columns.size(); i++) {
+      if (queryfields.length()==0) {
+        queryfields.append(columns.get(i));
+        values.append("?");
+      }
+      else {
+        queryfields.append(",").append(columns.get(i));
+        values.append(",").append(columns.get(i));
+      }
+    }
+    LOG.debug("queryfields are", queryfields.toString());
+    LOG.debug("values are ",values.toString());
+    String statement
+            = "INSERT INTO " + store.keyspace + "."
+            + tablename
+            + "(" + queryfields.toString() + ")"
+            + "VALUES (" + values.toString() + ");";
+    LOG.debug("statement is", statement);
+    return store.getSession().prepare(statement);
+  }
+
+  @Override
+  protected Statement setStatementParameters(PreparedStatement updateCommand, Object tuple) throws DriverException
+  {
+    BoundStatement boundStmnt = new BoundStatement(updateCommand);
+    int size = columnDataTypes.size();
+    Object getter;
+    for (int i = 0; i < size; i++) {
+      DataType type = columnDataTypes.get(i);
+       switch (type.getName()) {
+        case ASCII:
+          getter = ((GetterString)getters.get(i)).get(tuple);
+          break;
+        case VARCHAR:
+          getter = ((GetterString)getters.get(i)).get(tuple);
+          break;
+        case TEXT:
+          getter = ((GetterString)getters.get(i)).get(tuple);
+          break;
+        case BOOLEAN:
+          getter = ((GetterBoolean)getters.get(i)).get(tuple);
+          break;
+        case INT:
+          getter = ((GetterInt)getters.get(i)).get(tuple);
+          break;
+        case BIGINT:
+          getter = ((GetterLong)getters.get(i)).get(tuple);
+          break;
+        case COUNTER:
+          getter = ((GetterLong)getters.get(i)).get(tuple);
+          break;
+        case FLOAT:
+          getter = ((GetterFloat)getters.get(i)).get(tuple);
+          break;
+        case DOUBLE:
+          getter = ((GetterDouble)getters.get(i)).get(tuple);
+          break;
+        case TIMESTAMP:
+          getter = (Date)((GetterObject)getters.get(i)).get(tuple);
+          break;
+        case CUSTOM:
+          getter = ((GetterObject)getters.get(i)).get(tuple);
+          break;
+        default:
+          getter = ((GetterObject)getters.get(i)).get(tuple);
+          break;
+      }
+      boundStmnt.bind(i + 1, getter);
+
+    }
+    return boundStmnt;
+  }
+
+  private static transient final Logger LOG = LoggerFactory.getLogger(CassandraOutputOperator.class);
+}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java
index ee47e35..d121181 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java
@@ -29,7 +29,7 @@
 import com.datatorrent.lib.db.Connectable;
 
 /**
- * A {@link Connectable} that uses cassandra to connect to stores and implements Connectable interface. 
+ * A {@link Connectable} that uses cassandra to connect to stores and implements Connectable interface.
  * <p>
  * @displayName Cassandra Store
  * @category Store
@@ -45,7 +45,19 @@
   private String node;
   protected transient Cluster cluster = null;
   protected transient Session session = null;
-  protected String keyspace=null;
+
+  @NotNull
+  protected String keyspace;
+
+  /*
+   * The Cassandra keyspace is a namespace that defines how data is replicated on nodes.
+   * Typically, a cluster has one keyspace per application. Replication is controlled on a per-keyspace basis, so data that has different replication requirements typically resides in different keyspaces.
+   * Keyspaces are not designed to be used as a significant map layer within the data model. Keyspaces are designed to control data replication for a set of tables.
+   */
+  public String getKeyspace()
+  {
+    return keyspace;
+  }
 
   /**
    * Sets the keyspace.
diff --git a/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java
index c8795af..46f3bbf 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java
@@ -49,12 +49,10 @@
   private ArrayList<Integer> columnDataTypes;
   private transient boolean isFirstTuple;
   private transient ArrayList<Object> getters;
-  private String primaryKey;
 
   /*
    * An ArrayList of Java expressions that will yield the field value from the POJO.
    * Each expression corresponds to one column in the memsql table.
-   * Example:
    */
   public ArrayList<String> getExpression()
   {
@@ -108,22 +106,6 @@
     this.tablename = tablename;
   }
 
-  /*
-   * Gets the private key of the memsql table.
-   */
-  public String getPrimaryKey()
-  {
-    return primaryKey;
-  }
-
-  /*
-   * Sets the private key in the memsql table.
-   */
-  public void setPrivateKey(String primaryKey)
-  {
-    this.primaryKey = primaryKey;
-  }
-
   @Override
   public void setup(OperatorContext context)
   {
@@ -172,6 +154,7 @@
 
   public MemsqlOutputOperator()
   {
+    super();
     columnDataTypes = new ArrayList<Integer>();
     getters = new ArrayList<Object>();
   }
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/AbstractCsvParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/AbstractCsvParser.java
index eab75ee..5c281a2 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/AbstractCsvParser.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/AbstractCsvParser.java
@@ -245,21 +245,37 @@
     String name;
     FIELD_TYPE type;
 
+    /**
+     * Gets the name of the Field.
+     * @return name
+     */
     public String getName()
     {
       return name;
     }
 
+    /**
+     * Sets the name of the Field.
+     * @param name
+     */
     public void setName(String name)
     {
       this.name = name;
     }
 
+    /**
+     * Gets the type of the Field.
+     * @return type
+     */
     public FIELD_TYPE getType()
     {
       return type;
     }
 
+    /**
+     * Sets the type of the Field.
+     * @return type
+     */
     public void setType(String type)
     {
       this.type = FIELD_TYPE.valueOf(type);
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
index 34728b4..e75a0ab 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
@@ -23,11 +23,11 @@
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.testbench.CollectorTestSink;
 import com.google.common.collect.Lists;
+import java.util.ArrayList;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import javax.annotation.Nonnull;
 import java.util.List;
 
 /**
@@ -35,8 +35,8 @@
  */
 public class CassandraOperatorTest
 {
-  public static final String NODE = "127.0.0.1";
-  public static final String KEYSPACE = "test";
+  public static final String NODE = "localhost";
+  public static final String KEYSPACE = "demo";
 
   private static final String TABLE_NAME = "test_event_table";
   private static String APP_ID = "CassandraOperatorTest";
@@ -92,34 +92,8 @@
     }
   }
 
-  private static class TestOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<TestEvent>
+  private static class TestOutputOperator extends CassandraOutputOperator
   {
-    private static final String INSERT_STMT = "INSERT INTO " + KEYSPACE+"." +TABLE_NAME + " (ID) VALUES (?);";
-
-    TestOutputOperator()
-    {
-      cleanTable();
-    }
-
-    @Nonnull
-    @Override
-    protected PreparedStatement getUpdateCommand()
-    {
-      try {
-        return store.getSession().prepare(INSERT_STMT);
-      }
-      catch (DriverException e) {
-        throw new RuntimeException("preparing", e);
-      }
-    }
-
-    @Override
-    protected Statement setStatementParameters(PreparedStatement statement, TestEvent tuple) throws DriverException
-    {
-      BoundStatement boundStatement = new BoundStatement(statement);
-      Statement stmnt = boundStatement.bind(tuple.id);
-      return stmnt;
-    }
 
     public long getNumOfEventsInStore()
     {
@@ -206,6 +180,13 @@
 
     TestOutputOperator outputOperator = new TestOutputOperator();
 
+    outputOperator.setTablename(TABLE_NAME);
+    ArrayList<String> columns = new ArrayList<String>();
+    columns.add("ID");
+    outputOperator.setColumns(columns);
+    ArrayList<String> expressions = new ArrayList<String>();
+    expressions.add("getID()");
+    outputOperator.setExpressions(expressions);
     outputOperator.setStore(transactionalStore);
 
     outputOperator.setup(context);
@@ -249,5 +230,49 @@
 
     Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
   }
+
+   public InnerObj innerObj = new InnerObj();
+
+  /**
+   * @return the innerObj
+   */
+  public InnerObj getInnerObj()
+  {
+    return innerObj;
+  }
+
+  /**
+   * @param innerObj the innerObj to set
+   */
+  public void setInnerObj(InnerObj innerObj)
+  {
+    this.innerObj = innerObj;
+  }
+
+  public class InnerObj
+  {
+    public InnerObj()
+    {
+    }
+
+    private int ID=11;
+
+    /**
+     * @return the int ID
+     */
+    public int getID()
+    {
+      return ID;
+    }
+
+    /**
+     * @param ID the intVal to set
+     */
+    public void setID(int ID)
+    {
+      this.ID = ID;
+    }
+
+  }
 }