Created an implementation of AbstractCassandraInputOperator which gets field values from Cassandra database columns and sets in a POJO.
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
index 786f24f..94fd64c 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
@@ -1,6 +1,17 @@
 /*
- *  Copyright (c) 2012-2015 Malhar, Inc.
- *  All Rights Reserved.
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package com.datatorrent.contrib.cassandra;
 
@@ -18,20 +29,32 @@
 import java.util.*;
 import javax.validation.constraints.NotNull;
 
+/**
+ * <p>
+ * CassandraInputOperator</p>
+ * A Generic implementation of AbstractCassandraInputOperator which gets field values from Cassandra database columns and sets in a POJO.
+ * @displayName Cassandra Input Operator
+ * @category Input
+ * @tags input operator
+ */
 public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object>
 {
   @NotNull
   private ArrayList<String> columns;
-  private final ArrayList<DataType> columnDataTypes;
+  private final transient ArrayList<DataType> columnDataTypes;
+
   @NotNull
   private ArrayList<String> expressions;
   @NotNull
   private String tablename;
-  private transient ArrayList<Object> setters;
+  private final transient ArrayList<Object> setters;
   private String retrieveQuery;
 
   /*
-   * Output POJO being input by the user.
+   * POJO class which is generated as output from this operator.
+   * Example:
+   * public class TestPOJO{ int intfield; public int getInt(){} public void setInt(){} }
+   * outputClass = TestPOJO
    * POJOs will be generated on fly in later implementation.
    */
   private String outputClass;
@@ -46,6 +69,9 @@
     this.outputClass = outputClass;
   }
 
+  /*
+   * Query input by user: Example: select * from keyspace.tablename;
+   */
   public String getRetrieveQuery()
   {
     return retrieveQuery;
@@ -56,6 +82,10 @@
     this.retrieveQuery = retrieveQuery;
   }
 
+  /*
+   * An ArrayList of Java expressions that will yield the cassandra column value to be set in output object.
+   * Each expression corresponds to one column in the Cassandra table.
+   */
   public ArrayList<String> getExpressions()
   {
     return expressions;
@@ -107,13 +137,8 @@
     Class<?> className = null;
     Object obj = null;
 
-    com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename);
-    final ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
-
-    final int numberOfColumns = rsMetaData.size();
-    if (setters.isEmpty()) {
-      System.out.println("create setters");
     try {
+      // This code will be replaced after integration of creating POJOs on the fly utility.
       className = Class.forName(outputClass);
       obj = className.newInstance();
     }
@@ -126,11 +151,16 @@
     catch (IllegalAccessException ex) {
       throw new RuntimeException(ex);
     }
+    if (setters.isEmpty()) {
+      com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename);
+      final ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
+      final int numberOfColumns = rsMetaData.size();
+
       for (int i = 0; i < numberOfColumns; i++) {
-        // get the designated column's data type.
+        // Get the designated column's data type.
         final DataType type = rsMetaData.getType(i);
         columnDataTypes.add(type);
-        Object setter = null;
+        Object setter;
         final String setterExpr = expressions.get(i);
         switch (type.getName()) {
           case ASCII:
@@ -180,19 +210,20 @@
       }
     }
 
-    for (int i = 0; i < numberOfColumns; i++) {
+    final int size = columnDataTypes.size();
+    for (int i = 0; i < size; i++) {
       final DataType type = columnDataTypes.get(i);
       switch (type.getName()) {
         case UUID:
           final UUID id = row.getUUID(i);
-          System.out.println("id is "+id);
+          System.out.println("id is " + id);
           ((Setter<Object, UUID>)setters.get(i)).set(obj, id);
           break;
         case ASCII:
         case VARCHAR:
         case TEXT:
           final String ascii = row.getString(i);
-          System.out.println("ascii is "+ascii);
+          System.out.println("ascii is " + ascii);
           ((Setter<Object, String>)setters.get(i)).set(obj, ascii);
           break;
         case BOOLEAN:
@@ -201,7 +232,7 @@
           break;
         case INT:
           final int intValue = row.getInt(i);
-           System.out.println("age is "+intValue);
+          System.out.println("age is " + intValue);
           ((SetterInt)setters.get(i)).set(obj, intValue);
           break;
         case BIGINT:
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
index 0e4dcff..05441a1 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
@@ -46,10 +46,10 @@
 {
   @NotNull
   private ArrayList<String> columns;
-  private final ArrayList<DataType> columnDataTypes;
+  private final transient ArrayList<DataType> columnDataTypes;
   @NotNull
   private ArrayList<String> expressions;
-  private transient ArrayList<Object> getters;
+  private final transient ArrayList<Object> getters;
 
   /*
    * An ArrayList of Java expressions that will yield the field value from the POJO.
@@ -170,13 +170,13 @@
   {
     StringBuilder queryfields = new StringBuilder("");
     StringBuilder values = new StringBuilder("");
-    for (int i = 0; i < columns.size(); i++) {
+    for (String column: columns) {
       if (queryfields.length() == 0) {
-        queryfields.append(columns.get(i));
+        queryfields.append(column);
         values.append("?");
       }
       else {
-        queryfields.append(",").append(columns.get(i));
+        queryfields.append(",").append(column);
         values.append(",").append("?");
       }
     }
@@ -259,5 +259,5 @@
     return boundStmnt;
   }
 
-  private static transient final Logger LOG = LoggerFactory.getLogger(CassandraPOJOOutputOperator.class);
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraPOJOOutputOperator.class);
 }
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
index a412cf7..68e28cc 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
@@ -41,11 +41,13 @@
   public static final String KEYSPACE = "demo";
 
   private static final String TABLE_NAME = "test";
+  private static final String TABLE_NAME_INPUT = "testinput";
   private static String APP_ID = "CassandraOperatorTest";
   private static int OPERATOR_ID = 0;
   private static Cluster cluster = null;
   private static Session session = null;
 
+  @SuppressWarnings("unused")
   private static class TestEvent
   {
     int id;
@@ -74,6 +76,8 @@
       session.execute(createMetaTable);
       String createTable = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + TABLE_NAME + " (id uuid PRIMARY KEY,age int,lastname text,test boolean,floatvalue float,doubleValue double,set1 set<int>,list1 list<int>,map1 map<text,int>,last_visited timestamp);";
       session.execute(createTable);
+      createTable = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + TABLE_NAME_INPUT + " (id uuid PRIMARY KEY,lastname text,age int);";
+      session.execute(createTable);
     }
     catch (Throwable e) {
       DTThrowable.rethrow(e);
@@ -93,79 +97,84 @@
     }
   }
 
+
   private static class TestOutputOperator extends CassandraPOJOOutputOperator
   {
     public long getNumOfEventsInStore()
     {
-      String countQuery = "SELECT count(*) from " + TABLE_NAME + ";";
-      ResultSet resultSetCount = session.execute(countQuery);
-      for (Row row: resultSetCount) {
-        return row.getLong(0);
-      }
-      return 0;
+        String countQuery = "SELECT count(*) from " + TABLE_NAME + ";";
+        ResultSet resultSetCount = session.execute(countQuery);
+        for (Row row: resultSetCount) {
+          return row.getLong(0);
+        }
+        return 0;
 
     }
 
     public void getEventsInStore()
     {
-      String recordsQuery = "SELECT * from " + TABLE_NAME + ";";
-      ResultSet resultSetRecords = session.execute(recordsQuery);
-      int count = 0;
-      for (Row row: resultSetRecords) {
-        LOG.debug("Boolean value is {}", row.getBool("test"));
-        Assert.assertEquals(true, row.getBool("test"));
-        LOG.debug("lastname returned is {}", row.getString("lastname"));
-        Assert.assertEquals("abclast", row.getString("lastname"));
-        LOG.debug("Double value returned is {}", row.getDouble("doubleValue"));
-        Assert.assertEquals("Double value is", 2.0, row.getDouble("doubleValue"), 2);
-        LOG.debug("Float value returned is {}", row.getFloat("floatValue"));
-        LOG.debug("age returned is {}", row.getInt("age"));
-        LOG.debug("set returned is {} ", row.getSet("set1", Integer.class));
-        LOG.debug("list returned is {}", row.getList("list1", Integer.class));
-        LOG.debug("map returned is {}", row.getMap("map1", String.class, Integer.class));
-        LOG.debug("date returned is {}", row.getDate("last_visited"));
-        Assert.assertNotEquals(new Date(System.currentTimeMillis()), row.getDate("last_visited"));
-        if (count == 0) {
-          Assert.assertEquals(2, row.getInt("age"));
-          Assert.assertEquals(2.0, row.getFloat("floatValue"), 2);
-          Set<Integer> set = new HashSet<Integer>();
-          List<Integer> list = new ArrayList<Integer>();
-          Map<String, Integer> map = new HashMap<String, Integer>();
-          set.add(2);
-          list.add(2);
-          map.put("key2", 2);
-          Assert.assertEquals(set, row.getSet("set1", Integer.class));
-          Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
-          Assert.assertEquals(list, row.getList("list1", Integer.class));
+        String recordsQuery = "SELECT * from " + TABLE_NAME + ";";
+        ResultSet resultSetRecords = session.execute(recordsQuery);
+        int count =0;
+        for (Row row: resultSetRecords) {
+          LOG.debug("Boolean value is {}", row.getBool("test"));
+          Assert.assertEquals(true, row.getBool("test"));
+          LOG.debug("lastname returned is {}", row.getString("lastname"));
+          Assert.assertEquals("abclast", row.getString("lastname"));
+          LOG.debug("Double value returned is {}", row.getDouble("doubleValue"));
+          Assert.assertEquals("Double value is",2.0,row.getDouble("doubleValue"),2);
+          LOG.debug("Float value returned is {}", row.getFloat("floatValue"));
+          LOG.debug("age returned is {}", row.getInt("age"));
+          LOG.debug("set returned is {} ", row.getSet("set1", Integer.class));
+          LOG.debug("list returned is {}", row.getList("list1", Integer.class));
+          LOG.debug("map returned is {}", row.getMap("map1", String.class, Integer.class));
+          LOG.debug("date returned is {}", row.getDate("last_visited"));
+          Assert.assertNotEquals(new Date(System.currentTimeMillis()),row.getDate("last_visited"));
+          if(count == 0)
+          {
+            Assert.assertEquals(2, row.getInt("age"));
+            Assert.assertEquals(2.0, row.getFloat("floatValue"),2);
+            Set<Integer> set = new HashSet<Integer>();
+            List<Integer> list = new ArrayList<Integer>();
+            Map<String,Integer> map = new HashMap<String, Integer>();
+            set.add(2);
+            list.add(2);
+            map.put("key2", 2);
+            Assert.assertEquals(set, row.getSet("set1", Integer.class));
+            Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
+            Assert.assertEquals(list,row.getList("list1", Integer.class));
+          }
+          if(count == 1)
+          {
+            Assert.assertEquals(0, row.getInt("age"));
+            Assert.assertEquals(0.0, row.getFloat("floatValue"),2);
+            Set<Integer> set = new HashSet<Integer>();
+            List<Integer> list = new ArrayList<Integer>();
+            Map<String,Integer> map = new HashMap<String, Integer>();
+            set.add(0);
+            list.add(0);
+            map.put("key0", 0);
+            Assert.assertEquals(set, row.getSet("set1", Integer.class));
+            Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
+            Assert.assertEquals(list,row.getList("list1", Integer.class));
+          }
+          if(count == 2)
+          {
+            Assert.assertEquals(1, row.getInt("age"));
+            Assert.assertEquals(1.0, row.getFloat("floatValue"),2);
+            Set<Integer> set = new HashSet<Integer>();
+            List<Integer> list = new ArrayList<Integer>();
+            Map<String,Integer> map = new HashMap<String, Integer>();
+            set.add(1);
+            list.add(1);
+            map.put("key1", 1);
+            Assert.assertEquals(set, row.getSet("set1", Integer.class));
+            Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
+            Assert.assertEquals(list,row.getList("list1", Integer.class));
+          }
+          count++;
         }
-        if (count == 1) {
-          Assert.assertEquals(0, row.getInt("age"));
-          Assert.assertEquals(0.0, row.getFloat("floatValue"), 2);
-          Set<Integer> set = new HashSet<Integer>();
-          List<Integer> list = new ArrayList<Integer>();
-          Map<String, Integer> map = new HashMap<String, Integer>();
-          set.add(0);
-          list.add(0);
-          map.put("key0", 0);
-          Assert.assertEquals(set, row.getSet("set1", Integer.class));
-          Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
-          Assert.assertEquals(list, row.getList("list1", Integer.class));
-        }
-        if (count == 2) {
-          Assert.assertEquals(1, row.getInt("age"));
-          Assert.assertEquals(1.0, row.getFloat("floatValue"), 2);
-          Set<Integer> set = new HashSet<Integer>();
-          List<Integer> list = new ArrayList<Integer>();
-          Map<String, Integer> map = new HashMap<String, Integer>();
-          set.add(1);
-          list.add(1);
-          map.put("key1", 1);
-          Assert.assertEquals(set, row.getSet("set1", Integer.class));
-          Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
-          Assert.assertEquals(list, row.getList("list1", Integer.class));
-        }
-        count++;
-      }
+
 
     }
 
@@ -197,45 +206,6 @@
 
   }
 
-  public static class TestInputPojo
-  {
-    private UUID id;
-    private String lastname;
-    private int age;
-
-    public int getAge()
-    {
-      return age;
-    }
-
-    public void setAge(int age)
-    {
-      this.age = age;
-    }
-
-    public UUID getId()
-    {
-      return id;
-    }
-
-    public void setId(UUID id)
-    {
-      this.id = id;
-    }
-
-    public String getLastname()
-    {
-      return lastname;
-    }
-
-    public void setLastname(String lastname)
-    {
-      this.lastname = lastname;
-    }
-
-
-  }
-
   @Test
   public void testCassandraOutputOperator()
   {
@@ -299,11 +269,10 @@
     outputOperator.getEventsInStore();
   }
 
-  //This Test needs to be improved.
   @Test
   public void TestCassandraInputOperator()
   {
-    String retrieveQuery = "SELECT * FROM " + KEYSPACE + "." + TABLE_NAME + ";";
+    String retrieveQuery = "SELECT * FROM " + KEYSPACE + "." + TABLE_NAME_INPUT + ";";
     CassandraStore store = new CassandraStore();
     store.setNode(NODE);
     store.setKeyspace(KEYSPACE);
@@ -314,9 +283,8 @@
 
     TestInputOperator inputOperator = new TestInputOperator();
     inputOperator.setStore(store);
-    inputOperator.insertEventsInTable(10);
-    inputOperator.setOutputClass("TestInputPojo");
-    inputOperator.setTablename(TABLE_NAME);
+    inputOperator.setOutputClass("com.datatorrent.contrib.cassandra.TestInputPojo");
+    inputOperator.setTablename(TABLE_NAME_INPUT);
     inputOperator.setRetrieveQuery(retrieveQuery);
     ArrayList<String> columns = new ArrayList<String>();
     columns.add("id");
@@ -331,7 +299,8 @@
     inputOperator.setExpressions(expressions);
 
     inputOperator.setStore(store);
-    inputOperator.insertEventsInTable(10);
+    //Inserting events in cassandra table through shell to check the unique id generated.
+    // inputOperator.insertEventsInTable(2);
 
     CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
     inputOperator.outputPort.setSink(sink);
@@ -340,8 +309,23 @@
     inputOperator.beginWindow(0);
     inputOperator.emitTuples();
     inputOperator.endWindow();
+    Assert.assertEquals("rows from db", 2, sink.collectedTuples.size());
+     int count =0;
+     for (Object o : sink.collectedTuples) {
+       count++;
+      TestInputPojo object = (TestInputPojo)o;
+      if(count == 1){
+      Assert.assertEquals("id set in testpojo", "1878a0a0-139d-11e5-87bf-dd30f8d32bb8", object.getId().toString());
+      Assert.assertEquals("name set in testpojo", "test2", object.getLastname());
+      Assert.assertEquals("age set in testpojo", 13, object.getAge());
+      }
+      if(count == 2){
+       Assert.assertEquals("id set in testpojo", "2196c3b0-139d-11e5-87bf-dd30f8d32bb8", object.getId().toString());
+       Assert.assertEquals("name set in testpojo", "test4", object.getLastname());
+       Assert.assertEquals("age set in testpojo", 15, object.getAge());
+      }
+     }
 
-    Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
   }
 
   public static class TestPojo
@@ -472,7 +456,7 @@
     }
 
   }
+  private static final Logger LOG = LoggerFactory.getLogger(CassandraOperatorTest.class);
 
-  private static transient final Logger LOG = LoggerFactory.getLogger(CassandraOperatorTest.class);
 
 }
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/TestInputPojo.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/TestInputPojo.java
new file mode 100644
index 0000000..37a67df
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/TestInputPojo.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import java.util.UUID;
+
+public class TestInputPojo
+{
+  private UUID id;
+  private String lastname;
+  private int age;
+
+  public UUID getId()
+  {
+    return id;
+  }
+
+  public void setId(UUID id)
+  {
+    this.id = id;
+  }
+
+  public int getAge()
+  {
+    return age;
+  }
+
+  public void setAge(int age)
+  {
+    this.age = age;
+  }
+
+  public String getLastname()
+  {
+    return lastname;
+  }
+
+  public void setLastname(String lastname)
+  {
+    this.lastname = lastname;
+  }
+
+}