Update docs and add a wait period when no data is available in the store
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
index 7e67398..4c65ed6 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
@@ -17,16 +17,17 @@
package com.datatorrent.contrib.cassandra;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
+import com.datatorrent.lib.db.AbstractStoreInputOperator;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.DTThrowable;
-import com.datatorrent.lib.db.AbstractStoreInputOperator;
/**
* Base input adapter which reads data from persistence database through DATASTAX API and writes into output port(s).
@@ -44,6 +45,32 @@
private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraInputOperator.class);
+ // Milliseconds to sleep when a query returns no rows, so the store is not polled in a tight loop. Defaults to 100.
+ int waitForDataTimeout = 100;
+
+ /**
+ * Get the amount of time, in milliseconds, to wait when no data is available.
+ * When a query returns no rows the operator sleeps for this long before the next query, throttling requests so
+ * the store is not continuously slammed. Defaults to 100 ms.
+ *
+ * @return The wait timeout in milliseconds (never negative)
+ */
+ public int getWaitForDataTimeout()
+ {
+   return waitForDataTimeout;
+ }
+
+ /**
+ * Set the amount of time, in milliseconds, to wait when no data is available.
+ * When a query returns no rows the operator sleeps for this long before the next query, throttling requests so
+ * the store is not continuously slammed. A value of 0 effectively disables the pause.
+ *
+ * @param waitForDataTimeout The wait timeout in milliseconds; must be non-negative
+ * @throws IllegalArgumentException if {@code waitForDataTimeout} is negative. The value is passed to
+ *         {@link Thread#sleep(long)}, which rejects negative durations, so a bad value would otherwise only surface
+ *         as a runtime failure on every empty poll.
+ */
+ public void setWaitForDataTimeout(int waitForDataTimeout)
+ {
+   if (waitForDataTimeout < 0) {
+     throw new IllegalArgumentException("waitForDataTimeout must be non-negative: " + waitForDataTimeout);
+   }
+   this.waitForDataTimeout = waitForDataTimeout;
+ }
+
/**
* Any concrete class has to override this method to convert a Database row into Tuple.
*
@@ -77,9 +104,14 @@
try {
ResultSet result = store.getSession().execute(query);
- for(Row row: result) {
- T tuple = getTuple(row);
- outputPort.emit(tuple);
+ if (!result.isExhausted()) {
+ for (Row row : result) {
+ T tuple = getTuple(row);
+ outputPort.emit(tuple);
+ }
+ } else {
+ // No rows available wait for some time before retrying so as to not continuously slam the database
+ Thread.sleep(waitForDataTimeout);
}
}
catch (Exception ex) {
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
index 9938e6f..a2b229e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
@@ -15,29 +15,33 @@
*/
package com.datatorrent.contrib.cassandra;
+import java.math.BigDecimal;
+import java.util.*;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
import com.datastax.driver.core.ColumnDefinitions;
import com.datastax.driver.core.DataType;
import com.datastax.driver.core.Row;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.lib.util.PojoUtils;
-import com.datatorrent.lib.util.PojoUtils.Setter;
-import com.datatorrent.lib.util.PojoUtils.SetterBoolean;
-import com.datatorrent.lib.util.PojoUtils.SetterDouble;
-import com.datatorrent.lib.util.PojoUtils.SetterFloat;
-import com.datatorrent.lib.util.PojoUtils.SetterInt;
-import com.datatorrent.lib.util.PojoUtils.SetterLong;
-import java.math.BigDecimal;
-import java.util.*;
-import javax.validation.constraints.Min;
-import javax.validation.constraints.NotNull;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.*;
+
+import com.datatorrent.api.Context.OperatorContext;
+
/**
* <p>
* CassandraPOJOInputOperator</p>
- * A Generic implementation of AbstractCassandraInputOperator which gets field values from Cassandra database columns and sets in a POJO.
- * User can give a parameterized query with parameters %t for table name, %p for primary key, %s for start value and %l for limit.
+ * A generic implementation of AbstractCassandraInputOperator that fetches rows of data from Cassandra and emits them as POJOs.
+ * Each row is converted to a POJO by mapping the columns in the row to fields of the POJO based on a user-specified mapping.
+ * The user must also provide the query used to fetch rows from the database. This query is run repeatedly to fetch new data
+ * and hence should be parameterized. The parameters that can be used are %t for table name, %p for primary key, %s for start
+ * value and %l for limit. After each run, the start value is updated to the value of the primary key column in the last row
+ * returned by the previous run of the query. The primary key column is specified by the user through a property.
*
* @displayName Cassandra POJO Input Operator
* @category Input
@@ -204,7 +208,6 @@
com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename + " LIMIT " + 1);
ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
- int numberOfColumns = rsMetaData.size();
primaryKeyColumnType = rsMetaData.getType(primaryKeyColumn);
if(query.contains("%p"))
@@ -220,10 +223,7 @@
//In case columns is a subset
int columnSize = columns.size();
- if (columns.size()<numberOfColumns){
- numberOfColumns = columnSize;
- }
- for (int i = 0; i < numberOfColumns; i++) {
+ for (int i = 0; i < columnSize; i++) {
// Get the designated column's data type.
DataType type = rsMetaData.getType(columns.get(i));
columnDataTypes.add(type);