changes.
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
index 86b6782..7e67398 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
@@ -30,10 +30,11 @@
 
 /**
  * Base input adapter which reads data from persistence database through DATASTAX API and writes into output port(s). 
- * Subclasses should provide implementation to get tuples and querying to retrieve data. 
+ * Subclasses should provide implementation to get tuples and querying to retrieve data.
  * <p>
  * This is an abstract class. Sub-classes need to implement {@link #queryToRetrieveData()} and {@link #getTuple(Row)}.
  * </p>
+ * @param <T>
  * @displayName Abstract Cassandra Input
  * @category Store
  * @tags input operator
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
new file mode 100644
index 0000000..786f24f
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
@@ -0,0 +1,254 @@
+/*
+ *  Copyright (c) 2012-2015 Malhar, Inc.
+ *  All Rights Reserved.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import com.datastax.driver.core.ColumnDefinitions;
+import com.datastax.driver.core.DataType;
+import com.datastax.driver.core.Row;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Setter;
+import com.datatorrent.lib.util.PojoUtils.SetterBoolean;
+import com.datatorrent.lib.util.PojoUtils.SetterDouble;
+import com.datatorrent.lib.util.PojoUtils.SetterFloat;
+import com.datatorrent.lib.util.PojoUtils.SetterInt;
+import com.datatorrent.lib.util.PojoUtils.SetterLong;
+import java.math.BigDecimal;
+import java.util.*;
+import javax.validation.constraints.NotNull;
+
+public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object>
+{
+  @NotNull
+  private ArrayList<String> columns;
+  private final ArrayList<DataType> columnDataTypes;
+  @NotNull
+  private ArrayList<String> expressions;
+  @NotNull
+  private String tablename;
+  private transient ArrayList<Object> setters;
+  private String retrieveQuery;
+
+  /*
+   * Output POJO being input by the user.
+   * POJOs will be generated on fly in later implementation.
+   */
+  private String outputClass;
+
+  public String getOutputClass()
+  {
+    return outputClass;
+  }
+
+  public void setOutputClass(String outputClass)
+  {
+    this.outputClass = outputClass;
+  }
+
+  public String getRetrieveQuery()
+  {
+    return retrieveQuery;
+  }
+
+  public void setRetrieveQuery(String retrieveQuery)
+  {
+    this.retrieveQuery = retrieveQuery;
+  }
+
+  public ArrayList<String> getExpressions()
+  {
+    return expressions;
+  }
+
+  public void setExpressions(ArrayList<String> expressions)
+  {
+    this.expressions = expressions;
+  }
+
+
+  /*
+   * An ArrayList of Columns in the Cassandra Table.
+   */
+  public ArrayList<String> getColumns()
+  {
+    return columns;
+  }
+
+  public void setColumns(ArrayList<String> columns)
+  {
+    this.columns = columns;
+  }
+
+  /*
+   * Tablename in cassandra.
+   */
+  public String getTablename()
+  {
+    return tablename;
+  }
+
+  public void setTablename(String tablename)
+  {
+    this.tablename = tablename;
+  }
+
+  public CassandraPOJOInputOperator()
+  {
+    super();
+    columnDataTypes = new ArrayList<DataType>();
+    setters = new ArrayList<Object>();
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public Object getTuple(Row row)
+  {
+    Class<?> className = null;
+    Object obj = null;
+
+    com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename);
+    final ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
+
+    final int numberOfColumns = rsMetaData.size();
+    if (setters.isEmpty()) {
+      System.out.println("create setters");
+    try {
+      className = Class.forName(outputClass);
+      obj = className.newInstance();
+    }
+    catch (ClassNotFoundException ex) {
+      throw new RuntimeException(ex);
+    }
+    catch (InstantiationException ex) {
+      throw new RuntimeException(ex);
+    }
+    catch (IllegalAccessException ex) {
+      throw new RuntimeException(ex);
+    }
+      for (int i = 0; i < numberOfColumns; i++) {
+        // get the designated column's data type.
+        final DataType type = rsMetaData.getType(i);
+        columnDataTypes.add(type);
+        Object setter = null;
+        final String setterExpr = expressions.get(i);
+        switch (type.getName()) {
+          case ASCII:
+          case TEXT:
+          case VARCHAR:
+            setter = PojoUtils.createSetter(className, setterExpr, String.class);
+            break;
+          case BOOLEAN:
+            setter = PojoUtils.createSetterBoolean(className, setterExpr);
+            break;
+          case INT:
+            setter = PojoUtils.createSetterInt(className, setterExpr);
+            break;
+          case BIGINT:
+          case COUNTER:
+            setter = PojoUtils.createSetterLong(className, setterExpr);
+            break;
+          case FLOAT:
+            setter = PojoUtils.createSetterFloat(className, setterExpr);
+            break;
+          case DOUBLE:
+            setter = PojoUtils.createSetterDouble(className, setterExpr);
+            break;
+          case DECIMAL:
+            setter = PojoUtils.createSetter(className, setterExpr, BigDecimal.class);
+            break;
+          case SET:
+            setter = PojoUtils.createSetter(className, setterExpr, Set.class);
+            break;
+          case MAP:
+            setter = PojoUtils.createSetter(className, setterExpr, Map.class);
+            break;
+          case LIST:
+            setter = PojoUtils.createSetter(className, setterExpr, List.class);
+            break;
+          case TIMESTAMP:
+            setter = PojoUtils.createSetter(className, setterExpr, Date.class);
+            break;
+          case UUID:
+            setter = PojoUtils.createSetter(className, setterExpr, UUID.class);
+            break;
+          default:
+            setter = PojoUtils.createSetter(className, setterExpr, Object.class);
+            break;
+        }
+        setters.add(setter);
+      }
+    }
+
+    for (int i = 0; i < numberOfColumns; i++) {
+      final DataType type = columnDataTypes.get(i);
+      switch (type.getName()) {
+        case UUID:
+          final UUID id = row.getUUID(i);
+          System.out.println("id is "+id);
+          ((Setter<Object, UUID>)setters.get(i)).set(obj, id);
+          break;
+        case ASCII:
+        case VARCHAR:
+        case TEXT:
+          final String ascii = row.getString(i);
+          System.out.println("ascii is "+ascii);
+          ((Setter<Object, String>)setters.get(i)).set(obj, ascii);
+          break;
+        case BOOLEAN:
+          final boolean bool = row.getBool(i);
+          ((SetterBoolean)setters.get(i)).set(obj, bool);
+          break;
+        case INT:
+          final int intValue = row.getInt(i);
+           System.out.println("age is "+intValue);
+          ((SetterInt)setters.get(i)).set(obj, intValue);
+          break;
+        case BIGINT:
+        case COUNTER:
+          final long longValue = row.getLong(i);
+          ((SetterLong)setters.get(i)).set(obj, longValue);
+          break;
+        case FLOAT:
+          final float floatValue = row.getFloat(i);
+          ((SetterFloat)setters.get(i)).set(obj, floatValue);
+          break;
+        case DOUBLE:
+          final double doubleValue = row.getDouble(i);
+          ((SetterDouble)setters.get(i)).set(obj, doubleValue);
+          break;
+        case DECIMAL:
+          final BigDecimal decimal = row.getDecimal(i);
+          ((Setter<Object, BigDecimal>)setters.get(i)).set(obj, decimal);
+          break;
+        case SET:
+          Set<?> set = row.getSet(i, Object.class);
+          ((Setter<Object, Set<?>>)setters.get(i)).set(obj, set);
+
+          break;
+        case MAP:
+          final Map<?, ?> map = row.getMap(i, Object.class, Object.class);
+          ((Setter<Object, Map<?, ?>>)setters.get(i)).set(obj, map);
+          break;
+        case LIST:
+          final List<?> list = row.getList(i, Object.class);
+          ((Setter<Object, List<?>>)setters.get(i)).set(obj, list);
+          break;
+        case TIMESTAMP:
+          final Date date = row.getDate(i);
+          ((Setter<Object, Date>)setters.get(i)).set(obj, date);
+          break;
+        default:
+          throw new RuntimeException("unsupported data type " + type.getName());
+      }
+    }
+    return obj;
+  }
+
+  @Override
+  public String queryToRetrieveData()
+  {
+    return retrieveQuery;
+  }
+
+}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
similarity index 97%
rename from contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
rename to contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
index 3a1b048..0e4dcff 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
@@ -42,7 +42,7 @@
  * @tags output operator
  * @since 2.1.0
  */
-public class CassandraOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object>
+public class CassandraPOJOOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object>
 {
   @NotNull
   private ArrayList<String> columns;
@@ -95,7 +95,7 @@
     this.tablename = tablename;
   }
 
-  public CassandraOutputOperator()
+  public CassandraPOJOOutputOperator()
   {
     super();
     columnDataTypes = new ArrayList<DataType>();
@@ -259,5 +259,5 @@
     return boundStmnt;
   }
 
-  private static transient final Logger LOG = LoggerFactory.getLogger(CassandraOutputOperator.class);
+  private static transient final Logger LOG = LoggerFactory.getLogger(CassandraPOJOOutputOperator.class);
 }
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
index 8a92788..a412cf7 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
@@ -93,111 +93,87 @@
     }
   }
 
-
-  private static class TestOutputOperator extends CassandraOutputOperator
+  private static class TestOutputOperator extends CassandraPOJOOutputOperator
   {
     public long getNumOfEventsInStore()
     {
-        String countQuery = "SELECT count(*) from " + TABLE_NAME + ";";
-        ResultSet resultSetCount = session.execute(countQuery);
-        for (Row row: resultSetCount) {
-          return row.getLong(0);
-        }
-        return 0;
+      String countQuery = "SELECT count(*) from " + TABLE_NAME + ";";
+      ResultSet resultSetCount = session.execute(countQuery);
+      for (Row row: resultSetCount) {
+        return row.getLong(0);
+      }
+      return 0;
 
     }
 
     public void getEventsInStore()
     {
-        String recordsQuery = "SELECT * from " + TABLE_NAME + ";";
-        ResultSet resultSetRecords = session.execute(recordsQuery);
-        int count =0;
-        for (Row row: resultSetRecords) {
-          LOG.debug("Boolean value is {}", row.getBool("test"));
-          Assert.assertEquals(true, row.getBool("test"));
-          LOG.debug("lastname returned is {}", row.getString("lastname"));
-          Assert.assertEquals("abclast", row.getString("lastname"));
-          LOG.debug("Double value returned is {}", row.getDouble("doubleValue"));
-          Assert.assertEquals("Double value is",2.0,row.getDouble("doubleValue"),2);
-          LOG.debug("Float value returned is {}", row.getFloat("floatValue"));
-          LOG.debug("age returned is {}", row.getInt("age"));
-          LOG.debug("set returned is {} ", row.getSet("set1", Integer.class));
-          LOG.debug("list returned is {}", row.getList("list1", Integer.class));
-          LOG.debug("map returned is {}", row.getMap("map1", String.class, Integer.class));
-          LOG.debug("date returned is {}", row.getDate("last_visited"));
-          Assert.assertNotEquals(new Date(System.currentTimeMillis()),row.getDate("last_visited"));
-          if(count == 0)
-          {
-            Assert.assertEquals(2, row.getInt("age"));
-            Assert.assertEquals(2.0, row.getFloat("floatValue"),2);
-            Set<Integer> set = new HashSet<Integer>();
-            List<Integer> list = new ArrayList<Integer>();
-            Map<String,Integer> map = new HashMap<String, Integer>();
-            set.add(2);
-            list.add(2);
-            map.put("key2", 2);
-            Assert.assertEquals(set, row.getSet("set1", Integer.class));
-            Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
-            Assert.assertEquals(list,row.getList("list1", Integer.class));
-          }
-          if(count == 1)
-          {
-            Assert.assertEquals(0, row.getInt("age"));
-            Assert.assertEquals(0.0, row.getFloat("floatValue"),2);
-            Set<Integer> set = new HashSet<Integer>();
-            List<Integer> list = new ArrayList<Integer>();
-            Map<String,Integer> map = new HashMap<String, Integer>();
-            set.add(0);
-            list.add(0);
-            map.put("key0", 0);
-            Assert.assertEquals(set, row.getSet("set1", Integer.class));
-            Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
-            Assert.assertEquals(list,row.getList("list1", Integer.class));
-          }
-          if(count == 2)
-          {
-            Assert.assertEquals(1, row.getInt("age"));
-            Assert.assertEquals(1.0, row.getFloat("floatValue"),2);
-            Set<Integer> set = new HashSet<Integer>();
-            List<Integer> list = new ArrayList<Integer>();
-            Map<String,Integer> map = new HashMap<String, Integer>();
-            set.add(1);
-            list.add(1);
-            map.put("key1", 1);
-            Assert.assertEquals(set, row.getSet("set1", Integer.class));
-            Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
-            Assert.assertEquals(list,row.getList("list1", Integer.class));
-          }
-          count++;
+      String recordsQuery = "SELECT * from " + TABLE_NAME + ";";
+      ResultSet resultSetRecords = session.execute(recordsQuery);
+      int count = 0;
+      for (Row row: resultSetRecords) {
+        LOG.debug("Boolean value is {}", row.getBool("test"));
+        Assert.assertEquals(true, row.getBool("test"));
+        LOG.debug("lastname returned is {}", row.getString("lastname"));
+        Assert.assertEquals("abclast", row.getString("lastname"));
+        LOG.debug("Double value returned is {}", row.getDouble("doubleValue"));
+        Assert.assertEquals("Double value is", 2.0, row.getDouble("doubleValue"), 2);
+        LOG.debug("Float value returned is {}", row.getFloat("floatValue"));
+        LOG.debug("age returned is {}", row.getInt("age"));
+        LOG.debug("set returned is {} ", row.getSet("set1", Integer.class));
+        LOG.debug("list returned is {}", row.getList("list1", Integer.class));
+        LOG.debug("map returned is {}", row.getMap("map1", String.class, Integer.class));
+        LOG.debug("date returned is {}", row.getDate("last_visited"));
+        Assert.assertNotEquals(new Date(System.currentTimeMillis()), row.getDate("last_visited"));
+        if (count == 0) {
+          Assert.assertEquals(2, row.getInt("age"));
+          Assert.assertEquals(2.0, row.getFloat("floatValue"), 2);
+          Set<Integer> set = new HashSet<Integer>();
+          List<Integer> list = new ArrayList<Integer>();
+          Map<String, Integer> map = new HashMap<String, Integer>();
+          set.add(2);
+          list.add(2);
+          map.put("key2", 2);
+          Assert.assertEquals(set, row.getSet("set1", Integer.class));
+          Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
+          Assert.assertEquals(list, row.getList("list1", Integer.class));
         }
-
+        if (count == 1) {
+          Assert.assertEquals(0, row.getInt("age"));
+          Assert.assertEquals(0.0, row.getFloat("floatValue"), 2);
+          Set<Integer> set = new HashSet<Integer>();
+          List<Integer> list = new ArrayList<Integer>();
+          Map<String, Integer> map = new HashMap<String, Integer>();
+          set.add(0);
+          list.add(0);
+          map.put("key0", 0);
+          Assert.assertEquals(set, row.getSet("set1", Integer.class));
+          Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
+          Assert.assertEquals(list, row.getList("list1", Integer.class));
+        }
+        if (count == 2) {
+          Assert.assertEquals(1, row.getInt("age"));
+          Assert.assertEquals(1.0, row.getFloat("floatValue"), 2);
+          Set<Integer> set = new HashSet<Integer>();
+          List<Integer> list = new ArrayList<Integer>();
+          Map<String, Integer> map = new HashMap<String, Integer>();
+          set.add(1);
+          list.add(1);
+          map.put("key1", 1);
+          Assert.assertEquals(set, row.getSet("set1", Integer.class));
+          Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
+          Assert.assertEquals(list, row.getList("list1", Integer.class));
+        }
+        count++;
+      }
 
     }
 
   }
 
-  private static class TestInputOperator extends AbstractCassandraInputOperator<TestEvent>
+  private static class TestInputOperator extends CassandraPOJOInputOperator
   {
 
-    private static final String retrieveQuery = "SELECT * FROM " + KEYSPACE + "." + TABLE_NAME + ";";
-
-    @Override
-    public TestEvent getTuple(Row row)
-    {
-      try {
-        return new TestEvent(row.getInt(0));
-      }
-      catch (DriverException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public String queryToRetrieveData()
-    {
-      return retrieveQuery;
-    }
-
     public void insertEventsInTable(int numEvents)
     {
       try {
@@ -221,6 +197,45 @@
 
   }
 
+  public static class TestInputPojo
+  {
+    private UUID id;
+    private String lastname;
+    private int age;
+
+    public int getAge()
+    {
+      return age;
+    }
+
+    public void setAge(int age)
+    {
+      this.age = age;
+    }
+
+    public UUID getId()
+    {
+      return id;
+    }
+
+    public void setId(UUID id)
+    {
+      this.id = id;
+    }
+
+    public String getLastname()
+    {
+      return lastname;
+    }
+
+    public void setLastname(String lastname)
+    {
+      this.lastname = lastname;
+    }
+
+
+  }
+
   @Test
   public void testCassandraOutputOperator()
   {
@@ -288,6 +303,7 @@
   @Test
   public void TestCassandraInputOperator()
   {
+    String retrieveQuery = "SELECT * FROM " + KEYSPACE + "." + TABLE_NAME + ";";
     CassandraStore store = new CassandraStore();
     store.setNode(NODE);
     store.setKeyspace(KEYSPACE);
@@ -299,6 +315,23 @@
     TestInputOperator inputOperator = new TestInputOperator();
     inputOperator.setStore(store);
     inputOperator.insertEventsInTable(10);
+    inputOperator.setOutputClass("TestInputPojo");
+    inputOperator.setTablename(TABLE_NAME);
+    inputOperator.setRetrieveQuery(retrieveQuery);
+    ArrayList<String> columns = new ArrayList<String>();
+    columns.add("id");
+    columns.add("age");
+    columns.add("lastname");
+
+    inputOperator.setColumns(columns);
+    ArrayList<String> expressions = new ArrayList<String>();
+    expressions.add("id");
+    expressions.add("age");
+    expressions.add("lastname");
+    inputOperator.setExpressions(expressions);
+
+    inputOperator.setStore(store);
+    inputOperator.insertEventsInTable(10);
 
     CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
     inputOperator.outputPort.setSink(sink);
@@ -313,7 +346,7 @@
 
   public static class TestPojo
   {
-    private TestPojo(UUID randomUUID, int i, String string, boolean b, float d, double d0, Set<Integer> set1, List<Integer> list1, Map<String, Integer> map1, Date date)
+    public TestPojo(UUID randomUUID, int i, String string, boolean b, float d, double d0, Set<Integer> set1, List<Integer> list1, Map<String, Integer> map1, Date date)
     {
       this.id = randomUUID;
       this.age = i;