Merge pull request #1414 from pm48/JdbcPojoImplementation

Jdbc POJO Output Operator
diff --git a/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
new file mode 100644
index 0000000..4e1ed7b
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/db/jdbc/JdbcPOJOOutputOperator.java
@@ -0,0 +1,269 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.lib.db.jdbc;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.Getter;
+import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
+import com.datatorrent.lib.util.PojoUtils.GetterChar;
+import com.datatorrent.lib.util.PojoUtils.GetterDouble;
+import com.datatorrent.lib.util.PojoUtils.GetterFloat;
+import com.datatorrent.lib.util.PojoUtils.GetterInt;
+import com.datatorrent.lib.util.PojoUtils.GetterLong;
+import com.datatorrent.lib.util.PojoUtils.GetterShort;
+import java.sql.*;
+import java.util.ArrayList;
+import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * JdbcPOJOOutputOperator class.</p>
+ * A Generic implementation of AbstractJdbcTransactionableOutputOperator which takes in any POJO.
+ *
+ * @displayName Jdbc Output Operator
+ * @category Output
+ * @tags output operator,transactional, POJO
+ * @since 2.1.0
+ */
+public class JdbcPOJOOutputOperator extends AbstractJdbcTransactionableOutputOperator<Object>
+{
+  @NotNull
+  private ArrayList<String> dataColumns;
+  //These are extracted from table metadata
+  private ArrayList<Integer> columnDataTypes;
+
+  /*
+   * An arraylist of data column names to be set in database.
+   * Gets column names.
+   */
+  public ArrayList<String> getDataColumns()
+  {
+    return dataColumns;
+  }
+
+  public void setDataColumns(ArrayList<String> dataColumns)
+  {
+    this.dataColumns = dataColumns;
+  }
+
+  @NotNull
+  private String tablename;
+
+  /*
+   * Gets the Tablename in database.
+   */
+  public String getTablename()
+  {
+    return tablename;
+  }
+
+  public void setTablename(String tablename)
+  {
+    this.tablename = tablename;
+  }
+
+  /*
+   * An ArrayList of Java expressions that will yield the field value from the POJO.
+   * Each expression corresponds to one column in the database table.
+   */
+  public ArrayList<String> getExpressions()
+  {
+    return expressions;
+  }
+
+  public void setExpressions(ArrayList<String> expressions)
+  {
+    this.expressions = expressions;
+  }
+
+  @NotNull
+  private ArrayList<String> expressions;
+  private transient ArrayList<Object> getters;
+  private String insertStatement;
+
+  @Override
+  public void setup(OperatorContext context)
+  {
+    StringBuilder columns = new StringBuilder("");
+    StringBuilder values = new StringBuilder("");
+    for (int i = 0; i < dataColumns.size(); i++) {
+      columns.append(dataColumns.get(i));
+      values.append("?");
+      if (i < dataColumns.size() - 1) {
+        columns.append(",");
+        values.append(",");
+      }
+    }
+    insertStatement = "INSERT INTO "
+            + tablename
+            + " (" + columns.toString() + ")"
+            + " VALUES (" + values.toString() + ")";
+    LOG.debug("insert statement is {}", insertStatement);
+    super.setup(context);
+    Connection conn = store.getConnection();
+    LOG.debug("Got Connection.");
+    try {
+      Statement st = conn.createStatement();
+      ResultSet rs = st.executeQuery("select * from " + tablename);
+
+      ResultSetMetaData rsMetaData = rs.getMetaData();
+
+      int numberOfColumns = 0;
+
+      numberOfColumns = rsMetaData.getColumnCount();
+
+      LOG.debug("resultSet MetaData column Count=" + numberOfColumns);
+
+      for (int i = 1; i <= numberOfColumns; i++) {
+        // get the designated column's SQL type.
+        int type = rsMetaData.getColumnType(i);
+        LOG.debug("column name {}", rsMetaData.getColumnTypeName(i));
+        columnDataTypes.add(type);
+        LOG.debug("sql column type is " + type);
+      }
+    }
+    catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    }
+
+  }
+
+  public JdbcPOJOOutputOperator()
+  {
+    super();
+    columnDataTypes = new ArrayList<Integer>();
+    getters = new ArrayList<Object>();
+  }
+
+  @Override
+  public void processTuple(Object tuple)
+  {
+    if (getters.isEmpty()) {
+      processFirstTuple(tuple);
+    }
+    super.processTuple(tuple);
+  }
+
+  public void processFirstTuple(Object tuple)
+  {
+    final Class<?> fqcn = tuple.getClass();
+    final int size = columnDataTypes.size();
+    for (int i = 0; i < size; i++) {
+      final int type = columnDataTypes.get(i);
+      final String getterExpression = expressions.get(i);
+      final Object getter;
+      switch (type) {
+        case Types.CHAR:
+          getter = PojoUtils.createGetterChar(fqcn, getterExpression);
+          break;
+        case Types.VARCHAR:
+          getter = PojoUtils.createGetter(fqcn, getterExpression, String.class);
+          break;
+        case Types.BOOLEAN:
+        case Types.TINYINT:
+          getter = PojoUtils.createGetterBoolean(fqcn, getterExpression);
+          break;
+        case Types.SMALLINT:
+          getter = PojoUtils.createGetterShort(fqcn, getterExpression);
+          break;
+        case Types.INTEGER:
+          getter = PojoUtils.createGetterInt(fqcn, getterExpression);
+          break;
+        case Types.BIGINT:
+          getter = PojoUtils.createGetterLong(fqcn, getterExpression);
+          break;
+        case Types.FLOAT:
+          getter = PojoUtils.createGetterFloat(fqcn, getterExpression);
+          break;
+        case Types.DOUBLE:
+          getter = PojoUtils.createGetterDouble(fqcn, getterExpression);
+          break;
+        default:
+          /*
+           Types.DECIMAL
+           Types.DATE
+           Types.TIME
+           Types.ARRAY
+           Types.OTHER
+           */
+          getter = PojoUtils.createGetter(fqcn, getterExpression, Object.class);
+          break;
+      }
+      getters.add(getter);
+    }
+
+  }
+
+  @Override
+  protected String getUpdateCommand()
+  {
+    LOG.debug("insertstatement is {}", insertStatement);
+    return insertStatement;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected void setStatementParameters(PreparedStatement statement, Object tuple) throws SQLException
+  {
+    final int size = columnDataTypes.size();
+    for (int i = 0; i < size; i++) {
+      final int type = columnDataTypes.get(i);
+      switch (type) {
+        case (Types.CHAR):
+          statement.setString(i + 1, ((Getter<Object, String>)getters.get(i)).get(tuple));
+          break;
+        case (Types.VARCHAR):
+          statement.setString(i + 1, ((Getter<Object, String>)getters.get(i)).get(tuple));
+          break;
+        case (Types.BOOLEAN):
+        case (Types.TINYINT):
+          statement.setBoolean(i + 1, ((GetterBoolean<Object>)getters.get(i)).get(tuple));
+          break;
+        case (Types.SMALLINT):
+          statement.setShort(i + 1, ((GetterShort<Object>)getters.get(i)).get(tuple));
+          break;
+        case (Types.INTEGER):
+          statement.setInt(i + 1, ((GetterInt<Object>)getters.get(i)).get(tuple));
+          break;
+        case (Types.BIGINT):
+          statement.setLong(i + 1, ((GetterLong<Object>)getters.get(i)).get(tuple));
+          break;
+        case (Types.FLOAT):
+          statement.setFloat(i + 1, ((GetterFloat<Object>)getters.get(i)).get(tuple));
+          break;
+        case (Types.DOUBLE):
+          statement.setDouble(i + 1, ((GetterDouble<Object>)getters.get(i)).get(tuple));
+          break;
+        default:
+          /*
+           Types.DECIMAL
+           Types.DATE
+           Types.TIME
+           Types.ARRAY
+           Types.OTHER
+           */
+          statement.setObject(i + 1, ((Getter<Object, Object>)getters.get(i)).get(tuple));
+          break;
+      }
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(JdbcPOJOOutputOperator.class);
+
+}
diff --git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
index 3e3bbff..851c59a 100644
--- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
+++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcOperatorTest.java
@@ -16,6 +16,7 @@
 import com.datatorrent.common.util.DTThrowable;
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 import com.datatorrent.lib.testbench.CollectorTestSink;
+import java.util.ArrayList;
 
 /**
  * Tests for {@link AbstractJdbcTransactionableOutputOperator} and {@link AbstractJdbcInputOperator}
@@ -26,6 +27,7 @@
   public static final String URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
 
   private static final String TABLE_NAME = "test_event_table";
+  private static final String TABLE_POJO_NAME = "test_pojo_event_table";
   private static String APP_ID = "JdbcOperatorTest";
   private static int OPERATOR_ID = 0;
 
@@ -39,6 +41,43 @@
     }
   }
 
+  public static class TestPOJOEvent
+  {
+    private int id;
+    private String name;
+
+    public TestPOJOEvent()
+    {
+    }
+
+    public TestPOJOEvent(int id, String name)
+    {
+      this.id = id;
+      this.name = name;
+    }
+
+    public int getId()
+    {
+      return id;
+    }
+
+    public void setId(int id)
+    {
+      this.id = id;
+    }
+
+    public String getName()
+    {
+      return name;
+    }
+
+    public void setName(String name)
+    {
+      this.name = name;
+    }
+
+  }
+
   @BeforeClass
   public static void setup()
   {
@@ -48,16 +87,18 @@
       Connection con = DriverManager.getConnection(URL);
       Statement stmt = con.createStatement();
 
-      String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " +
-        JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " +
-        JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " +
-        JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, " +
-        "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") " +
-        ")";
+      String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+              + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+              + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+              + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+              + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+              + ")";
       stmt.executeUpdate(createMetaTable);
 
       String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (ID INTEGER)";
       stmt.executeUpdate(createTable);
+      String createPOJOTable = "CREATE TABLE IF NOT EXISTS " + TABLE_POJO_NAME + "(id INTEGER not NULL,name VARCHAR(255), PRIMARY KEY ( id ))";
+      stmt.executeUpdate(createPOJOTable);
     }
     catch (Throwable e) {
       DTThrowable.rethrow(e);
@@ -121,6 +162,32 @@
     }
   }
 
+  private static class TestPOJOOutputOperator extends JdbcPOJOOutputOperator
+  {
+    TestPOJOOutputOperator()
+    {
+      cleanTable();
+    }
+
+    public int getNumOfEventsInStore()
+    {
+      Connection con;
+      try {
+        con = DriverManager.getConnection(URL);
+        Statement stmt = con.createStatement();
+
+        String countQuery = "SELECT count(*) from " + TABLE_POJO_NAME;
+        ResultSet resultSet = stmt.executeQuery(countQuery);
+        resultSet.next();
+        return resultSet.getInt(1);
+      }
+      catch (SQLException e) {
+        throw new RuntimeException("fetching count", e);
+      }
+    }
+
+  }
+
   private static class TestInputOperator extends AbstractJdbcInputOperator<TestEvent>
   {
 
@@ -195,6 +262,47 @@
     outputOperator.endWindow();
 
     Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore());
+    cleanTable();
+  }
+
+  @Test
+  public void testJdbcPOJOOutputOperator()
+  {
+    JdbcTransactionalStore transactionalStore = new JdbcTransactionalStore();
+    transactionalStore.setDatabaseDriver(DB_DRIVER);
+    transactionalStore.setDatabaseUrl(URL);
+
+    com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap attributeMap = new com.datatorrent.api.Attribute.AttributeMap.DefaultAttributeMap();
+    attributeMap.put(DAG.APPLICATION_ID, APP_ID);
+    OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
+
+    TestPOJOOutputOperator outputOperator = new TestPOJOOutputOperator();
+    outputOperator.setBatchSize(3);
+    outputOperator.setTablename(TABLE_POJO_NAME);
+    ArrayList<String> dataColumns = new ArrayList<String>();
+    dataColumns.add("id");
+    dataColumns.add("name");
+    outputOperator.setDataColumns(dataColumns);
+    outputOperator.setStore(transactionalStore);
+    ArrayList<String> expressions = new ArrayList<String>();
+    expressions.add("getId()");
+    expressions.add("getName()");
+    outputOperator.setExpressions(expressions);
+
+    outputOperator.setup(context);
+
+    List<TestPOJOEvent> events = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      events.add(new TestPOJOEvent(i, "test" + i));
+    }
+
+    outputOperator.beginWindow(0);
+    for (TestPOJOEvent event: events) {
+      outputOperator.input.process(event);
+    }
+    outputOperator.endWindow();
+
+    Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore());
   }
 
   @Test