changes for memsql.
diff --git a/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java
index f7abaaa..255234d 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java
@@ -43,7 +43,7 @@
           counter++) {
         statement.executeUpdate("insert into " +
                                 FQ_TABLE +
-                                " (" + DATA_COLUMN + ") values (" + random.nextInt() + ")");
+                                " (" + DATA_COLUMN1 + ") values (" + random.nextInt() + ")");
       }
 
       statement.close();
diff --git a/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlOutputOperatorTest.java
index 689c8f8..d47f8e3 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlOutputOperatorTest.java
@@ -27,6 +27,7 @@
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Random;
 import org.junit.Assert;
 import org.junit.Test;
@@ -38,14 +39,15 @@
   private static transient final Logger LOG = LoggerFactory.getLogger(AbstractMemsqlOutputOperatorTest.class);
 
   public static final String HOST_PREFIX = "jdbc:mysql://";
-  public static final String HOST = "127.0.0.1";
+  public static final String HOST = "localhost";
   public static final String USER = "root";
-  public static final String PORT = "3306";
+  public static final String PORT = "3307";
   public static final String DATABASE = "bench";
   public static final String TABLE = "bench";
   public static final String FQ_TABLE = DATABASE + "." + TABLE;
   public static final String INDEX_COLUMN = "data_index";
-  public static final String DATA_COLUMN = "data";
+  public static final String DATA_COLUMN1 = "data1";
+  public static final String DATA_COLUMN2 = "data2";
   public static final int NUM_WINDOWS = 10;
   public static final int BLAST_SIZE = 10;
   public static final int DATABASE_SIZE = NUM_WINDOWS * BLAST_SIZE;
@@ -97,8 +99,10 @@
                             FQ_TABLE +
                             "(" + INDEX_COLUMN +
                             " INTEGER AUTO_INCREMENT PRIMARY KEY, " +
-                            DATA_COLUMN +
-                            " INTEGER)");
+                            DATA_COLUMN1+
+                            " INTEGER," +
+                            DATA_COLUMN2+
+                            " VARCHAR)");
     String createMetaTable = "CREATE TABLE IF NOT EXISTS " + DATABASE + "." + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " +
                              JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " +
                              JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " +
@@ -128,7 +132,15 @@
     MemsqlOutputOperator outputOperator = new MemsqlOutputOperator();
     outputOperator.setStore(memsqlStore);
     outputOperator.setBatchSize(BATCH_SIZE);
-
+    outputOperator.setTablename(FQ_TABLE);
+    ArrayList<String> columns = new ArrayList<String>();
+    columns.add(DATA_COLUMN1);
+    columns.add(DATA_COLUMN2);
+    outputOperator.setDataColumns(columns);
+    ArrayList<String> expressions = new ArrayList<String>();
+    expressions.add("innerObj.getIntVal()");
+    expressions.add("innerObj.getStringVal()");
+    outputOperator.setExpression(expressions);
     AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
     attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE);
     attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L);
@@ -169,4 +181,66 @@
                         DATABASE_SIZE,
                         databaseSize);
   }
+
+  public InnerObj innerObj = new InnerObj();
+
+  /**
+   * @return the innerObj
+   */
+  public InnerObj getInnerObj()
+  {
+    return innerObj;
+  }
+
+  /**
+   * @param innerObj the innerObj to set
+   */
+  public void setInnerObj(InnerObj innerObj)
+  {
+    this.innerObj = innerObj;
+  }
+
+  public class InnerObj
+  {
+    public InnerObj()
+    {
+    }
+
+
+    private int intVal = 11;
+    private String stringVal = "hello";
+
+    /**
+     * @return the intVal
+     */
+    public int getIntVal()
+    {
+      return intVal;
+    }
+
+    /**
+     * @param intVal the intVal to set
+     */
+    public void setIntVal(int intVal)
+    {
+      this.intVal = intVal;
+    }
+
+
+    /**
+     * @return the stringVal
+     */
+    public String getStringVal()
+    {
+      return stringVal;
+    }
+
+    /**
+     * @param stringVal the stringVal to set
+     */
+    public void setStringVal(String stringVal)
+    {
+      this.stringVal = stringVal;
+    }
+  }
 }
diff --git a/contrib/src/test/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java b/contrib/src/test/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java
index 2465fec..4673561 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java
@@ -13,33 +13,237 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package com.datatorrent.contrib.memsql;
 
 import com.datatorrent.api.Context.OperatorContext;
-import static com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest.FQ_TABLE;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
+import com.datatorrent.lib.util.PojoUtils.GetterChar;
+import com.datatorrent.lib.util.PojoUtils.GetterDouble;
+import com.datatorrent.lib.util.PojoUtils.GetterFloat;
+import com.datatorrent.lib.util.PojoUtils.GetterInt;
+import com.datatorrent.lib.util.PojoUtils.GetterLong;
+import com.datatorrent.lib.util.PojoUtils.GetterObject;
+import com.datatorrent.lib.util.PojoUtils.GetterShort;
+import com.datatorrent.lib.util.PojoUtils.GetterString;
 import java.sql.*;
+import java.util.ArrayList;
+import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class MemsqlOutputOperator extends AbstractMemsqlOutputOperator<Integer>
+/*
+ * A generic implementation of AbstractMemsqlOutputOperator which can take in a POJO.
+ */
+public class MemsqlOutputOperator extends AbstractMemsqlOutputOperator<Object>
 {
-  private static final String INSERT = "INSERT INTO " +
-                                        FQ_TABLE +
-                                        " (" + AbstractMemsqlOutputOperatorTest.DATA_COLUMN + ")" +
-                                        " values (?)";
+  @NotNull
+  private String tablename;
+  @NotNull
+  //Columns in memsql database set by user.
+  private ArrayList<String> dataColumns;
+  //Expressions set by user to get field values from input tuple.
+  private ArrayList<String> expression;
+  private ArrayList<Integer> columnDataTypes;
+  //check for non transient
+  private boolean isFirstTuple = true;
+  private ArrayList<Object> getters;
+
+  /*
+   * An ArrayList of Java expressions that will yield the field value from the POJO.
+   * Each expression corresponds to one column in the memsql table.
+   * Example:
+   */
+  public ArrayList<String> getExpression()
+  {
+    return expression;
+  }
+
+  /*
+   * Set Java Expression.
+   * @param ArrayList of Extraction Expressions
+   */
+  public void setExpression(ArrayList<String> expression)
+  {
+    this.expression = expression;
+  }
+
+  private String insertStatement;
+
+
+  /*
+   * An arraylist of data column names to be set in Memsql database.
+   * Gets column names.
+   */
+  public ArrayList<String> getDataColumns()
+  {
+    return dataColumns;
+  }
+
+  /*
+   * An arraylist of data column names to be set in Memsql database.
+   * Sets column names.
+   */
+  public void setDataColumns(ArrayList<String> dataColumns)
+  {
+    this.dataColumns = dataColumns;
+  }
+
+
+  /*
+   * Gets the Memsql Tablename
+   */
+  public String getTablename()
+  {
+    return tablename;
+  }
+
+  /*
+   * Sets the Memsql Tablename
+   */
+  public void setTablename(String tablename)
+  {
+    this.tablename = tablename;
+  }
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    StringBuilder columns = new StringBuilder("");
+    StringBuilder values = new StringBuilder("");
+    for (int i = 0; i < dataColumns.size(); i++) {
+      columns.append(dataColumns.get(i));
+      values.append("?");
+      if (i < dataColumns.size() - 1) {
+        columns.append(",");
+        values.append(",");
+      }
+    }
+    insertStatement = "INSERT INTO "
+            + tablename
+            + " (" + dataColumns + ")"
+            + " values (" + values + ")";
+    super.setup(context);
+    Connection conn = store.getConnection();
+    LOG.debug("Got Connection.");
+    try {
+      Statement st = conn.createStatement();
+      ResultSet rs = st.executeQuery("select * from" + tablename);
+
+      ResultSetMetaData rsMetaData = rs.getMetaData();
+
+      int numberOfColumns = 0;
+
+      numberOfColumns = rsMetaData.getColumnCount();
+
+      LOG.debug("resultSet MetaData column Count=" + numberOfColumns);
+
+      for (int i = 1; i <= numberOfColumns; i++) {
+        // get the designated column's SQL type.
+        int type = rsMetaData.getColumnType(i);
+        columnDataTypes.add(type);
+        LOG.debug("sql column type is " + type);
+      }
+    }
+    catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    }
+
+  }
 
   public MemsqlOutputOperator()
   {
   }
 
   @Override
-  protected String getUpdateCommand()
+  public void processTuple(Object tuple)
   {
-    return INSERT;
+    if (isFirstTuple) {
+      processFirstTuple(tuple);
+    }
+    isFirstTuple = false;
+    super.processTuple(tuple);
+  }
+
+  public void processFirstTuple(Object tuple)
+  {
+    Class<?> fqcn = tuple.getClass();
+    int size = columnDataTypes.size();
+    for (int i = 0; i < size; i++) {
+      int type = columnDataTypes.get(i);
+      String getterExpression = expression.get(i);
+      if (type == Types.CHAR) {
+        GetterChar getChar = PojoUtils.createGetterChar(fqcn, getterExpression);
+        getters.add(getChar.get(tuple));
+      }
+      else if (type == Types.VARCHAR) {
+        GetterString getVarchar = PojoUtils.createGetterString(fqcn, getterExpression);
+        getters.add(getVarchar.get(tuple));
+      }
+      else if (type == Types.BOOLEAN || type == Types.TINYINT) {
+        GetterBoolean getBoolean = PojoUtils.createGetterBoolean(fqcn, getterExpression);
+        getters.add(getBoolean.get(tuple));
+      }
+      else if (type == Types.SMALLINT) {
+        GetterShort getShort = PojoUtils.createGetterShort(fqcn, getterExpression);
+        getters.add(getShort.get(tuple));
+      }
+      else if (type == Types.INTEGER) {
+        GetterInt getInt = PojoUtils.createGetterInt(fqcn, getterExpression);
+        getters.add(getInt.get(tuple));
+      }
+      else if (type == Types.BIGINT) {
+        GetterLong getLong = PojoUtils.createExpressionGetterLong(fqcn, getterExpression);
+        getters.add(getLong.get(tuple));
+      }
+      else if (type == Types.DECIMAL) {
+        GetterObject getObject = PojoUtils.createGetterObject(fqcn, getterExpression);
+        getters.add((Number)getObject.get(tuple));
+      }
+      else if (type == Types.FLOAT) {
+        GetterFloat getFloat = PojoUtils.createGetterFloat(fqcn, getterExpression);
+        getters.add(getFloat.get(tuple));
+      }
+      else if (type == Types.DOUBLE) {
+        GetterDouble getDouble = PojoUtils.createGetterDouble(fqcn, getterExpression);
+        getters.add(getDouble.get(tuple));
+      }
+      else if (type == Types.DATE) {
+        GetterObject getObject = PojoUtils.createGetterObject(fqcn, getterExpression);
+        getters.add((Date)getObject.get(tuple));
+      }
+      else if (type == Types.TIME) {
+        GetterObject getObject = PojoUtils.createGetterObject(fqcn, getterExpression);
+        getters.add((Time)getObject.get(tuple));
+      }
+      else if (type == Types.ARRAY) {
+        GetterObject getObject = PojoUtils.createGetterObject(fqcn, getterExpression);
+        getters.add((Array)getObject.get(tuple));
+      }
+      else if (type == Types.OTHER) {
+        GetterObject getObject = PojoUtils.createGetterObject(fqcn, getterExpression);
+        getters.add(getObject.get(tuple));
+      }
+
+    }
+
   }
 
   @Override
-  protected void setStatementParameters(PreparedStatement statement, Integer tuple) throws SQLException
+  protected String getUpdateCommand()
   {
-    statement.setInt(1, tuple);
+    return insertStatement;
   }
+
+  @Override
+  protected void setStatementParameters(PreparedStatement statement, Object tuple) throws SQLException
+  {
+    int size = dataColumns.size();
+    for (int i = 0; i < size; i++) {
+      statement.setObject(i + 1, getters.get(i));
+    }
+  }
+
+  private static transient final Logger LOG = LoggerFactory.getLogger(MemsqlOutputOperator.class);
+
 }