changes.
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
index eb48846..7e67398 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
@@ -34,7 +34,6 @@
* <p>
* This is an abstract class. Sub-classes need to implement {@link #queryToRetrieveData()} and {@link #getTuple(Row)}.
* </p>
- *
* @param <T>
* @displayName Abstract Cassandra Input
* @category Store
@@ -75,27 +74,17 @@
{
String query = queryToRetrieveData();
logger.debug(String.format("select statement: %s", query));
- if (!"".equals(query)) {
- try {
- ResultSet result = store.getSession().execute(query);
- processResult(result);
- }
- catch (Exception ex) {
- store.disconnect();
- DTThrowable.rethrow(ex);
+
+ try {
+ ResultSet result = store.getSession().execute(query);
+ for(Row row: result) {
+ T tuple = getTuple(row);
+ outputPort.emit(tuple);
}
}
- }
-
- /*
- * ProcessResult can be overridden in classes providing implementation of AbstractCassandraInputOperator.
- * Providing a basic implementation here which iterates through the rows in result,gets the particular tuple from row and emits it.
- */
- protected void processResult(ResultSet result)
- {
- for (Row row: result) {
- T tuple = getTuple(row);
- outputPort.emit(tuple);
+ catch (Exception ex) {
+ store.disconnect();
+ DTThrowable.rethrow(ex);
}
}
}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
index b1caa66..edd84a6 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
@@ -17,7 +17,6 @@
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.DataType;
-import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.lib.util.PojoUtils;
@@ -36,8 +35,9 @@
/**
* <p>
- * CassandraInputOperator</p>
+ * CassandraPOJOInputOperator</p>
* A Generic implementation of AbstractCassandraInputOperator which gets field values from Cassandra database columns and sets in a POJO.
+ * Users can supply a parameterized query with placeholders %t for table name, %p for primary key, %v for start value and %l for limit.
*
* @displayName Cassandra POJO Input Operator
* @category Input
@@ -54,13 +54,14 @@
private String tablename;
private final transient List<Object> setters;
@NotNull
- private String retrieveQuery;
+ private String query;
+
private transient Class<?> objectClass = null;
private boolean useAllColumns;
- protected Number lastRowIdInBatch = 0;
@NotNull
protected String primaryKeyColumn;
protected transient DataType primaryKeyColumnType;
+ private transient Row lastRowInBatch;
@Min(1)
private int limit = 10;
@@ -139,16 +140,18 @@
}
/*
- * Query input by user: Example: select * from keyspace.tablename;
+ * Parameterized query with placeholders %t for table name, %p for primary key, %v for start value and %l for limit.
+ * Example of query:
+ * select * from %t where token(%p) > %v limit %l;
*/
- public String getRetrieveQuery()
+ public String getQuery()
{
- return retrieveQuery;
+ return query;
}
- public void setRetrieveQuery(String retrieveQuery)
+ public void setQuery(String query)
{
- this.retrieveQuery = retrieveQuery;
+ this.query = query.replace("%t", tablename);
}
/*
@@ -224,6 +227,17 @@
}
primaryKeyColumnType = rsMetaData.getType(primaryKeyColumn);
+ if(query.contains("%p"))
+ {
+ query = query.replace("%p", primaryKeyColumn);
+ }
+ if(query.contains("%l"))
+ {
+ query = query.replace("%l", limit+"");
+ }
+
+
+ logger.debug("query is {}",query);
for (int i = 0; i < numberOfColumns; i++) {
// Get the designated column's data type.
@@ -290,6 +304,7 @@
@SuppressWarnings("unchecked")
public Object getTuple(Row row)
{
+ lastRowInBatch = row;
Object obj = null;
final int size = columnDataTypes.size();
@@ -366,76 +381,53 @@
return obj;
}
+ /*
+ * This method replaces the %v placeholder in the query, if present, with the current start row value.
+ * Example of query:
+ * select * from %t where token(%p) > %v limit %l;
+ */
@Override
public String queryToRetrieveData()
{
- boolean flag = false;
-
- switch (primaryKeyColumnType.getName()) {
- case INT:
- if (startRow.intValue() > lastRowIdInBatch.intValue()) {
- flag = true;
- }
- break;
- case COUNTER:
- if (startRow.longValue() > lastRowIdInBatch.longValue()) {
- flag = true;
- }
- break;
- case FLOAT:
- if (startRow.floatValue() > lastRowIdInBatch.floatValue()) {
- flag = true;
- }
- break;
- case DOUBLE:
- if (startRow.doubleValue() > lastRowIdInBatch.doubleValue()) {
- flag = true;
- }
- break;
+ String parameterizedQuery;
+ if(query.contains("%v"))
+ {
+ parameterizedQuery = query.replace("%v", startRow+"");
}
-
- if (flag) {
- return "";
+ else
+ {
+ parameterizedQuery = query;
}
-
- startRow = lastRowIdInBatch.intValue() + 1;
- StringBuilder sb = new StringBuilder();
- sb.append(retrieveQuery).append(" where ").append("token(").append(primaryKeyColumn).append(")").append(">=").append(startRow).append(" LIMIT ").append(limit);
- logger.debug("retrievequery is {}", sb.toString());
-
- return sb.toString();
+ return parameterizedQuery;
}
+
/*
- * Overriding processResult to save primarykey column value from last row in batch.
+ * Overriding emitTuples to save the primary key column value from the last row in the batch.
*/
@Override
- protected void processResult(ResultSet result)
+ public void emitTuples()
{
- Row lastRowInBatch = null;
- for (Row row: result) {
- Object tuple = getTuple(row);
- outputPort.emit(tuple);
- lastRowInBatch = row;
- }
- if (lastRowInBatch != null) {
- switch (primaryKeyColumnType.getName()) {
- case INT:
- lastRowIdInBatch = lastRowInBatch.getInt(0);
- break;
- case COUNTER:
- lastRowIdInBatch = lastRowInBatch.getLong(0);
- break;
- case FLOAT:
- lastRowIdInBatch = lastRowInBatch.getFloat(0);
- break;
- case DOUBLE:
- lastRowIdInBatch = lastRowInBatch.getDouble(0);
- break;
- default:
- throw new RuntimeException("unsupported data type " + primaryKeyColumnType.getName());
- }
- }
+ super.emitTuples();
+ if (lastRowInBatch != null) {
+ switch (primaryKeyColumnType.getName()) {
+ case INT:
+ startRow = lastRowInBatch.getInt(primaryKeyColumn);
+ break;
+ case COUNTER:
+ startRow = lastRowInBatch.getLong(primaryKeyColumn);
+ break;
+ case FLOAT:
+ startRow = lastRowInBatch.getFloat(primaryKeyColumn);
+ break;
+ case DOUBLE:
+ startRow = lastRowInBatch.getDouble(primaryKeyColumn);
+ break;
+ default:
+ throw new RuntimeException("unsupported data type " + primaryKeyColumnType.getName());
+ }
+ }
+
}
private static final Logger logger = LoggerFactory.getLogger(CassandraPOJOInputOperator.class);
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
index a6d0d21..0658d19 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
@@ -292,7 +292,7 @@
@Test
public void TestCassandraInputOperator()
{
- String retrieveQuery = "SELECT * FROM " + KEYSPACE + "." + TABLE_NAME_INPUT;
+ String query1 = "SELECT * FROM " + KEYSPACE + "." + "%t;";
CassandraStore store = new CassandraStore();
store.setNode(NODE);
store.setKeyspace(KEYSPACE);
@@ -305,7 +305,7 @@
inputOperator.setStore(store);
inputOperator.setOutputClass("com.datatorrent.contrib.cassandra.TestInputPojo");
inputOperator.setTablename(TABLE_NAME_INPUT);
- inputOperator.setRetrieveQuery(retrieveQuery);
+ inputOperator.setQuery(query1);
ArrayList<String> columns = new ArrayList<String>();
columns.add("id");
columns.add("age");
@@ -324,14 +324,9 @@
inputOperator.outputPort.setSink(sink);
inputOperator.setup(context);
inputOperator.beginWindow(0);
- inputOperator.insertEventsInTable(10);
inputOperator.emitTuples();
inputOperator.endWindow();
- Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
- inputOperator.beginWindow(1);
- inputOperator.emitTuples();
- inputOperator.endWindow();
- Assert.assertEquals("rows from db", 20, sink.collectedTuples.size());
+ Assert.assertEquals("rows from db", 30, sink.collectedTuples.size());
ArrayList<Integer> listOfIDs = inputOperator.getIds();
// Rows are not stored in the same order in cassandra table in which they are inserted.
for (int i = 0; i < 10; i++) {
@@ -341,6 +336,25 @@
Assert.assertEquals("age set in testpojo", inputOperator.getAge().get(object.getId()).intValue(), object.getAge());
}
+ sink.clear();
+ String query2 = "SELECT * FROM " + KEYSPACE + "." + "%t where token(%p) > %v;";
+ inputOperator.setQuery(query2);
+ inputOperator.setStartRow(10);
+ inputOperator.beginWindow(1);
+ inputOperator.emitTuples();
+ inputOperator.endWindow();
+ Assert.assertEquals("rows from db", 14, sink.collectedTuples.size());
+
+ sink.clear();
+ String query3 = "SELECT * FROM " + KEYSPACE + "." + "%t where token(%p) > %v LIMIT %l;";
+ inputOperator.setQuery(query3);
+ inputOperator.setStartRow(1);
+ inputOperator.setLimit(10);
+ inputOperator.beginWindow(2);
+ inputOperator.emitTuples();
+ inputOperator.endWindow();
+ Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
+
}
public static class TestPojo