changes.
diff --git a/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlPOJOInputOperator.java
index 7a869bb..c120abe 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlPOJOInputOperator.java
@@ -27,6 +27,7 @@
import java.math.BigDecimal;
import java.sql.*;
import java.util.ArrayList;
+import java.util.List;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
@@ -36,40 +37,47 @@
 * <p>
 * MemsqlPOJOInputOperator</p>
 * A Generic implementation of AbstractMemsqlInputOperator which gets field values from memsql database columns and sets in a POJO.
- *
+ * The query may contain the placeholders %t (table name), %p (primary key column), %s (start value) and %e (start value + limit).
 * @displayName Memsql POJO Input Operator
 * @category Input
 * @tags input operator
 */
public class MemsqlPOJOInputOperator extends AbstractMemsqlInputOperator<Object>
{
-  private final transient ArrayList<Integer> columnDataTypes;
+  private final transient List<Integer> columnDataTypes;
  @Min(1)
  private int limit = 10;
  @Min(1)
-  private int startRow = 1;
+  private Number startRow = 1;
  @NotNull
-  private ArrayList<String> expressions;
+  private List<String> expressions;
  @NotNull
  private String tablename;
-  private transient ArrayList<Object> setters;
-  @NotNull
-  private String retrieveQuery;
+  private transient List<Object> setters;
  private transient Class<?> objectClass = null;
  @NotNull
  private String primaryKeyColumn;
-  private ArrayList<String> columns;
+  @NotNull
+  private List<String> columns;
  private boolean useAllColumns;
+  // Primary-key value of the last row converted in the current batch; emitTuples() copies it into startRow.
+  private transient Number lastRowKey = 1;
+  // java.sql.Types code of the primary-key column, determined in setup().
+  private transient int primaryKeyColumnType;
+  // Parameterized query; see the class comment for the supported placeholders.
+  @NotNull
+  private String query;
  /*
-   * Subset of columns specified by User in case POJO needs to contain fields specific to these columns only.
+   * Set of columns specified by User in case POJO needs to contain fields specific to these columns only.
+   * User should specify columns in same order as expressions for fields.
   */
-  public ArrayList<String> getColumns()
+  public List<String> getColumns()
  {
    return columns;
  }
-  public void setColumns(ArrayList<String> columns)
+  public void setColumns(List<String> columns)
  {
    this.columns = columns;
  }
@@ -91,12 +96,16 @@
  /*
-   * User has the option to specify the start row.
+   * User has the option to specify the start row: the primary-key value used for %s in the
+   * query of the first batch. Must not be null.
   */
-  public int getStartRow()
+  public Number getStartRow()
  {
    return startRow;
  }
-  public void setStartRow(int startRow)
+  public void setStartRow(Number startRow)
  {
-    this.startRow = startRow;
+    if (startRow == null) {
+      throw new IllegalArgumentException("startRow must not be null");
+    }
+    this.startRow = startRow;
  }
@@ -150,28 +155,32 @@
  }
  /*
-   * Query input by user: Example: select * from keyspace.tablename;
+   * Parameterized query. Supported placeholders: %t (table name), %p (primary key column),
+   * %s (start value) and %e (start value + limit); %t and %p are resolved in setup().
+   * Example: select * from %t where %p > %s and %p < %e;
   */
-  public String getRetrieveQuery()
+  public String getQuery()
  {
-    return retrieveQuery;
+    return query;
  }
-  public void setRetrieveQuery(String retrieveQuery)
+  public void setQuery(String query)
  {
-    this.retrieveQuery = retrieveQuery;
+    // Stored as given: property setters may run in any order, so tablename may still be
+    // null here. %t (like %p) is resolved in setup().
+    this.query = query;
  }
  /*
-   * An ArrayList of Java expressions that will yield the cassandra column value to be set in output object.
-   * Each expression corresponds to one column in the Cassandra table.
+   * A list of Java expressions that will yield the MemSQL column value to be set in the output object.
+   * Each expression corresponds to one column in the MemSQL table.
   */
-  public ArrayList<String> getExpressions()
+  public List<String> getExpressions()
  {
    return expressions;
  }
-  public void setExpressions(ArrayList<String> expressions)
+  public void setExpressions(List<String> expressions)
  {
    this.expressions = expressions;
  }
@@ -198,17 +205,14 @@
    ResultSetMetaData rsMetaData;
    try {
      Statement statement = store.getConnection().createStatement();
-      resultSet = statement.executeQuery("select count(*) from " + tablename);
-      resultSet.next();
-      resultSet = statement.executeQuery("describe " + tablename);
+      // DESCRIBE returns one row per table column (Field, Type, Null, Key, ...), so its metadata
+      // describes that listing, not the table. An empty SELECT exposes the table's own columns.
+      resultSet = statement.executeQuery("select * from " + tablename + " limit 0");
      rsMetaData = resultSet.getMetaData();
-
-      if (!isUseAllColumns()) {
-        numberOfColumns = rsMetaData.getColumnCount();
+      primaryKeyColumnType = rsMetaData.getColumnType(resultSet.findColumn(primaryKeyColumn));
+      // Resolve the table and primary-key placeholders once; %s/%e change with every batch.
+      query = query.replace("%t", tablename).replace("%p", primaryKeyColumn);
-      }
-      else {
-        numberOfColumns = columns.size();
-      }
+      numberOfColumns = rsMetaData.getColumnCount();
      logger.debug("column Count=" + numberOfColumns);
      statement.close();
@@ -229,12 +233,7 @@
    for (int i = 1; i <= numberOfColumns; i++) {
      int type = 0;
      try {
-        if (!isUseAllColumns()) {
-          type = rsMetaData.getColumnType(i);
-        }
-        else {
-          type = rsMetaData.getColumnType(resultSet.findColumn(columns.get(i)));
-        }
+        type = rsMetaData.getColumnType(i); // JDBC column indexes are 1-based, matching the loop bounds
        columnDataTypes.add(type);
      }
      catch (SQLException ex) {
@@ -311,79 +310,44 @@
    }
    final int size = columnDataTypes.size();
    for (int i = 0; i < size; i++) {
+      String columnName = columns.get(i); // NOTE(review): assumes columns.get(i) is the column whose type is columnDataTypes.get(i); confirm ordering
      final int type = columnDataTypes.get(i);
      try {
        switch (type) {
          case (Types.CHAR):
          case (Types.VARCHAR):
-            final String ascii;
-
-            ascii = result.getString(i + 1);
-
-            ((Setter<Object, String>)setters.get(i)).set(obj, ascii);
+            ((Setter<Object, String>)setters.get(i)).set(obj, result.getString(columnName));
            break;
          case (Types.BOOLEAN):
          case (Types.TINYINT):
-            final boolean boolValue;
-            boolValue = result.getBoolean(i);
-
-            ((SetterBoolean)setters.get(i)).set(obj, boolValue);
+            ((SetterBoolean)setters.get(i)).set(obj, result.getBoolean(columnName));
            break;
          case (Types.SMALLINT):
-            final short shortValue;
-
-            shortValue = result.getShort(i);
-
-            ((SetterShort)setters.get(i)).set(obj, shortValue);
+            ((SetterShort)setters.get(i)).set(obj, result.getShort(columnName));
            break;
          case (Types.INTEGER):
-            final int intValue;
-
-            intValue = result.getInt(i + 1);
-
-            ((SetterInt)setters.get(i)).set(obj, intValue);
+            ((SetterInt)setters.get(i)).set(obj, result.getInt(columnName));
            break;
          case (Types.BIGINT):
-            final long longValue;
-            longValue = result.getLong(i);
-
-            ((SetterLong)setters.get(i)).set(obj, longValue);
+            ((SetterLong)setters.get(i)).set(obj, result.getLong(columnName));
            break;
          case (Types.FLOAT):
-            final float floatValue;
-
-            floatValue = result.getFloat(i);
-            ((SetterFloat)setters.get(i)).set(obj, floatValue);
+            ((SetterFloat)setters.get(i)).set(obj, result.getFloat(columnName));
            break;
          case (Types.DOUBLE):
-            final double doubleValue;
-            doubleValue = result.getDouble(i);
-
-            ((SetterDouble)setters.get(i)).set(obj, doubleValue);
+            ((SetterDouble)setters.get(i)).set(obj, result.getDouble(columnName));
            break;
          case (Types.DECIMAL):
-            final BigDecimal decimal;
-            decimal = result.getBigDecimal(i);
-
-            ((Setter<Object, BigDecimal>)setters.get(i)).set(obj, decimal);
+            ((Setter<Object, BigDecimal>)setters.get(i)).set(obj, result.getBigDecimal(columnName));
            break;
          case (Types.DATE):
-            final Date dateValue;
-            dateValue = result.getDate(i);
-
-            ((Setter<Object, Date>)setters.get(i)).set(obj, dateValue);
+            ((Setter<Object, Date>)setters.get(i)).set(obj, result.getDate(columnName));
            break;
          case (Types.TIMESTAMP):
-            final Timestamp timeValue;
-            timeValue = result.getTimestamp(i);
-
-            ((Setter<Object, Timestamp>)setters.get(i)).set(obj, timeValue);
+            ((Setter<Object, Timestamp>)setters.get(i)).set(obj, result.getTimestamp(columnName));
            break;
          case (Types.OTHER):
-            final Object objValue;
-            objValue = result.getObject(i);
-
-            ((Setter<Object, Object>)setters.get(i)).set(obj, objValue);
+            ((Setter<Object, Object>)setters.get(i)).set(obj, result.getObject(columnName));
            break;
          default:
            throw new RuntimeException("unsupported data type " + type);
@@ -393,37 +357,79 @@
        throw new RuntimeException(ex);
      }
    }
+    // Remember the key of the row just converted instead of moving the cursor with last():
+    // the caller is still iterating this ResultSet (and a forward-only result set rejects
+    // last()). After the batch has been consumed this holds the key of its final row.
+    lastRowKey = readPrimaryKey(result);
    return obj;
  }
-
-  @Override
-  public String queryToRetrieveData()
+
+  /*
+   * Reads the primary-key value of the current row, using the column type captured in setup().
+   */
+  private Number readPrimaryKey(ResultSet result)
  {
    try {
-      ResultSet rset = store.getConnection().createStatement().executeQuery("SELECT * FROM " + tablename + " ORDER BY " + primaryKeyColumn + " DESC LIMIT 1");
-      if (startRow > rset.getInt(0)) {
-        return null;
+      switch (primaryKeyColumnType) {
+        case Types.INTEGER:
+          return result.getInt(primaryKeyColumn);
+        case Types.BIGINT:
+          return result.getLong(primaryKeyColumn);
+        case Types.FLOAT:
+          return result.getFloat(primaryKeyColumn);
+        case Types.DOUBLE:
+          return result.getDouble(primaryKeyColumn);
+        case Types.SMALLINT:
+          return result.getShort(primaryKeyColumn);
+        default:
+          throw new RuntimeException("unsupported data type " + primaryKeyColumnType);
      }
    }
    catch (SQLException ex) {
      throw new RuntimeException(ex);
    }
-    int endRow = startRow + limit;
-    StringBuilder sb = new StringBuilder();
-    sb.append(retrieveQuery);
-    sb.append(" where ");
-    sb.append(primaryKeyColumn);
-    sb.append(" >= ");
-    sb.append(startRow);
-    sb.append(" and ");
-    sb.append(primaryKeyColumn);
-    sb.append(" < ");
-    sb.append(endRow);
-    startRow = endRow;
-    return sb.toString();
+  }
+
+  /*
+   * Builds the query for the next batch by substituting the per-batch placeholders:
+   * %s (start value), %e (start value + limit) and %l (limit). Every placeholder present
+   * is replaced; absent ones are simply ignored.
+   * Example: select * from %t where %p > %s and %p < %e
+   */
+  @Override
+  public String queryToRetrieveData()
+  {
+    // Integral keys use long arithmetic so large BIGINT keys are neither truncated nor
+    // overflowed (as intValue() would); floating-point keys keep their fraction.
+    Number endRow;
+    if (startRow instanceof Float || startRow instanceof Double) {
+      endRow = startRow.doubleValue() + limit;
+    }
+    else {
+      endRow = startRow.longValue() + limit;
+    }
+    return query.replace("%s", String.valueOf(startRow))
+        .replace("%e", String.valueOf(endRow))
+        .replace("%l", String.valueOf(limit));
+  }
+
+  /*
+   * Emits one batch and then advances startRow to the primary key of the last row read,
+   * so the next batch continues from there. If the batch was empty, startRow is unchanged.
+   * Because the next batch starts at that key, a condition such as "%p > %s" avoids
+   * re-reading the last row, whereas "%p >= %s" reads it again.
+   */
+  @Override
+  public void emitTuples()
+  {
+    lastRowKey = null;
+    super.emitTuples();
+    if (lastRowKey != null) {
+      startRow = lastRowKey;
+    }
  }
  private static final Logger logger = LoggerFactory.getLogger(MemsqlPOJOInputOperator.class);
diff --git a/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java
index ed43690..36ef69e 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/memsql/AbstractMemsqlInputOperatorTest.java
@@ -180,8 +180,7 @@
    expressions.add("id");
    expressions.add("name");
    inputOperator.setExpressions(expressions);
-    inputOperator.setRetrieveQuery("select * from " + FQ_TABLE);
-
+    inputOperator.setQuery("select * from " + FQ_TABLE);
    inputOperator.setOutputClass("com.datatorrent.contrib.memsql.TestInputPojo");
    CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
    inputOperator.outputPort.setSink(sink);
@@ -192,12 +191,34 @@
    inputOperator.emitTuples();
    inputOperator.endWindow();
-    Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
+    Assert.assertEquals("rows from db", 30, sink.collectedTuples.size());
    for (int i = 0; i < 10; i++) {
      TestInputPojo object = (TestInputPojo)sink.collectedTuples.get(i);
      Assert.assertEquals("id set in testpojo", i + 1, object.getId());
      Assert.assertEquals("name set in testpojo", "Testname" + i, object.getName());
    }
+    sink.clear();
+    // NOTE(review): ids above start at 1 and the unfiltered query returned 30 rows, so with
+    // startRow 1 this predicate presumably matches all of them; confirm the expected 10.
+    inputOperator.setQuery("select * from " + FQ_TABLE + " where %p >= %s");
+    inputOperator.setStartRow(1);
+    inputOperator.setup(null);
+
+    inputOperator.beginWindow(0);
+    inputOperator.emitTuples();
+    inputOperator.endWindow();
+    Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
+    sink.clear();
+    inputOperator.setQuery("select * from " + FQ_TABLE + " where %p >= %s and %p < %e");
+    inputOperator.setStartRow(1);
+    inputOperator.setLimit(10);
+    inputOperator.setup(null);
+
+    inputOperator.beginWindow(0);
+    inputOperator.emitTuples();
+    inputOperator.endWindow();
+    Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
+
  }
}