MLHR-1738 #resolve Created an implementation of AbstractCassandraInputOperator which gets field values from Cassandra database columns and sets in a POJO.
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
index 86b6782..eb48846 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
@@ -30,10 +30,12 @@
 
 /**
  * Base input adapter which reads data from persistence database through DATASTAX API and writes into output port(s). 
- * Subclasses should provide implementation to get tuples and querying to retrieve data. 
+ * Subclasses should provide implementation to get tuples and querying to retrieve data.
  * <p>
  * This is an abstract class. Sub-classes need to implement {@link #queryToRetrieveData()} and {@link #getTuple(Row)}.
  * </p>
+ *
+ * @param <T>
  * @displayName Abstract Cassandra Input
  * @category Store
  * @tags input operator
@@ -73,17 +75,27 @@
   {
     String query = queryToRetrieveData();
     logger.debug(String.format("select statement: %s", query));
-
-    try {
-      ResultSet result = store.getSession().execute(query);
-      for(Row row: result) {
-        T tuple = getTuple(row);
-        outputPort.emit(tuple);
+    if (!"".equals(query)) {
+      try {
+        ResultSet result = store.getSession().execute(query);
+        processResult(result);
+      }
+      catch (Exception ex) {
+        store.disconnect();
+        DTThrowable.rethrow(ex);
       }
     }
-    catch (Exception ex) {
-      store.disconnect();
-      DTThrowable.rethrow(ex);
+  }
+
+  /*
+   * ProcessResult can be overridden in classes providing implementation of  AbstractCassandraInputOperator.
+   * Providing a basic implementation here which iterates through the rows in result,gets the particular tuple from row and emits it.
+   */
+  protected  void processResult(ResultSet result)
+  {
+    for (Row row: result) {
+          T tuple = getTuple(row);
+          outputPort.emit(tuple);
     }
   }
 }
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
new file mode 100644
index 0000000..b1caa66
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
@@ -0,0 +1,442 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Setter;
+import com.datatorrent.lib.util.PojoUtils.SetterBoolean;
+import com.datatorrent.lib.util.PojoUtils.SetterDouble;
+import com.datatorrent.lib.util.PojoUtils.SetterFloat;
+import com.datatorrent.lib.util.PojoUtils.SetterInt;
+import com.datatorrent.lib.util.PojoUtils.SetterLong;
+import java.math.BigDecimal;
+import java.util.*;
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * CassandraInputOperator</p>
+ * A Generic implementation of AbstractCassandraInputOperator which gets field values from Cassandra database columns and sets in a POJO.
+ *
+ * @displayName Cassandra POJO Input Operator
+ * @category Input
+ * @tags input operator
+ */
+public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object>
+{
+  private List<String> columns;
+  private final transient List<DataType> columnDataTypes;
+  private Number startRow = 0;
+  @NotNull
+  private List<String> expressions;
+  @NotNull
+  private String tablename;
+  private final transient List<Object> setters;
+  @NotNull
+  private String retrieveQuery;
+  private transient Class<?> objectClass = null;
+  private boolean useAllColumns;
+  protected Number lastRowIdInBatch = 0;
+  @NotNull
+  protected String primaryKeyColumn;
+  protected transient DataType primaryKeyColumnType;
+
+  @Min(1)
+  private int limit = 10;
+
+  /*
+   * Number of records to be fetched in one time from cassandra table.
+   */
+  public int getLimit()
+  {
+    return limit;
+  }
+
+  public void setLimit(int limit)
+  {
+    this.limit = limit;
+  }
+
+  /*
+   * Primary Key Column of table.
+   * Gets the primary key column of Cassandra table.
+   */
+  public String getPrimaryKeyColumn()
+  {
+    return primaryKeyColumn;
+  }
+
+  public void setPrimaryKeyColumn(String primaryKeyColumn)
+  {
+    this.primaryKeyColumn = primaryKeyColumn;
+  }
+
+
+  /*
+   * User has the option to specify the starting row of the range of data they desire.
+   */
+  public Number getStartRow()
+  {
+    return startRow;
+  }
+
+  public void setStartRow(Number startRow)
+  {
+    this.startRow = startRow;
+  }
+
+  /*
+   * This option is for user to create POJO fields from a subset of columns in cassandra table.
+   */
+  public boolean isUseAllColumns()
+  {
+    return useAllColumns;
+  }
+
+  public void setUseAllColumns(boolean useAllColumns)
+  {
+    this.useAllColumns = useAllColumns;
+  }
+
+  /*
+   * POJO class which is generated as output from this operator.
+   * Example:
+   * public class TestPOJO{ int intfield; public int getInt(){} public void setInt(){} }
+   * outputClass = TestPOJO
+   * POJOs will be generated on fly in later implementation.
+   */
+  private String outputClass;
+
+  public String getOutputClass()
+  {
+    return outputClass;
+  }
+
+  public void setOutputClass(String outputClass)
+  {
+    this.outputClass = outputClass;
+  }
+
+  /*
+   * Query input by user: Example: select * from keyspace.tablename;
+   */
+  public String getRetrieveQuery()
+  {
+    return retrieveQuery;
+  }
+
+  public void setRetrieveQuery(String retrieveQuery)
+  {
+    this.retrieveQuery = retrieveQuery;
+  }
+
+  /*
+   * An ArrayList of Java expressions that will yield the cassandra column value to be set in output object.
+   * Each expression corresponds to one column in the Cassandra table.
+   */
+  public List<String> getExpressions()
+  {
+    return expressions;
+  }
+
+  public void setExpressions(List<String> expressions)
+  {
+    this.expressions = expressions;
+  }
+
+  /*
+   * Subset of columns specified by User in case POJO needs to contain fields specific to these columns only.
+   */
+  public List<String> getColumns()
+  {
+    return columns;
+  }
+
+  public void setColumns(List<String> columns)
+  {
+    this.columns = columns;
+  }
+
+  /*
+   * Tablename in cassandra.
+   */
+  public String getTablename()
+  {
+    return tablename;
+  }
+
+  public void setTablename(String tablename)
+  {
+    this.tablename = tablename;
+  }
+
+  public CassandraPOJOInputOperator()
+  {
+    super();
+    columnDataTypes = new ArrayList<DataType>();
+    setters = new ArrayList<Object>();
+    this.store = new CassandraStore();
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    super.setup(context);
+    if (setters.isEmpty()) {
+      try {
+        // This code will be replaced after integration of creating POJOs on the fly utility.
+        objectClass = Class.forName(outputClass);
+      }
+      catch (ClassNotFoundException ex) {
+        throw new RuntimeException(ex);
+      }
+
+      int numberOfColumns;
+
+      com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename + " LIMIT " + 1);
+      ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
+      if (!isUseAllColumns()) {
+        numberOfColumns = rsMetaData.size();
+      }
+      else {
+        numberOfColumns = columns.size();
+      }
+
+      primaryKeyColumnType = rsMetaData.getType(primaryKeyColumn);
+
+      for (int i = 0; i < numberOfColumns; i++) {
+        // Get the designated column's data type.
+        DataType type;
+        if (!isUseAllColumns()) {
+          type = rsMetaData.getType(i);
+        }
+        else {
+          type = rsMetaData.getType(columns.get(i));
+        }
+        columnDataTypes.add(type);
+        Object setter;
+        final String setterExpr = expressions.get(i);
+        switch (type.getName()) {
+          case ASCII:
+          case TEXT:
+          case VARCHAR:
+            setter = PojoUtils.createSetter(objectClass, setterExpr, String.class);
+            break;
+          case BOOLEAN:
+            setter = PojoUtils.createSetterBoolean(objectClass, setterExpr);
+            break;
+          case INT:
+            setter = PojoUtils.createSetterInt(objectClass, setterExpr);
+            break;
+          case BIGINT:
+          case COUNTER:
+            setter = PojoUtils.createSetterLong(objectClass, setterExpr);
+            break;
+          case FLOAT:
+            setter = PojoUtils.createSetterFloat(objectClass, setterExpr);
+            break;
+          case DOUBLE:
+            setter = PojoUtils.createSetterDouble(objectClass, setterExpr);
+            break;
+          case DECIMAL:
+            setter = PojoUtils.createSetter(objectClass, setterExpr, BigDecimal.class);
+            break;
+          case SET:
+            setter = PojoUtils.createSetter(objectClass, setterExpr, Set.class);
+            break;
+          case MAP:
+            setter = PojoUtils.createSetter(objectClass, setterExpr, Map.class);
+            break;
+          case LIST:
+            setter = PojoUtils.createSetter(objectClass, setterExpr, List.class);
+            break;
+          case TIMESTAMP:
+            setter = PojoUtils.createSetter(objectClass, setterExpr, Date.class);
+            break;
+          case UUID:
+            setter = PojoUtils.createSetter(objectClass, setterExpr, UUID.class);
+            break;
+          default:
+            setter = PojoUtils.createSetter(objectClass, setterExpr, Object.class);
+            break;
+        }
+        setters.add(setter);
+      }
+    }
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Object getTuple(Row row)
+  {
+    Object obj = null;
+    final int size = columnDataTypes.size();
+
+    try {
+      // This code will be replaced after integration of creating POJOs on the fly utility.
+      obj = objectClass.newInstance();
+    }
+    catch (InstantiationException ex) {
+      throw new RuntimeException(ex);
+    }
+    catch (IllegalAccessException ex) {
+      throw new RuntimeException(ex);
+    }
+
+    for (int i = 0; i < size; i++) {
+      DataType type = columnDataTypes.get(i);
+      switch (type.getName()) {
+        case UUID:
+          final UUID id = row.getUUID(i);
+          ((Setter<Object, UUID>)setters.get(i)).set(obj, id);
+          break;
+        case ASCII:
+        case VARCHAR:
+        case TEXT:
+          final String ascii = row.getString(i);
+          ((Setter<Object, String>)setters.get(i)).set(obj, ascii);
+          break;
+        case BOOLEAN:
+          final boolean bool = row.getBool(i);
+          ((SetterBoolean)setters.get(i)).set(obj, bool);
+          break;
+        case INT:
+          final int intValue = row.getInt(i);
+          ((SetterInt)setters.get(i)).set(obj, intValue);
+          break;
+
+        case BIGINT:
+        case COUNTER:
+          final long longValue = row.getLong(i);
+          ((SetterLong)setters.get(i)).set(obj, longValue);
+          break;
+        case FLOAT:
+          final float floatValue = row.getFloat(i);
+          ((SetterFloat)setters.get(i)).set(obj, floatValue);
+          break;
+        case DOUBLE:
+          final double doubleValue = row.getDouble(i);
+          ((SetterDouble)setters.get(i)).set(obj, doubleValue);
+          break;
+        case DECIMAL:
+          final BigDecimal decimal = row.getDecimal(i);
+          ((Setter<Object, BigDecimal>)setters.get(i)).set(obj, decimal);
+          break;
+        case SET:
+          Set<?> set = row.getSet(i, Object.class);
+          ((Setter<Object, Set<?>>)setters.get(i)).set(obj, set);
+          break;
+        case MAP:
+          final Map<?, ?> map = row.getMap(i, Object.class, Object.class);
+          ((Setter<Object, Map<?, ?>>)setters.get(i)).set(obj, map);
+          break;
+        case LIST:
+          final List<?> list = row.getList(i, Object.class);
+          ((Setter<Object, List<?>>)setters.get(i)).set(obj, list);
+          break;
+        case TIMESTAMP:
+          final Date date = row.getDate(i);
+          ((Setter<Object, Date>)setters.get(i)).set(obj, date);
+          break;
+        default:
+          throw new RuntimeException("unsupported data type " + type.getName());
+      }
+    }
+    return obj;
+  }
+
+  @Override
+  public String queryToRetrieveData()
+  {
+    boolean flag = false;
+
+    switch (primaryKeyColumnType.getName()) {
+      case INT:
+        if (startRow.intValue() > lastRowIdInBatch.intValue()) {
+          flag = true;
+        }
+        break;
+      case COUNTER:
+        if (startRow.longValue() > lastRowIdInBatch.longValue()) {
+          flag = true;
+        }
+        break;
+      case FLOAT:
+        if (startRow.floatValue() > lastRowIdInBatch.floatValue()) {
+          flag = true;
+        }
+        break;
+      case DOUBLE:
+        if (startRow.doubleValue() > lastRowIdInBatch.doubleValue()) {
+          flag = true;
+        }
+        break;
+    }
+
+    if (flag) {
+      return "";
+    }
+
+    startRow = lastRowIdInBatch.intValue() + 1;
+    StringBuilder sb = new StringBuilder();
+    sb.append(retrieveQuery).append(" where ").append("token(").append(primaryKeyColumn).append(")").append(">=").append(startRow).append(" LIMIT ").append(limit);
+    logger.debug("retrievequery is {}", sb.toString());
+
+    return sb.toString();
+  }
+
+  /*
+   * Overriding processResult to save primarykey column value from last row in batch.
+   */
+  @Override
+  protected void processResult(ResultSet result)
+  {
+      Row lastRowInBatch = null;
+        for (Row row: result) {
+          Object tuple = getTuple(row);
+          outputPort.emit(tuple);
+          lastRowInBatch = row;
+        }
+           if (lastRowInBatch != null) {
+          switch (primaryKeyColumnType.getName()) {
+            case INT:
+              lastRowIdInBatch = lastRowInBatch.getInt(0);
+              break;
+            case COUNTER:
+              lastRowIdInBatch = lastRowInBatch.getLong(0);
+              break;
+            case FLOAT:
+              lastRowIdInBatch = lastRowInBatch.getFloat(0);
+              break;
+            case DOUBLE:
+              lastRowIdInBatch = lastRowInBatch.getDouble(0);
+              break;
+            default:
+              throw new RuntimeException("unsupported data type " + primaryKeyColumnType.getName());
+          }
+        }
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(CassandraPOJOInputOperator.class);
+}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
similarity index 93%
rename from contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
rename to contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
index 3a1b048..1fe2e77 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
@@ -42,14 +42,15 @@
  * @tags output operator
  * @since 2.1.0
  */
-public class CassandraOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object>
+public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object>
 {
+  private static final long serialVersionUID = 201506181024L;
   @NotNull
   private ArrayList<String> columns;
-  private final ArrayList<DataType> columnDataTypes;
+  private final transient ArrayList<DataType> columnDataTypes;
   @NotNull
   private ArrayList<String> expressions;
-  private transient ArrayList<Object> getters;
+  private final transient ArrayList<Object> getters;
 
   /*
    * An ArrayList of Java expressions that will yield the field value from the POJO.
@@ -95,7 +96,7 @@
     this.tablename = tablename;
   }
 
-  public CassandraOutputOperator()
+  public CassandraPOJOOutputOperator()
   {
     super();
     columnDataTypes = new ArrayList<DataType>();
@@ -170,13 +171,13 @@
   {
     StringBuilder queryfields = new StringBuilder("");
     StringBuilder values = new StringBuilder("");
-    for (int i = 0; i < columns.size(); i++) {
+    for (String column: columns) {
       if (queryfields.length() == 0) {
-        queryfields.append(columns.get(i));
+        queryfields.append(column);
         values.append("?");
       }
       else {
-        queryfields.append(",").append(columns.get(i));
+        queryfields.append(",").append(column);
         values.append(",").append("?");
       }
     }
@@ -259,5 +260,5 @@
     return boundStmnt;
   }
 
-  private static transient final Logger LOG = LoggerFactory.getLogger(CassandraOutputOperator.class);
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraPOJOOutputOperator.class);
 }
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
index 8a92788..a6d0d21 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
@@ -41,11 +41,13 @@
   public static final String KEYSPACE = "demo";
 
   private static final String TABLE_NAME = "test";
-  private static String APP_ID = "CassandraOperatorTest";
-  private static int OPERATOR_ID = 0;
+  private static final String TABLE_NAME_INPUT = "testinput";
+  private static final String APP_ID = "CassandraOperatorTest";
+  private static final int OPERATOR_ID = 0;
   private static Cluster cluster = null;
   private static Session session = null;
 
+  @SuppressWarnings("unused")
   private static class TestEvent
   {
     int id;
@@ -74,6 +76,8 @@
       session.execute(createMetaTable);
       String createTable = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + TABLE_NAME + " (id uuid PRIMARY KEY,age int,lastname text,test boolean,floatvalue float,doubleValue double,set1 set<int>,list1 list<int>,map1 map<text,int>,last_visited timestamp);";
       session.execute(createTable);
+      createTable = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + TABLE_NAME_INPUT + " (id int PRIMARY KEY,lastname text,age int);";
+      session.execute(createTable);
     }
     catch (Throwable e) {
       DTThrowable.rethrow(e);
@@ -86,6 +90,7 @@
     if (session != null) {
       session.execute("DROP TABLE " + CassandraTransactionalStore.DEFAULT_META_TABLE);
       session.execute("DROP TABLE " + KEYSPACE + "." + TABLE_NAME);
+      session.execute("DROP TABLE " + KEYSPACE + "." + TABLE_NAME_INPUT);
       session.close();
     }
     if (cluster != null) {
@@ -93,110 +98,91 @@
     }
   }
 
-
-  private static class TestOutputOperator extends CassandraOutputOperator
+  private static class TestOutputOperator extends CassandraPOJOOutputOperator
   {
+    private static final long serialVersionUID = 201506181038L;
+
     public long getNumOfEventsInStore()
     {
-        String countQuery = "SELECT count(*) from " + TABLE_NAME + ";";
-        ResultSet resultSetCount = session.execute(countQuery);
-        for (Row row: resultSetCount) {
-          return row.getLong(0);
-        }
-        return 0;
+      String countQuery = "SELECT count(*) from " + TABLE_NAME + ";";
+      ResultSet resultSetCount = session.execute(countQuery);
+      for (Row row: resultSetCount) {
+        return row.getLong(0);
+      }
+      return 0;
 
     }
 
     public void getEventsInStore()
     {
-        String recordsQuery = "SELECT * from " + TABLE_NAME + ";";
-        ResultSet resultSetRecords = session.execute(recordsQuery);
-        int count =0;
-        for (Row row: resultSetRecords) {
-          LOG.debug("Boolean value is {}", row.getBool("test"));
-          Assert.assertEquals(true, row.getBool("test"));
-          LOG.debug("lastname returned is {}", row.getString("lastname"));
-          Assert.assertEquals("abclast", row.getString("lastname"));
-          LOG.debug("Double value returned is {}", row.getDouble("doubleValue"));
-          Assert.assertEquals("Double value is",2.0,row.getDouble("doubleValue"),2);
-          LOG.debug("Float value returned is {}", row.getFloat("floatValue"));
-          LOG.debug("age returned is {}", row.getInt("age"));
-          LOG.debug("set returned is {} ", row.getSet("set1", Integer.class));
-          LOG.debug("list returned is {}", row.getList("list1", Integer.class));
-          LOG.debug("map returned is {}", row.getMap("map1", String.class, Integer.class));
-          LOG.debug("date returned is {}", row.getDate("last_visited"));
-          Assert.assertNotEquals(new Date(System.currentTimeMillis()),row.getDate("last_visited"));
-          if(count == 0)
-          {
-            Assert.assertEquals(2, row.getInt("age"));
-            Assert.assertEquals(2.0, row.getFloat("floatValue"),2);
-            Set<Integer> set = new HashSet<Integer>();
-            List<Integer> list = new ArrayList<Integer>();
-            Map<String,Integer> map = new HashMap<String, Integer>();
-            set.add(2);
-            list.add(2);
-            map.put("key2", 2);
-            Assert.assertEquals(set, row.getSet("set1", Integer.class));
-            Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
-            Assert.assertEquals(list,row.getList("list1", Integer.class));
-          }
-          if(count == 1)
-          {
-            Assert.assertEquals(0, row.getInt("age"));
-            Assert.assertEquals(0.0, row.getFloat("floatValue"),2);
-            Set<Integer> set = new HashSet<Integer>();
-            List<Integer> list = new ArrayList<Integer>();
-            Map<String,Integer> map = new HashMap<String, Integer>();
-            set.add(0);
-            list.add(0);
-            map.put("key0", 0);
-            Assert.assertEquals(set, row.getSet("set1", Integer.class));
-            Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
-            Assert.assertEquals(list,row.getList("list1", Integer.class));
-          }
-          if(count == 2)
-          {
-            Assert.assertEquals(1, row.getInt("age"));
-            Assert.assertEquals(1.0, row.getFloat("floatValue"),2);
-            Set<Integer> set = new HashSet<Integer>();
-            List<Integer> list = new ArrayList<Integer>();
-            Map<String,Integer> map = new HashMap<String, Integer>();
-            set.add(1);
-            list.add(1);
-            map.put("key1", 1);
-            Assert.assertEquals(set, row.getSet("set1", Integer.class));
-            Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
-            Assert.assertEquals(list,row.getList("list1", Integer.class));
-          }
-          count++;
+      String recordsQuery = "SELECT * from " + TABLE_NAME + ";";
+      ResultSet resultSetRecords = session.execute(recordsQuery);
+      int count = 0;
+      for (Row row: resultSetRecords) {
+        LOG.debug("Boolean value is {}", row.getBool("test"));
+        Assert.assertEquals(true, row.getBool("test"));
+        LOG.debug("lastname returned is {}", row.getString("lastname"));
+        Assert.assertEquals("abclast", row.getString("lastname"));
+        LOG.debug("Double value returned is {}", row.getDouble("doubleValue"));
+        Assert.assertEquals("Double value is", 2.0, row.getDouble("doubleValue"), 2);
+        LOG.debug("Float value returned is {}", row.getFloat("floatValue"));
+        LOG.debug("age returned is {}", row.getInt("age"));
+        LOG.debug("set returned is {} ", row.getSet("set1", Integer.class));
+        LOG.debug("list returned is {}", row.getList("list1", Integer.class));
+        LOG.debug("map returned is {}", row.getMap("map1", String.class, Integer.class));
+        LOG.debug("date returned is {}", row.getDate("last_visited"));
+        Assert.assertNotEquals(new Date(System.currentTimeMillis()), row.getDate("last_visited"));
+        if (count == 0) {
+          Assert.assertEquals(2, row.getInt("age"));
+          Assert.assertEquals(2.0, row.getFloat("floatValue"), 2);
+          Set<Integer> set = new HashSet<Integer>();
+          List<Integer> list = new ArrayList<Integer>();
+          Map<String, Integer> map = new HashMap<String, Integer>();
+          set.add(2);
+          list.add(2);
+          map.put("key2", 2);
+          Assert.assertEquals(set, row.getSet("set1", Integer.class));
+          Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
+          Assert.assertEquals(list, row.getList("list1", Integer.class));
         }
-
+        if (count == 1) {
+          Assert.assertEquals(0, row.getInt("age"));
+          Assert.assertEquals(0.0, row.getFloat("floatValue"), 2);
+          Set<Integer> set = new HashSet<Integer>();
+          List<Integer> list = new ArrayList<Integer>();
+          Map<String, Integer> map = new HashMap<String, Integer>();
+          set.add(0);
+          list.add(0);
+          map.put("key0", 0);
+          Assert.assertEquals(set, row.getSet("set1", Integer.class));
+          Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
+          Assert.assertEquals(list, row.getList("list1", Integer.class));
+        }
+        if (count == 2) {
+          Assert.assertEquals(1, row.getInt("age"));
+          Assert.assertEquals(1.0, row.getFloat("floatValue"), 2);
+          Set<Integer> set = new HashSet<Integer>();
+          List<Integer> list = new ArrayList<Integer>();
+          Map<String, Integer> map = new HashMap<String, Integer>();
+          set.add(1);
+          list.add(1);
+          map.put("key1", 1);
+          Assert.assertEquals(set, row.getSet("set1", Integer.class));
+          Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
+          Assert.assertEquals(list, row.getList("list1", Integer.class));
+        }
+        count++;
+      }
 
     }
 
   }
 
-  private static class TestInputOperator extends AbstractCassandraInputOperator<TestEvent>
+  private static class TestInputOperator extends CassandraPOJOInputOperator
   {
-
-    private static final String retrieveQuery = "SELECT * FROM " + KEYSPACE + "." + TABLE_NAME + ";";
-
-    @Override
-    public TestEvent getTuple(Row row)
-    {
-      try {
-        return new TestEvent(row.getInt(0));
-      }
-      catch (DriverException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public String queryToRetrieveData()
-    {
-      return retrieveQuery;
-    }
+    private final ArrayList<Integer> ids = new ArrayList<Integer>();
+    private final HashMap<Integer, String> mapNames = new HashMap<Integer, String>();
+    private final HashMap<Integer, Integer> mapAge = new HashMap<Integer, Integer>();
 
     public void insertEventsInTable(int numEvents)
     {
@@ -205,13 +191,14 @@
                 .addContactPoint(NODE).build();
         Session session = cluster.connect(KEYSPACE);
 
-        String insert = "INSERT INTO " + TABLE_NAME + " (ID)" + " VALUES (?);";
+        String insert = "INSERT INTO " + TABLE_NAME_INPUT + " (ID,lastname,age)" + " VALUES (?,?,?);";
         PreparedStatement stmt = session.prepare(insert);
         BoundStatement boundStatement = new BoundStatement(stmt);
-        Statement statement;
         for (int i = 0; i < numEvents; i++) {
-          statement = boundStatement.bind(i);
-          session.execute(statement);
+          ids.add(i);
+          mapNames.put(i, "test" + i);
+          mapAge.put(i, i + 10);
+          session.execute(boundStatement.bind(i, "test" + i, i + 10));
         }
       }
       catch (DriverException e) {
@@ -219,6 +206,21 @@
       }
     }
 
+    public ArrayList<Integer> getIds()
+    {
+      return ids;
+    }
+
+    public HashMap<Integer, String> getNames()
+    {
+      return mapNames;
+    }
+
+    public HashMap<Integer, Integer> getAge()
+    {
+      return mapAge;
+    }
+
   }
 
   @Test
@@ -284,10 +286,13 @@
     outputOperator.getEventsInStore();
   }
 
-  //This Test needs to be improved.
+  /*
+   * This test can be run on cassandra server installed on node17.
+   */
   @Test
   public void TestCassandraInputOperator()
   {
+    String retrieveQuery = "SELECT * FROM " + KEYSPACE + "." + TABLE_NAME_INPUT;
     CassandraStore store = new CassandraStore();
     store.setNode(NODE);
     store.setKeyspace(KEYSPACE);
@@ -298,22 +303,49 @@
 
     TestInputOperator inputOperator = new TestInputOperator();
     inputOperator.setStore(store);
-    inputOperator.insertEventsInTable(10);
+    inputOperator.setOutputClass("com.datatorrent.contrib.cassandra.TestInputPojo");
+    inputOperator.setTablename(TABLE_NAME_INPUT);
+    inputOperator.setRetrieveQuery(retrieveQuery);
+    ArrayList<String> columns = new ArrayList<String>();
+    columns.add("id");
+    columns.add("age");
+    columns.add("lastname");
 
+    inputOperator.setColumns(columns);
+    ArrayList<String> expressions = new ArrayList<String>();
+    expressions.add("id");
+    expressions.add("age");
+    expressions.add("lastname");
+    inputOperator.setExpressions(expressions);
+
+    inputOperator.insertEventsInTable(30);
+    inputOperator.setPrimaryKeyColumn("id");
     CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
     inputOperator.outputPort.setSink(sink);
-
     inputOperator.setup(context);
     inputOperator.beginWindow(0);
+    inputOperator.insertEventsInTable(10);
     inputOperator.emitTuples();
     inputOperator.endWindow();
-
     Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
+    inputOperator.beginWindow(1);
+    inputOperator.emitTuples();
+    inputOperator.endWindow();
+    Assert.assertEquals("rows from db", 20, sink.collectedTuples.size());
+    ArrayList<Integer> listOfIDs = inputOperator.getIds();
+    // Rows are not stored in the same order in cassandra table in which they are inserted.
+    for (int i = 0; i < 10; i++) {
+      TestInputPojo object = (TestInputPojo)sink.collectedTuples.get(i);
+      Assert.assertTrue("id set in testpojo", listOfIDs.contains(object.getId()));
+      Assert.assertEquals("name set in testpojo", inputOperator.getNames().get(object.getId()), object.getLastname());
+      Assert.assertEquals("age set in testpojo", inputOperator.getAge().get(object.getId()).intValue(), object.getAge());
+    }
+
   }
 
   public static class TestPojo
   {
-    private TestPojo(UUID randomUUID, int i, String string, boolean b, float d, double d0, Set<Integer> set1, List<Integer> list1, Map<String, Integer> map1, Date date)
+    public TestPojo(UUID randomUUID, int i, String string, boolean b, float d, double d0, Set<Integer> set1, List<Integer> list1, Map<String, Integer> map1, Date date)
     {
       this.id = randomUUID;
       this.age = i;
@@ -440,6 +472,6 @@
 
   }
 
-  private static transient final Logger LOG = LoggerFactory.getLogger(CassandraOperatorTest.class);
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraOperatorTest.class);
 
 }
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/TestInputPojo.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/TestInputPojo.java
new file mode 100644
index 0000000..3a20406
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/TestInputPojo.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+
+/*
+ * TestInputPojo class specified by user for specifying schema as part of testing.
+ * CassandraPOJOInputOperator can be used to get Field values from cassandra database columns
+ * and set in this pojo as an example.
+ */
+public class TestInputPojo
+{
+  private int id;
+
+  public int getId()
+  {
+    return id;
+  }
+
+  public void setId(int id)
+  {
+    this.id = id;
+  }
+  private String lastname;
+  private int age;
+
+
+  public int getAge()
+  {
+    return age;
+  }
+
+  public void setAge(int age)
+  {
+    this.age = age;
+  }
+
+  public String getLastname()
+  {
+    return lastname;
+  }
+
+  public void setLastname(String lastname)
+  {
+    this.lastname = lastname;
+  }
+
+}