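Changes for MemSQL: make the test MemsqlOutputOperator generic so it writes POJO fields to a configurable table and column list.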
diff --git a/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java
index f7abaaa..255234d 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java
@@ -43,7 +43,7 @@
counter++) {
statement.executeUpdate("insert into " +
FQ_TABLE +
- " (" + DATA_COLUMN + ") values (" + random.nextInt() + ")");
+ " (" + DATA_COLUMN1 + ") values (" + random.nextInt() + ")");
}
statement.close();
diff --git a/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlOutputOperatorTest.java
index 689c8f8..d47f8e3 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlOutputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlOutputOperatorTest.java
@@ -27,6 +27,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
+import java.util.ArrayList;
import java.util.Random;
import org.junit.Assert;
import org.junit.Test;
@@ -38,14 +39,15 @@
private static transient final Logger LOG = LoggerFactory.getLogger(AbstractMemsqlOutputOperatorTest.class);
public static final String HOST_PREFIX = "jdbc:mysql://";
- public static final String HOST = "127.0.0.1";
+ public static final String HOST = "localhost";
public static final String USER = "root";
- public static final String PORT = "3306";
+ public static final String PORT = "3307";
public static final String DATABASE = "bench";
public static final String TABLE = "bench";
public static final String FQ_TABLE = DATABASE + "." + TABLE;
public static final String INDEX_COLUMN = "data_index";
- public static final String DATA_COLUMN = "data";
+ public static final String DATA_COLUMN1 = "data1";
+ public static final String DATA_COLUMN2 = "data2";
public static final int NUM_WINDOWS = 10;
public static final int BLAST_SIZE = 10;
public static final int DATABASE_SIZE = NUM_WINDOWS * BLAST_SIZE;
@@ -97,8 +99,10 @@
FQ_TABLE +
"(" + INDEX_COLUMN +
" INTEGER AUTO_INCREMENT PRIMARY KEY, " +
- DATA_COLUMN +
- " INTEGER)");
+              DATA_COLUMN1 +
+              " INTEGER, " +
+              DATA_COLUMN2 +
+              " VARCHAR(100))"); // MySQL-compatible DDL needs an explicit VARCHAR length
String createMetaTable = "CREATE TABLE IF NOT EXISTS " + DATABASE + "." + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( " +
JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, " +
JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, " +
@@ -128,7 +132,15 @@
MemsqlOutputOperator outputOperator = new MemsqlOutputOperator();
outputOperator.setStore(memsqlStore);
outputOperator.setBatchSize(BATCH_SIZE);
-
+ outputOperator.setTablename(FQ_TABLE);
+ ArrayList<String> columns = new ArrayList<String>();
+ columns.add(DATA_COLUMN1);
+ columns.add(DATA_COLUMN2);
+ outputOperator.setDataColumns(columns);
+ ArrayList<String> expressions = new ArrayList<String>();
+ expressions.add("innerObj.getIntVal()");
+ expressions.add("innerObj.getStringVal()");
+ outputOperator.setExpression(expressions);
AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
attributeMap.put(OperatorContext.PROCESSING_MODE, ProcessingMode.AT_LEAST_ONCE);
attributeMap.put(OperatorContext.ACTIVATION_WINDOW_ID, -1L);
@@ -169,4 +181,66 @@
DATABASE_SIZE,
databaseSize);
}
+
+  // NOTE(review): presumably public because the configured expressions start with "innerObj." - confirm before narrowing.
+  public InnerObj innerObj = new InnerObj();
+  /**
+   * @return the POJO fixture currently held by this test instance
+   */
+  public InnerObj getInnerObj()
+  {
+    return this.innerObj;
+  }
+
+  /**
+   * @param innerObj POJO fixture that replaces the one held by this test instance
+   */
+  public void setInnerObj(InnerObj innerObj)
+  {
+    this.innerObj = innerObj;
+  }
+
+  /** Two-field POJO fixture; static so that an instance carries no hidden reference to the enclosing test. */
+  public static class InnerObj
+  {
+    // Default values returned by the getters until a setter is called.
+    private int intVal = 11;
+    private String stringVal = "hello";
+
+    public InnerObj()
+    {
+    }
+
+    /**
+     * @return the current integer value
+     */
+    public int getIntVal()
+    {
+      return intVal;
+    }
+
+    /**
+     * @param intVal the integer value to store
+     */
+    public void setIntVal(int intVal)
+    {
+      this.intVal = intVal;
+    }
+
+    /**
+     * @return the current string value
+     */
+    public String getStringVal()
+    {
+      return stringVal;
+    }
+
+    /**
+     * @param stringVal the string value to store
+     */
+    public void setStringVal(String stringVal)
+    {
+      this.stringVal = stringVal;
+    }
+  }
}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java b/contrib/src/test/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java
index 2465fec..4673561 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java
@@ -13,33 +13,237 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package com.datatorrent.contrib.memsql;
import com.datatorrent.api.Context.OperatorContext;
-import static com.datatorrent.contrib.memsql.AbstractMemsqlOutputOperatorTest.FQ_TABLE;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
+import com.datatorrent.lib.util.PojoUtils.GetterChar;
+import com.datatorrent.lib.util.PojoUtils.GetterDouble;
+import com.datatorrent.lib.util.PojoUtils.GetterFloat;
+import com.datatorrent.lib.util.PojoUtils.GetterInt;
+import com.datatorrent.lib.util.PojoUtils.GetterLong;
+import com.datatorrent.lib.util.PojoUtils.GetterObject;
+import com.datatorrent.lib.util.PojoUtils.GetterShort;
+import com.datatorrent.lib.util.PojoUtils.GetterString;
import java.sql.*;
+import java.util.ArrayList;
+import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-public class MemsqlOutputOperator extends AbstractMemsqlOutputOperator<Integer>
+/*
+ * A generic implementation of AbstractMemsqlOutputOperator which can take in a POJO.
+ */
+public class MemsqlOutputOperator extends AbstractMemsqlOutputOperator<Object>
{
- private static final String INSERT = "INSERT INTO " +
- FQ_TABLE +
- " (" + AbstractMemsqlOutputOperatorTest.DATA_COLUMN + ")" +
- " values (?)";
+ // Target table name; concatenated into the SQL text built in setup().
+ @NotNull
+ private String tablename;
+ @NotNull
+ //Columns in memsql database set by user.
+ private ArrayList<String> dataColumns;
+ //Expressions set by user to get field values from input tuple.
+ // NOTE(review): unlike the two fields above this one carries no @NotNull, yet it is read by index - confirm it is always set.
+ private ArrayList<String> expression;
+ // java.sql.Types codes obtained from ResultSetMetaData.getColumnType() in setup().
+ // NOTE(review): declared without an initializer; it must be assigned before the first add().
+ private ArrayList<Integer> columnDataTypes;
+ //check for non transient
+ // Starts true and is cleared by processTuple() once the first tuple has been seen.
+ private boolean isFirstTuple = true;
+ // Filled by processFirstTuple() and read by setStatementParameters().
+ // NOTE(review): declared without an initializer; it must be assigned before the first add().
+ private ArrayList<Object> getters;
+
+  /**
+   * Returns the Java expressions that yield the field values from the POJO.
+   * Each expression corresponds to one column in the memsql table,
+   * for example {@code innerObj.getIntVal()}.
+   *
+   * @return the configured extraction expressions
+   */
+  public ArrayList<String> getExpression()
+  {
+    return this.expression;
+  }
+
+  /**
+   * Sets the Java expressions used to extract field values from the POJO.
+   *
+   * @param expression list of extraction expressions
+   */
+  public void setExpression(ArrayList<String> expression)
+  {
+    this.expression = expression;
+  }
+
+ // INSERT statement text; assigned in setup() and returned by getUpdateCommand().
+ private String insertStatement;
+
+
+  /**
+   * Returns the names of the Memsql table columns that receive data.
+   *
+   * @return the configured data column names
+   */
+  public ArrayList<String> getDataColumns()
+  {
+    return this.dataColumns;
+  }
+
+  /**
+   * Sets the names of the Memsql table columns that receive data.
+   *
+   * @param dataColumns data column names
+   */
+  public void setDataColumns(ArrayList<String> dataColumns)
+  {
+    this.dataColumns = dataColumns;
+  }
+
+
+  /**
+   * Returns the Memsql table name.
+   *
+   * @return the configured table name
+   */
+  public String getTablename()
+  {
+    return this.tablename;
+  }
+
+  /**
+   * Sets the Memsql table name.
+   *
+   * @param tablename table name used in the generated SQL
+   */
+  public void setTablename(String tablename)
+  {
+    this.tablename = tablename;
+  }
+
+  /**
+   * Builds the parameterized INSERT statement from {@code tablename} and {@code dataColumns},
+   * delegates to the parent setup, and then records the JDBC type code of every data column.
+   *
+   * @param context operator context, passed through to the parent setup
+   * @throws RuntimeException wrapping any SQLException raised while reading the column metadata
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    StringBuilder columns = new StringBuilder();
+    StringBuilder values = new StringBuilder();
+    for (int i = 0; i < dataColumns.size(); i++) {
+      if (i > 0) {
+        columns.append(",");
+        values.append(",");
+      }
+      columns.append(dataColumns.get(i));
+      values.append("?");
+    }
+    // Use the comma-joined names; concatenating the list itself would render as "[a, b]".
+    insertStatement = "INSERT INTO "
+            + tablename
+            + " (" + columns + ")"
+            + " values (" + values + ")";
+    // NOTE(review): presumably the parent reads getUpdateCommand() during setup, so the
+    // statement text is built before this call - confirm against the parent class.
+    super.setup(context);
+    Connection conn = store.getConnection();
+    LOG.debug("Got Connection.");
+    // Start from an empty list so that a repeated setup() does not accumulate stale entries.
+    columnDataTypes = new ArrayList<Integer>();
+    Statement st = null;
+    try {
+      st = conn.createStatement();
+      // Select only the configured columns, in order, so that entry i of columnDataTypes
+      // describes dataColumns.get(i); "where 1 = 0" yields metadata without fetching rows.
+      ResultSet rs = st.executeQuery("select " + columns + " from " + tablename + " where 1 = 0");
+      ResultSetMetaData rsMetaData = rs.getMetaData();
+      int numberOfColumns = rsMetaData.getColumnCount();
+      LOG.debug("resultSet MetaData column Count=" + numberOfColumns);
+
+      // JDBC column indexes are 1-based.
+      for (int i = 1; i <= numberOfColumns; i++) {
+        int type = rsMetaData.getColumnType(i);
+        columnDataTypes.add(type);
+        LOG.debug("sql column type is " + type);
+      }
+    }
+    catch (SQLException ex) {
+      throw new RuntimeException(ex);
+    }
+    finally {
+      // Closing the statement also releases the result set it produced.
+      if (st != null) {
+        try {
+          st.close();
+        }
+        catch (SQLException ex) {
+          LOG.warn("Failed to close the metadata statement", ex);
+        }
+      }
+    }
+  }
public MemsqlOutputOperator()
{
}
@Override
- protected String getUpdateCommand()
+  public void processTuple(Object tuple)
{
- return INSERT;
+    // The getters are created once, from the class of the first tuple seen, and reused afterwards.
+    if (isFirstTuple) {
+      processFirstTuple(tuple);
+      isFirstTuple = false;
+    }
+    super.processTuple(tuple);
+  }
+
+  /**
+   * Creates one PojoUtils getter per data column, chosen by the column's JDBC type, and stores
+   * the getter objects (not the values of this tuple) in {@code getters}.
+   *
+   * @param tuple first tuple received; only its class is used
+   * @throws IllegalStateException if expressions, data columns and column types differ in number
+   */
+  public void processFirstTuple(Object tuple)
+  {
+    final int size = dataColumns.size();
+    if (expression == null || expression.size() != size
+        || columnDataTypes == null || columnDataTypes.size() != size) {
+      throw new IllegalStateException("Expected one expression and one column type per data column:"
+          + " dataColumns=" + dataColumns
+          + ", expression=" + expression
+          + ", columnDataTypes=" + columnDataTypes);
+    }
+    Class<?> fqcn = tuple.getClass();
+    ArrayList<Object> built = new ArrayList<Object>(size);
+    for (int i = 0; i < size; i++) {
+      int type = columnDataTypes.get(i);
+      String getterExpression = expression.get(i);
+      switch (type) {
+        case Types.CHAR:
+          built.add(PojoUtils.createGetterChar(fqcn, getterExpression));
+          break;
+        case Types.VARCHAR:
+          built.add(PojoUtils.createGetterString(fqcn, getterExpression));
+          break;
+        case Types.BOOLEAN:
+        case Types.TINYINT:
+          built.add(PojoUtils.createGetterBoolean(fqcn, getterExpression));
+          break;
+        case Types.SMALLINT:
+          built.add(PojoUtils.createGetterShort(fqcn, getterExpression));
+          break;
+        case Types.INTEGER:
+          built.add(PojoUtils.createGetterInt(fqcn, getterExpression));
+          break;
+        case Types.BIGINT:
+          // NOTE(review): factory name differs from its siblings (createGetterXxx) - kept as found, confirm.
+          built.add(PojoUtils.createExpressionGetterLong(fqcn, getterExpression));
+          break;
+        case Types.FLOAT:
+          built.add(PojoUtils.createGetterFloat(fqcn, getterExpression));
+          break;
+        case Types.DOUBLE:
+          built.add(PojoUtils.createGetterDouble(fqcn, getterExpression));
+          break;
+        default:
+          // DECIMAL, DATE, TIME, ARRAY, OTHER and any type not listed above are read as plain
+          // objects, so every column gets exactly one getter and the indexes stay aligned.
+          built.add(PojoUtils.createGetterObject(fqcn, getterExpression));
+          break;
+      }
+    }
+    getters = built;
+
}
@Override
- protected void setStatementParameters(PreparedStatement statement, Integer tuple) throws SQLException
+  protected String getUpdateCommand()
{
- statement.setInt(1, tuple);
+    // Statement text assembled in setup().
+    return insertStatement;
}
+
+  /**
+   * Binds one parameter per data column by evaluating the stored getters against the given tuple.
+   *
+   * @param statement prepared INSERT statement to populate
+   * @param tuple POJO whose fields are written
+   * @throws SQLException if the driver rejects a parameter
+   */
+  @Override
+  protected void setStatementParameters(PreparedStatement statement, Object tuple) throws SQLException
+  {
+    final int size = dataColumns.size();
+    for (int i = 0; i < size; i++) {
+      statement.setObject(i + 1, readValue(getters.get(i), tuple));
+    }
+  }
+
+  /* Invokes a getter created by processFirstTuple() on the tuple and returns the boxed result. */
+  private static Object readValue(Object getter, Object tuple)
+  {
+    if (getter instanceof GetterString) {
+      return ((GetterString)getter).get(tuple);
+    }
+    if (getter instanceof GetterInt) {
+      return ((GetterInt)getter).get(tuple);
+    }
+    if (getter instanceof GetterLong) {
+      return ((GetterLong)getter).get(tuple);
+    }
+    if (getter instanceof GetterDouble) {
+      return ((GetterDouble)getter).get(tuple);
+    }
+    if (getter instanceof GetterFloat) {
+      return ((GetterFloat)getter).get(tuple);
+    }
+    if (getter instanceof GetterShort) {
+      return ((GetterShort)getter).get(tuple);
+    }
+    if (getter instanceof GetterBoolean) {
+      return ((GetterBoolean)getter).get(tuple);
+    }
+    if (getter instanceof GetterChar) {
+      return ((GetterChar)getter).get(tuple);
+    }
+    return ((GetterObject)getter).get(tuple);
+  }
+
+  // Class-level logger; "transient" was dropped because it has no effect on a static field.
+  private static final Logger LOG = LoggerFactory.getLogger(MemsqlOutputOperator.class);
+
}