changes.
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
index eb48846..7e67398 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
@@ -34,7 +34,6 @@
  * <p>
  * This is an abstract class. Sub-classes need to implement {@link #queryToRetrieveData()} and {@link #getTuple(Row)}.
  * </p>
- *
  * @param <T>
  * @displayName Abstract Cassandra Input
  * @category Store
@@ -75,27 +74,17 @@
   {
     String query = queryToRetrieveData();
     logger.debug(String.format("select statement: %s", query));
-    if (!"".equals(query)) {
-      try {
-        ResultSet result = store.getSession().execute(query);
-        processResult(result);
-      }
-      catch (Exception ex) {
-        store.disconnect();
-        DTThrowable.rethrow(ex);
+
+    try {
+      ResultSet result = store.getSession().execute(query);
+      for(Row row: result) {
+        T tuple = getTuple(row);
+        outputPort.emit(tuple);
       }
     }
-  }
-
-  /*
-   * ProcessResult can be overridden in classes providing implementation of  AbstractCassandraInputOperator.
-   * Providing a basic implementation here which iterates through the rows in result,gets the particular tuple from row and emits it.
-   */
-  protected  void processResult(ResultSet result)
-  {
-    for (Row row: result) {
-          T tuple = getTuple(row);
-          outputPort.emit(tuple);
+    catch (Exception ex) {
+      store.disconnect();
+      DTThrowable.rethrow(ex);
     }
   }
 }
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
index b1caa66..edd84a6 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
@@ -17,7 +17,6 @@
 
 import com.datastax.driver.core.ColumnDefinitions;
 import com.datastax.driver.core.DataType;
-import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.lib.util.PojoUtils;
@@ -36,8 +35,9 @@
 
 /**
  * <p>
- * CassandraInputOperator</p>
+ * CassandraPOJOInputOperator</p>
  * A Generic implementation of AbstractCassandraInputOperator which gets field values from Cassandra database columns and sets in a POJO.
+ * User can give a parameterized query with parameters %t for table name, %p for primary key, %v for start value and %l for limit.
  *
  * @displayName Cassandra POJO Input Operator
  * @category Input
@@ -54,13 +54,14 @@
   private String tablename;
   private final transient List<Object> setters;
   @NotNull
-  private String retrieveQuery;
+  private String query;
+
   private transient Class<?> objectClass = null;
   private boolean useAllColumns;
-  protected Number lastRowIdInBatch = 0;
   @NotNull
   protected String primaryKeyColumn;
   protected transient DataType primaryKeyColumnType;
+  private transient Row lastRowInBatch;
 
   @Min(1)
   private int limit = 10;
@@ -139,16 +140,18 @@
   }
 
   /*
-   * Query input by user: Example: select * from keyspace.tablename;
+   * Parameterized query with parameters such as %t for table name, %p for primary key, %v for start value and %l for limit.
+   * Example of query:
+   * select * from %t where token(%p) > %v limit %l;
    */
-  public String getRetrieveQuery()
+  public String getQuery()
   {
-    return retrieveQuery;
+    return query;
   }
 
-  public void setRetrieveQuery(String retrieveQuery)
+  public void setQuery(String query)
   {
-    this.retrieveQuery = retrieveQuery;
+    this.query = query.replace("%t", tablename);
   }
 
   /*
@@ -224,6 +227,17 @@
       }
 
       primaryKeyColumnType = rsMetaData.getType(primaryKeyColumn);
+       if(query.contains("%p"))
+       {
+          query = query.replace("%p", primaryKeyColumn);
+       }
+       if(query.contains("%l"))
+       {
+         query = query.replace("%l", limit+"");
+       }
+
+
+       logger.debug("query is {}",query);
 
       for (int i = 0; i < numberOfColumns; i++) {
         // Get the designated column's data type.
@@ -290,6 +304,7 @@
   @SuppressWarnings("unchecked")
   public Object getTuple(Row row)
   {
+    lastRowInBatch = row;
     Object obj = null;
     final int size = columnDataTypes.size();
 
@@ -366,76 +381,53 @@
     return obj;
   }
 
+  /*
+   * This method replaces the %v parameter in the query with the current start row value; a query without %v is returned unchanged.
+   * Example of query:
+   * select * from %t where token(%p) > %v limit %l;
+   */
   @Override
   public String queryToRetrieveData()
   {
-    boolean flag = false;
-
-    switch (primaryKeyColumnType.getName()) {
-      case INT:
-        if (startRow.intValue() > lastRowIdInBatch.intValue()) {
-          flag = true;
-        }
-        break;
-      case COUNTER:
-        if (startRow.longValue() > lastRowIdInBatch.longValue()) {
-          flag = true;
-        }
-        break;
-      case FLOAT:
-        if (startRow.floatValue() > lastRowIdInBatch.floatValue()) {
-          flag = true;
-        }
-        break;
-      case DOUBLE:
-        if (startRow.doubleValue() > lastRowIdInBatch.doubleValue()) {
-          flag = true;
-        }
-        break;
+    String parameterizedQuery;
+    if(query.contains("%v"))
+    {
+      parameterizedQuery = query.replace("%v", startRow+"");
     }
-
-    if (flag) {
-      return "";
+    else
+    {
+      parameterizedQuery = query;
     }
-
-    startRow = lastRowIdInBatch.intValue() + 1;
-    StringBuilder sb = new StringBuilder();
-    sb.append(retrieveQuery).append(" where ").append("token(").append(primaryKeyColumn).append(")").append(">=").append(startRow).append(" LIMIT ").append(limit);
-    logger.debug("retrievequery is {}", sb.toString());
-
-    return sb.toString();
+    return parameterizedQuery;
   }
 
+
   /*
-   * Overriding processResult to save primarykey column value from last row in batch.
+   * Overriding emitTuples to save the primary key column value from the last row in the batch.
    */
   @Override
-  protected void processResult(ResultSet result)
+  public void emitTuples()
   {
-      Row lastRowInBatch = null;
-        for (Row row: result) {
-          Object tuple = getTuple(row);
-          outputPort.emit(tuple);
-          lastRowInBatch = row;
-        }
-           if (lastRowInBatch != null) {
-          switch (primaryKeyColumnType.getName()) {
-            case INT:
-              lastRowIdInBatch = lastRowInBatch.getInt(0);
-              break;
-            case COUNTER:
-              lastRowIdInBatch = lastRowInBatch.getLong(0);
-              break;
-            case FLOAT:
-              lastRowIdInBatch = lastRowInBatch.getFloat(0);
-              break;
-            case DOUBLE:
-              lastRowIdInBatch = lastRowInBatch.getDouble(0);
-              break;
-            default:
-              throw new RuntimeException("unsupported data type " + primaryKeyColumnType.getName());
-          }
-        }
+    super.emitTuples();
+    if (lastRowInBatch != null) {
+      switch (primaryKeyColumnType.getName()) {
+        case INT:
+          startRow = lastRowInBatch.getInt(primaryKeyColumn);
+          break;
+        case COUNTER:
+          startRow = lastRowInBatch.getLong(primaryKeyColumn);
+          break;
+        case FLOAT:
+          startRow = lastRowInBatch.getFloat(primaryKeyColumn);
+          break;
+        case DOUBLE:
+          startRow = lastRowInBatch.getDouble(primaryKeyColumn);
+          break;
+        default:
+          throw new RuntimeException("unsupported data type " + primaryKeyColumnType.getName());
+      }
+    }
+
   }
 
   private static final Logger logger = LoggerFactory.getLogger(CassandraPOJOInputOperator.class);
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
index a6d0d21..0658d19 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
@@ -292,7 +292,7 @@
   @Test
   public void TestCassandraInputOperator()
   {
-    String retrieveQuery = "SELECT * FROM " + KEYSPACE + "." + TABLE_NAME_INPUT;
+    String query1 = "SELECT * FROM " + KEYSPACE + "." + "%t;";
     CassandraStore store = new CassandraStore();
     store.setNode(NODE);
     store.setKeyspace(KEYSPACE);
@@ -305,7 +305,7 @@
     inputOperator.setStore(store);
     inputOperator.setOutputClass("com.datatorrent.contrib.cassandra.TestInputPojo");
     inputOperator.setTablename(TABLE_NAME_INPUT);
-    inputOperator.setRetrieveQuery(retrieveQuery);
+    inputOperator.setQuery(query1);
     ArrayList<String> columns = new ArrayList<String>();
     columns.add("id");
     columns.add("age");
@@ -324,14 +324,9 @@
     inputOperator.outputPort.setSink(sink);
     inputOperator.setup(context);
     inputOperator.beginWindow(0);
-    inputOperator.insertEventsInTable(10);
     inputOperator.emitTuples();
     inputOperator.endWindow();
-    Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
-    inputOperator.beginWindow(1);
-    inputOperator.emitTuples();
-    inputOperator.endWindow();
-    Assert.assertEquals("rows from db", 20, sink.collectedTuples.size());
+    Assert.assertEquals("rows from db", 30, sink.collectedTuples.size());
     ArrayList<Integer> listOfIDs = inputOperator.getIds();
     // Rows are not stored in the same order in cassandra table in which they are inserted.
     for (int i = 0; i < 10; i++) {
@@ -341,6 +336,25 @@
       Assert.assertEquals("age set in testpojo", inputOperator.getAge().get(object.getId()).intValue(), object.getAge());
     }
 
+    sink.clear();
+    String query2 = "SELECT * FROM " + KEYSPACE + "." + "%t where token(%p) > %v;";
+    inputOperator.setQuery(query2);
+    inputOperator.setStartRow(10);
+    inputOperator.beginWindow(1);
+    inputOperator.emitTuples();
+    inputOperator.endWindow();
+    Assert.assertEquals("rows from db", 14, sink.collectedTuples.size());
+
+    sink.clear();
+    String query3 = "SELECT * FROM " + KEYSPACE + "." + "%t where token(%p) > %v LIMIT %l;";
+    inputOperator.setQuery(query3);
+    inputOperator.setStartRow(1);
+    inputOperator.setLimit(10);
+    inputOperator.beginWindow(2);
+    inputOperator.emitTuples();
+    inputOperator.endWindow();
+    Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
+
   }
 
   public static class TestPojo