Updated documentation and added a wait period when no data is available in the store
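
A minimal usage sketch of the new timeout (not part of this patch; the operator name and the 500 ms value are
illustrative, and the snippet assumes it runs inside an application's populateDAG(DAG dag, ...) body):

    // CassandraPOJOInputOperator extends AbstractCassandraInputOperator, so it inherits the new setter.
    CassandraPOJOInputOperator cassandraInput = dag.addOperator("cassandraInput", new CassandraPOJOInputOperator());
    // When a query returns no rows, emitTuples() now sleeps this many milliseconds before polling the store again.
    cassandraInput.setWaitForDataTimeout(500);
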
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
index 7e67398..4c65ed6 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraInputOperator.java
@@ -17,16 +17,17 @@
 package com.datatorrent.contrib.cassandra;
 
 
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.datastax.driver.core.ResultSet;
-import com.datastax.driver.core.Row;
+import com.datatorrent.lib.db.AbstractStoreInputOperator;
 
 import com.datatorrent.api.DefaultOutputPort;
 
 import com.datatorrent.common.util.DTThrowable;
-import com.datatorrent.lib.db.AbstractStoreInputOperator;
 
 /**
  * Base input adapter which reads data from persistence database through DATASTAX API and writes into output port(s). 
@@ -44,6 +45,32 @@
 
   private static final Logger logger = LoggerFactory.getLogger(AbstractCassandraInputOperator.class);
 
+  int waitForDataTimeout = 100;
+
+  /**
+   * Gets the amount of time, in milliseconds, to wait when no data is available.
+   * This timeout throttles requests to the store so as not to continuously slam it with requests when
+   * queries return no data.
+   *
+   * @return The wait timeout in milliseconds
+   */
+  public int getWaitForDataTimeout()
+  {
+    return waitForDataTimeout;
+  }
+
+  /**
+   * Sets the amount of time, in milliseconds, to wait when no data is available.
+   * This timeout throttles requests to the store so as not to continuously slam it with requests when
+   * queries return no data.
+   *
+   * @param waitForDataTimeout The wait timeout in milliseconds
+   */
+  public void setWaitForDataTimeout(int waitForDataTimeout)
+  {
+    this.waitForDataTimeout = waitForDataTimeout;
+  }
+
   /**
    * Any concrete class has to override this method to convert a Database row into Tuple.
    *
@@ -77,9 +104,14 @@
 
     try {
       ResultSet result = store.getSession().execute(query);
-      for(Row row: result) {
-        T tuple = getTuple(row);
-        outputPort.emit(tuple);
+      if (!result.isExhausted()) {
+        for (Row row : result) {
+          T tuple = getTuple(row);
+          outputPort.emit(tuple);
+        }
+      } else {
+        // No rows available; wait for some time before retrying so as not to continuously slam the database
+        Thread.sleep(waitForDataTimeout);
       }
     }
     catch (Exception ex) {
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
index 9938e6f..a2b229e 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
@@ -15,29 +15,33 @@
  */
 package com.datatorrent.contrib.cassandra;
 
+import java.math.BigDecimal;
+import java.util.*;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
 import com.datastax.driver.core.ColumnDefinitions;
 import com.datastax.driver.core.DataType;
 import com.datastax.driver.core.Row;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.lib.util.PojoUtils;
-import com.datatorrent.lib.util.PojoUtils.Setter;
-import com.datatorrent.lib.util.PojoUtils.SetterBoolean;
-import com.datatorrent.lib.util.PojoUtils.SetterDouble;
-import com.datatorrent.lib.util.PojoUtils.SetterFloat;
-import com.datatorrent.lib.util.PojoUtils.SetterInt;
-import com.datatorrent.lib.util.PojoUtils.SetterLong;
-import java.math.BigDecimal;
-import java.util.*;
-import javax.validation.constraints.Min;
-import javax.validation.constraints.NotNull;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.*;
+
+import com.datatorrent.api.Context.OperatorContext;
+
 /**
  * <p>
  * CassandraPOJOInputOperator</p>
- * A Generic implementation of AbstractCassandraInputOperator which gets field values from Cassandra database columns and sets in a POJO.
- * User can give a parameterized query with parameters %t for table name, %p for primary key, %s for start value and %l for limit.
+ * A generic implementation of AbstractCassandraInputOperator that fetches rows of data from Cassandra and emits them as POJOs.
+ * Each row is converted to a POJO by mapping the columns in the row to fields of the POJO based on a user-specified mapping.
+ * The user should also provide a query to fetch the rows from the database. This query is run continuously to fetch new data
+ * and hence should be parameterized. The parameters that can be used are %t for table name, %p for primary key, %s for start
+ * value and %l for limit. The start value is continuously updated with the value of the primary key column of the last row
+ * from the result of the previous run of the query. The primary key column is also specified by the user via a property.
  *
  * @displayName Cassandra POJO Input Operator
  * @category Input
@@ -204,7 +208,6 @@
 
       com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename + " LIMIT " + 1);
       ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
-      int numberOfColumns = rsMetaData.size();
 
       primaryKeyColumnType = rsMetaData.getType(primaryKeyColumn);
        if(query.contains("%p"))
@@ -220,10 +223,7 @@
 
       //In case columns is a subset
       int columnSize = columns.size();
-      if (columns.size()<numberOfColumns){
-        numberOfColumns = columnSize;
-      }
-      for (int i = 0; i < numberOfColumns; i++) {
+      for (int i = 0; i < columnSize; i++) {
         // Get the designated column's data type.
         DataType type = rsMetaData.getType(columns.get(i));
         columnDataTypes.add(type);
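
For reference, a sketch of how the parameterized query described in the updated CassandraPOJOInputOperator javadoc
might be configured. The setter names below are assumed from the operator's properties (tablename, primaryKeyColumn,
query) and may differ; the query shape is only an example, with %t, %p, %s and %l substituted by the operator before
execution:

    CassandraPOJOInputOperator cassandraInput = new CassandraPOJOInputOperator();
    cassandraInput.setTablename("test_event_table");   // assumed setter for the tablename property (%t)
    cassandraInput.setPrimaryKeyColumn("event_id");    // assumed setter for the primaryKeyColumn property (%p)
    // %s is replaced with the primary key value of the last row seen and %l with the row limit.
    cassandraInput.setQuery("SELECT * FROM %t WHERE token(%p) > token(%s) LIMIT %l");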