changes
diff --git a/contrib/src/main/java/com/datatorrent/contrib/accumulo/AccumuloOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/accumulo/AccumuloOutputOperator.java
new file mode 100644
index 0000000..edf78b4
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/accumulo/AccumuloOutputOperator.java
@@ -0,0 +1,16 @@
+
+package com.datatorrent.contrib.accumulo;
+
+import org.apache.accumulo.core.data.Mutation;
+
+/** {@link AbstractAccumuloOutputOperator} specialised for {@code Object} tuples. */
+public class AccumuloOutputOperator extends AbstractAccumuloOutputOperator<Object>
+{
+  /** Returns null for every tuple. NOTE(review): looks like a placeholder - confirm the base class accepts a null Mutation. */
+  @Override
+  public Mutation operationMutation(Object t)
+  {
+    return null;
+  }
+
+}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java
index 4c4e3c5..9bb5d38 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java
@@ -64,6 +64,7 @@
   @Override
   public void processBatch(Collection<T> tuples)
   {
+    // Start from the store's batch command, then walk every tuple of this batch.
     BatchStatement batchCommand = store.getBatchCommand();
     for(T tuple: tuples)
     {
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
index b9a3f69..05162a1 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
@@ -46,7 +46,7 @@
   private ArrayList<DataType> columnDataTypes;
   private ArrayList<String> expressions;
   private transient ArrayList<Object> getters;
-
+  private transient boolean isfirstTuple; // transient to match getters, which this flag guards
   /*
    * An ArrayList of Java expressions that will yield the field value from the POJO.
    * Each expression corresponds to one column in the Cassandra table.
@@ -91,6 +91,7 @@
   public CassandraOutputOperator()
   {
     super();
+    isfirstTuple = true; // no tuple seen yet: the first processTuple call runs processFirstTuple
     columnDataTypes = new ArrayList<DataType>();
     getters = new ArrayList<Object>();
   }
@@ -99,9 +100,10 @@
   @Override
   public void processTuple(Object tuple)
   {
-    if (getters.isEmpty()) {
+    if (isfirstTuple) { // one-time setup before the very first tuple is handled
       processFirstTuple(tuple);
+      isfirstTuple = false; // cleared once; later tuples skip the setup
     }
     super.processTuple(tuple);
   }
 
@@ -109,8 +111,10 @@
   {
     Class<?> fqcn = tuple.getClass();
     int size = columnDataTypes.size();
+    LOG.debug("size is {}", size);
     for (int i = 0; i < size; i++) {
       DataType type = columnDataTypes.get(i);
+      LOG.debug("type is {}", type); // log the column type itself rather than its Java class
       String getterExpression = expressions.get(i);
       if (type.equals(DataType.Name.ASCII) || type.equals(DataType.Name.TEXT) || type.equals(DataType.Name.VARCHAR)) {
         GetterString getVarchar = PojoUtils.createGetterString(fqcn, getterExpression);
@@ -144,6 +148,10 @@
         GetterObject getObject = PojoUtils.createGetterObject(fqcn, getterExpression);
         getters.add(getObject);
       }
+      else {
+        // NOTE(review): the checks above compare a DataType with DataType.Name values - confirm they can ever match.
+        throw new UnsupportedOperationException("Unsupported column data type: " + type);
+      }
 
     }
 
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
index e75a0ab..aec3e71 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
@@ -191,13 +191,13 @@
 
     outputOperator.setup(context);
 
-    List<TestEvent> events = Lists.newArrayList();
+    List<InnerObj> events = Lists.newArrayList();
     for (int i = 0; i < 10; i++) {
-      events.add(new TestEvent(i));
+      events.add(new InnerObj(i)); // InnerObj(i) stores i as its ID, so the ten POJOs carry IDs 0..9
     }
 
     outputOperator.beginWindow(0);
-    for (TestEvent event : events) {
+    for (InnerObj event : events) {
       outputOperator.input.process(event);
     }
     outputOperator.endWindow();
@@ -205,7 +205,7 @@
     Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore());
   }
 
-  @Test
+  // @Test -- NOTE(review): disabled by this change; restore it or use @Ignore with a reason before merging.
   public void TestCassandraInputOperator()
   {
     CassandraStore store = new CassandraStore();
@@ -257,6 +257,11 @@
 
     private int ID=11;
 
+    private InnerObj(int i)
+    {
+      this.ID = i; // replaces the field's default value of 11
+    }
+
     /**
      * @return the int ID
      */
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java
index ce05bbe..923a719 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java
@@ -41,6 +41,7 @@
   @Override
   public void processTuple(T tuple)
   {
+    // Only buffer the tuple here; per processBatch's Javadoc the batch is written at end of window.
     tuples.add(tuple);
   }
 
@@ -54,6 +55,7 @@
   /**
    * Processes the whole batch at the end window and writes to the store.
    *
+   * @param tuples the batch of tuples to write to the store
    */
   public abstract void processBatch(Collection<T> tuples);
 
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java
index c3cf5b7..48ff88e 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java
@@ -56,6 +56,9 @@
     @Override
     public void process(T t)
     {
+      // Tuples are forwarded only while committedWindowId < currentWindowId;
+      // a window the store already reports as committed is skipped, so it is
+      // not processed a second time.
       if (committedWindowId < currentWindowId) {
         processTuple(t);
       }
@@ -91,6 +94,7 @@
       appId = context.getValue(DAG.APPLICATION_ID);
       operatorId = context.getId();
+      // Load the window id the store reports as committed for this app/operator pair.
       committedWindowId = store.getCommittedWindowId(appId, operatorId);
     }
     catch (IOException ex) {
       throw new RuntimeException(ex);