changes.
diff --git a/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlPOJOInputOperator.java
index 7a869bb..c120abe 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlPOJOInputOperator.java
@@ -27,6 +27,7 @@
 import java.math.BigDecimal;
 import java.sql.*;
 import java.util.ArrayList;
+import java.util.List;
 import javax.validation.constraints.Min;
 import javax.validation.constraints.NotNull;
 import org.slf4j.Logger;
@@ -36,40 +37,44 @@
  * <p>
  * MemsqlPOJOInputOperator</p>
  * A Generic implementation of AbstractMemsqlInputOperator which gets field values from memsql database columns and sets in a POJO.
- *
+ * User can give a parameterized query with parameters %t for table name, %p for primary key, %s for start value and %e for end value (start value + limit).
  * @displayName Memsql POJO Input Operator
  * @category Input
  * @tags input operator
  */
 public class MemsqlPOJOInputOperator extends AbstractMemsqlInputOperator<Object>
 {
-  private final transient ArrayList<Integer> columnDataTypes;
+  private final transient List<Integer> columnDataTypes;
   @Min(1)
   private int limit = 10;
   @Min(1)
-  private int startRow = 1;
+  private Number startRow = 1;
   @NotNull
-  private ArrayList<String> expressions;
+  private List<String> expressions;
   @NotNull
   private String tablename;
-  private transient ArrayList<Object> setters;
-  @NotNull
-  private String retrieveQuery;
+  private transient List<Object> setters;
   private transient Class<?> objectClass = null;
   @NotNull
   private String primaryKeyColumn;
-  private ArrayList<String> columns;
+  @NotNull
+  private List<String> columns;
   private boolean useAllColumns;
+  private transient Number lastRowKey = 1;
+  private transient int primaryKeyColumnType;
+  @NotNull
+  private String query;
 
   /*
-   * Subset of columns specified by User in case POJO needs to contain fields specific to these columns only.
+   * Set of columns specified by User in case POJO needs to contain fields specific to these columns only.
+   * User should specify columns in same order as expressions for fields.
    */
-  public ArrayList<String> getColumns()
+  public List<String> getColumns()
   {
     return columns;
   }
 
-  public void setColumns(ArrayList<String> columns)
+  public void setColumns(List<String> columns)
   {
     this.columns = columns;
   }
@@ -91,12 +96,12 @@
   /*
    * User has the option to specify the start row.
    */
-  public int getStartRow()
+  public Number getStartRow()
   {
     return startRow;
   }
 
-  public void setStartRow(int startRow)
+  public void setStartRow(Number startRow)
   {
     this.startRow = startRow;
   }
@@ -150,28 +155,30 @@
   }
 
   /*
-   * Query input by user: Example: select * from keyspace.tablename;
+   * Parameterized query with parameters %t for table name, %p for primary key, %s for start value and %e for end value (start value + limit).
+   * Example of query:
+   * select * from %t where %p > %s and %p < %e;
    */
-  public String getRetrieveQuery()
+  public String getQuery()
   {
-    return retrieveQuery;
+    return query;
   }
 
-  public void setRetrieveQuery(String retrieveQuery)
+  public void setQuery(String query)
   {
-    this.retrieveQuery = retrieveQuery;
+    this.query = query.replace("%t", tablename);
   }
 
   /*
    * An ArrayList of Java expressions that will yield the cassandra column value to be set in output object.
    * Each expression corresponds to one column in the Cassandra table.
    */
-  public ArrayList<String> getExpressions()
+  public List<String> getExpressions()
   {
     return expressions;
   }
 
-  public void setExpressions(ArrayList<String> expressions)
+  public void setExpressions(List<String> expressions)
   {
     this.expressions = expressions;
   }
@@ -198,17 +205,14 @@
     ResultSetMetaData rsMetaData;
     try {
       Statement statement = store.getConnection().createStatement();
-      resultSet = statement.executeQuery("select count(*) from " + tablename);
-      resultSet.next();
       resultSet = statement.executeQuery("describe " + tablename);
       rsMetaData = resultSet.getMetaData();
-
-      if (!isUseAllColumns()) {
-        numberOfColumns = rsMetaData.getColumnCount();
+      primaryKeyColumnType = rsMetaData.getColumnType(resultSet.findColumn(primaryKeyColumn));
+      if(query.contains("%p"))
+      {
+          query = query.replace("%p", primaryKeyColumn);
       }
-      else {
-        numberOfColumns = columns.size();
-      }
+      numberOfColumns = rsMetaData.getColumnCount();
 
       logger.debug("column Count=" + numberOfColumns);
       statement.close();
@@ -229,12 +233,7 @@
     for (int i = 1; i <= numberOfColumns; i++) {
       int type = 0;
       try {
-        if (!isUseAllColumns()) {
-          type = rsMetaData.getColumnType(i);
-        }
-        else {
-          type = rsMetaData.getColumnType(resultSet.findColumn(columns.get(i)));
-        }
+        type = rsMetaData.getColumnType(i);
         columnDataTypes.add(type);
       }
       catch (SQLException ex) {
@@ -311,79 +310,44 @@
     }
     final int size = columnDataTypes.size();
     for (int i = 0; i < size; i++) {
+      String columnName = columns.get(i);
       final int type = columnDataTypes.get(i);
       try {
         switch (type) {
           case (Types.CHAR):
           case (Types.VARCHAR):
-            final String ascii;
-
-            ascii = result.getString(i + 1);
-
-            ((Setter<Object, String>)setters.get(i)).set(obj, ascii);
+            ((Setter<Object, String>)setters.get(i)).set(obj, result.getString(columnName));
             break;
           case (Types.BOOLEAN):
           case (Types.TINYINT):
-            final boolean boolValue;
-            boolValue = result.getBoolean(i);
-
-            ((SetterBoolean)setters.get(i)).set(obj, boolValue);
+            ((SetterBoolean)setters.get(i)).set(obj, result.getBoolean(columnName));
             break;
           case (Types.SMALLINT):
-            final short shortValue;
-
-            shortValue = result.getShort(i);
-
-            ((SetterShort)setters.get(i)).set(obj, shortValue);
+            ((SetterShort)setters.get(i)).set(obj, result.getShort(columnName));
             break;
           case (Types.INTEGER):
-            final int intValue;
-
-            intValue = result.getInt(i + 1);
-
-            ((SetterInt)setters.get(i)).set(obj, intValue);
+            ((SetterInt)setters.get(i)).set(obj, result.getInt(columnName));
             break;
           case (Types.BIGINT):
-            final long longValue;
-            longValue = result.getLong(i);
-
-            ((SetterLong)setters.get(i)).set(obj, longValue);
+            ((SetterLong)setters.get(i)).set(obj, result.getLong(columnName));
             break;
           case (Types.FLOAT):
-            final float floatValue;
-
-            floatValue = result.getFloat(i);
-            ((SetterFloat)setters.get(i)).set(obj, floatValue);
+            ((SetterFloat)setters.get(i)).set(obj, result.getFloat(columnName));
             break;
           case (Types.DOUBLE):
-            final double doubleValue;
-            doubleValue = result.getDouble(i);
-
-            ((SetterDouble)setters.get(i)).set(obj, doubleValue);
+            ((SetterDouble)setters.get(i)).set(obj, result.getDouble(columnName));
             break;
           case (Types.DECIMAL):
-            final BigDecimal decimal;
-            decimal = result.getBigDecimal(i);
-
-            ((Setter<Object, BigDecimal>)setters.get(i)).set(obj, decimal);
+            ((Setter<Object, BigDecimal>)setters.get(i)).set(obj, result.getBigDecimal(columnName));
             break;
           case (Types.DATE):
-            final Date dateValue;
-            dateValue = result.getDate(i);
-
-            ((Setter<Object, Date>)setters.get(i)).set(obj, dateValue);
+            ((Setter<Object, Date>)setters.get(i)).set(obj, result.getDate(columnName));
             break;
           case (Types.TIMESTAMP):
-            final Timestamp timeValue;
-            timeValue = result.getTimestamp(i);
-
-            ((Setter<Object, Timestamp>)setters.get(i)).set(obj, timeValue);
+            ((Setter<Object, Timestamp>)setters.get(i)).set(obj, result.getTimestamp(columnName));
             break;
           case (Types.OTHER):
-            final Object objValue;
-            objValue = result.getObject(i);
-
-            ((Setter<Object, Object>)setters.get(i)).set(obj, objValue);
+            ((Setter<Object, Object>)setters.get(i)).set(obj, result.getObject(columnName));
             break;
           default:
             throw new RuntimeException("unsupported data type " + type);
@@ -393,37 +357,71 @@
         throw new RuntimeException(ex);
       }
     }
-    return obj;
-  }
-
-  @Override
-  public String queryToRetrieveData()
-  {
     try {
-      ResultSet rset = store.getConnection().createStatement().executeQuery("SELECT * FROM " + tablename + " ORDER BY " + primaryKeyColumn + " DESC LIMIT 1");
-      if (startRow > rset.getInt(0)) {
-        return null;
+      if (result.last()) {
+        switch (primaryKeyColumnType) {
+          case Types.INTEGER:
+            lastRowKey = result.getInt(primaryKeyColumn);
+            break;
+          case Types.BIGINT:
+            lastRowKey = result.getLong(primaryKeyColumn);
+            break;
+          case Types.FLOAT:
+            lastRowKey = result.getFloat(primaryKeyColumn);
+            break;
+          case Types.DOUBLE:
+            lastRowKey = result.getDouble(primaryKeyColumn);
+            break;
+          case Types.SMALLINT:
+            lastRowKey = result.getShort(primaryKeyColumn);
+            break;
+          default:
+            throw new RuntimeException("unsupported data type " + primaryKeyColumnType);
+        }
       }
     }
+
     catch (SQLException ex) {
       throw new RuntimeException(ex);
     }
+    return obj;
+  }
 
-    int endRow = startRow + limit;
+  /*
+   * Builds the query for the next batch by substituting the range placeholders in the user's query:
+   * %s becomes the current start value and %e becomes start value + limit.
+   * Example query: select * from %t where %p > %s and %p < %e;
+   */
+  @Override
+  public String queryToRetrieveData()
+  {
+    String parameterizedQuery;
+    int endRow = startRow.intValue() + limit;
 
-    StringBuilder sb = new StringBuilder();
-    sb.append(retrieveQuery);
-    sb.append(" where ");
-    sb.append(primaryKeyColumn);
-    sb.append(" >= ");
-    sb.append(startRow);
-    sb.append(" and ");
-    sb.append(primaryKeyColumn);
-    sb.append(" < ");
-    sb.append(endRow);
-    startRow = endRow;
+    // %s and %e are independent placeholders and a query may use both, so each one is
+    // substituted on its own rather than in an either/or chain.
+    parameterizedQuery = query;
+    if(parameterizedQuery.contains("%s"))
+    {
+      // Replace the same token that was tested for; "%v" is not a documented placeholder.
+      parameterizedQuery = parameterizedQuery.replace("%s", startRow+"");
+    }
+    if(parameterizedQuery.contains("%e"))
+    {
+      parameterizedQuery = parameterizedQuery.replace("%e", endRow+"");
+    }
+    return parameterizedQuery;
+  }
 
-    return sb.toString();
+
+  /*
+   * Overriding emitTuples to save the primary key column value from the last row in the batch.
+   */
+  @Override
+  public void emitTuples()
+  {
+    super.emitTuples();
+    startRow = lastRowKey;
   }
 
   private static final Logger logger = LoggerFactory.getLogger(MemsqlPOJOInputOperator.class);
diff --git a/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java
index ed43690..36ef69e 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java
@@ -180,8 +180,7 @@
     expressions.add("id");
     expressions.add("name");
     inputOperator.setExpressions(expressions);
-    inputOperator.setRetrieveQuery("select * from " + FQ_TABLE);
-
+    inputOperator.setQuery("select * from " + FQ_TABLE);
     inputOperator.setOutputClass("com.datatorrent.contrib.memsql.TestInputPojo");
     CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
     inputOperator.outputPort.setSink(sink);
@@ -192,12 +191,32 @@
     inputOperator.emitTuples();
     inputOperator.endWindow();
 
-    Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
+    Assert.assertEquals("rows from db", 30, sink.collectedTuples.size());
     for (int i = 0; i < 10; i++) {
       TestInputPojo object = (TestInputPojo)sink.collectedTuples.get(i);
       Assert.assertEquals("id set in testpojo", i + 1, object.getId());
       Assert.assertEquals("name set in testpojo", "Testname" + i, object.getName());
     }
+    sink.clear();
+    inputOperator.setQuery("select * from " + FQ_TABLE + " where " + "%p" + ">=" + "%s");
+    inputOperator.setStartRow(1);
+    inputOperator.setup(null);
+
+    inputOperator.beginWindow(0);
+    inputOperator.emitTuples();
+    inputOperator.endWindow();
+    Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
+    sink.clear();
+    inputOperator.setQuery("select * from " + FQ_TABLE + " where " + "%p" + ">=" + "%s" + "%p" + "<" + "%e");
+    inputOperator.setStartRow(1);
+    inputOperator.setLimit(10);
+    inputOperator.setup(null);
+
+    inputOperator.beginWindow(0);
+    inputOperator.emitTuples();
+    inputOperator.endWindow();
+    Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
+
   }
 
 }