changes for cassandra.
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
index 25da032..e30e7c6 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
@@ -1,5 +1,4 @@
-/*
- * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+opyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -28,9 +27,7 @@
 import com.datatorrent.lib.util.PojoUtils.GetterObject;
 import com.datatorrent.lib.util.PojoUtils.GetterString;
 import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.UUID;
+import java.util.*;
 import javax.validation.constraints.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +63,9 @@
     this.expressions = expressions;
   }
 
+  /*
+   * An ArrayList of Columns in the Cassandra Table.
+   */
   public ArrayList<String> getColumns()
   {
     return columns;
@@ -118,7 +118,6 @@
     int size = columnDataTypes.size();
     for (int i = 0; i < size; i++) {
       DataType type = columnDataTypes.get(i);
-      LOG.debug("type is {}",type);
       String getterExpression = PojoUtils.getSingleFieldExpression(fqcn, expressions.get(i));
       if (type.equals(DataType.ascii()) || type.equals(DataType.text()) || type.equals(DataType.varchar())) {
         GetterString getVarchar = PojoUtils.createGetterString(fqcn, getterExpression);
@@ -179,7 +178,6 @@
             + " (" + queryfields.toString() + ") "
             + "VALUES (" + values.toString() + ");";
     LOG.debug("statement is {}", statement);
-
     return store.getSession().prepare(statement);
   }
 
@@ -190,15 +188,9 @@
       processFirstTuple(tuple);
     }
     BoundStatement boundStmnt = new BoundStatement(updateCommand);
-    //BatchStatement batchStmt = new BatchStatement();
     int size = columnDataTypes.size();
-    //Object getter = new Object();
-   // Object[] getter= new Object[size];
-   // UUID id = (UUID)(((GetterObject)getters.get(0)).get(tuple));
-   // Object id = null;
     for (int i = 0; i < size; i++) {
       DataType type = columnDataTypes.get(i);
-      LOG.debug("name of type is {}",type.getName());
        switch (type.getName()) {
         case UUID:
          UUID id = (UUID)(((GetterObject)getters.get(i)).get(tuple));
@@ -237,7 +229,6 @@
           boundStmnt.setFloat(i, floatValue);
           break;
         case DOUBLE:
-          LOG.debug("double value");
           Double doubleValue = ((GetterDouble)getters.get(i)).get(tuple);
           boundStmnt.setDouble(i, doubleValue);
           break;
@@ -245,12 +236,24 @@
           BigDecimal decimal = (BigDecimal)((GetterObject)getters.get(i)).get(tuple);
           boundStmnt.setDecimal(i, decimal);
           break;
+        case SET:
+          Set set = (Set)((GetterObject)getters.get(i)).get(tuple);
+          boundStmnt.setSet(i, set);
+          break;
+        case MAP:
+          Map map = (Map)((GetterObject)getters.get(i)).get(tuple);
+          boundStmnt.setMap(i, map);
+          break;
+        case LIST:
+          List list = (List)((GetterObject)getters.get(i)).get(tuple);
+          boundStmnt.setList(i, list);
+          break;
+        case TIMESTAMP:
+          Date date = (Date)((GetterObject)getters.get(i)).get(tuple);
+          boundStmnt.setDate(i, date);
+          break;
       }
     }
-    //batchStmt.add(updateCommand.bind(id, getter, title1, body1));
-//batch.add(ps2.bind(uid, mid2));
-          //  boundStmnt.bind(id,getter1[0],getter1[1]);
-
 
     return boundStmnt;
   }
@@ -258,4 +261,3 @@
   private static transient final Logger LOG = LoggerFactory.getLogger(CassandraOutputOperator.class);
 }
 
-
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
index 25d389d..77bcddb 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -29,6 +29,8 @@
 import org.junit.Test;
 
 import org.junit.AfterClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Tests for {@link AbstractCassandraTransactionableOutputOperator} and {@link AbstractCassandraInputOperator}
@@ -42,7 +44,7 @@
   private static String APP_ID = "CassandraOperatorTest";
   private static int OPERATOR_ID = 0;
   private static Cluster cluster = null;
-  private static Session session=null;
+  private static Session session = null;
 
   private static class TestEvent
   {
@@ -52,14 +54,15 @@
     {
       this.id = id;
     }
+
   }
 
   @BeforeClass
   public static void setup()
   {
     try {
-       cluster = Cluster.builder()
-          .addContactPoint(NODE).build();
+      cluster = Cluster.builder()
+              .addContactPoint(NODE).build();
       session = cluster.connect(KEYSPACE);
 
       String createMetaTable = "CREATE TABLE IF NOT EXISTS " + CassandraTransactionalStore.DEFAULT_META_TABLE + " ( "
@@ -69,7 +72,7 @@
               + "PRIMARY KEY (" + CassandraTransactionalStore.DEFAULT_APP_ID_COL + ", " + CassandraTransactionalStore.DEFAULT_OPERATOR_ID_COL + ") "
               + ");";
       session.execute(createMetaTable);
-      String createTable = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + TABLE_NAME + " (id uuid PRIMARY KEY,age int,lastname text,test boolean,floatvalue float,doubleValue double,set1 set<int>,list1 list<int>,map1 map<text,int>);";
+      String createTable = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + TABLE_NAME + " (id uuid PRIMARY KEY,age int,lastname text,test boolean,floatvalue float,doubleValue double,set1 set<int>,list1 list<int>,map1 map<text,int>,last_visited timestamp);";
       session.execute(createTable);
     }
     catch (Throwable e) {
@@ -80,92 +83,103 @@
   @AfterClass
   public static void cleanup()
   {
-   if(session!=null)
-   {
-     session.execute("DROP TABLE " + CassandraTransactionalStore.DEFAULT_META_TABLE);
-     session.execute("DROP TABLE " +  KEYSPACE + "." + TABLE_NAME);
-     session.close();
-   }
-    if(cluster!=null)
-    {
+    if (session != null) {
+      session.execute("DROP TABLE " + CassandraTransactionalStore.DEFAULT_META_TABLE);
+      session.execute("DROP TABLE " + KEYSPACE + "." + TABLE_NAME);
+      session.close();
+    }
+    if (cluster != null) {
       cluster.close();
     }
   }
 
-  private static void cleanTable()
-  {
-    try {
-      Cluster cluster = Cluster.builder()
-          .addContactPoint(NODE).build();
-      Session session = cluster.connect(KEYSPACE);
-
-      String cleanTable = "TRUNCATE " + TABLE_NAME + ";";
-      session.execute(cleanTable);
-    }
-    catch (DriverException e) {
-      throw new RuntimeException(e);
-    }
-  }
 
   private static class TestOutputOperator extends CassandraOutputOperator
   {
-
-    TestOutputOperator()
-    {
-      //cleanTable();
-    }
-
     public long getNumOfEventsInStore()
     {
-
-      try {
-        Cluster cluster = Cluster.builder()
-            .addContactPoint(NODE).build();
-        Session session = cluster.connect(KEYSPACE);
-
         String countQuery = "SELECT count(*) from " + TABLE_NAME + ";";
         ResultSet resultSetCount = session.execute(countQuery);
-        for(Row row: resultSetCount)
-        {
+        for (Row row: resultSetCount) {
           return row.getLong(0);
         }
         return 0;
-      }
-      catch (DriverException e) {
-        throw new RuntimeException("fetching count", e);
-      }
+
     }
 
-    public String getEventsInStore()
+    public void getEventsInStore()
     {
-      try {
-        Cluster cluster = Cluster.builder()
-            .addContactPoint(NODE).build();
-        Session session = cluster.connect(KEYSPACE);
-
         String recordsQuery = "SELECT * from " + TABLE_NAME + ";";
         ResultSet resultSetRecords = session.execute(recordsQuery);
-         for(Row row: resultSetRecords)
-        {
-          System.out.println("result is "+ row.getBool("test") + "," + row.getString("lastname"));
+        int count =0;
+        for (Row row: resultSetRecords) {
+          LOG.debug("Boolean value is {}", row.getBool("test"));
+          Assert.assertEquals(true, row.getBool("test"));
+          LOG.debug("lastname returned is {}", row.getString("lastname"));
+          Assert.assertEquals("abclast", row.getString("lastname"));
+          LOG.debug("Double value returned is {}", row.getDouble("doubleValue"));
+          Assert.assertEquals("Double value is",2.0,row.getDouble("doubleValue"),2);
+          LOG.debug("Float value returned is {}", row.getFloat("floatValue"));
+          LOG.debug("age returned is {}", row.getInt("age"));
+          LOG.debug("set returned is {} ", row.getSet("set1", Integer.class));
+          LOG.debug("list returned is {}", row.getList("list1", Integer.class));
+          LOG.debug("map returned is {}", row.getMap("map1", String.class, Integer.class));
+          LOG.debug("date returned is {}", row.getDate("last_visited"));
+          Assert.assertNotEquals(new Date(System.currentTimeMillis()),row.getDate("last_visited"));
+          if(count == 0)
+          {
+            Assert.assertEquals(2, row.getInt("age"));
+            Assert.assertEquals(2.0, row.getFloat("floatValue"),2);
+            Set<Integer> set = new HashSet<Integer>();
+            List<Integer> list = new ArrayList<Integer>();
+            Map<String,Integer> map = new HashMap<String, Integer>();
+            set.add(2);
+            list.add(2);
+            map.put("key2", 2);
+            Assert.assertEquals(set, row.getSet("set1", Integer.class));
+            Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
+            Assert.assertEquals(list,row.getList("list1", Integer.class));
+          }
+          if(count == 1)
+          {
+            Assert.assertEquals(1, row.getInt("age"));
+            Assert.assertEquals(1.0, row.getFloat("floatValue"),2);
+            Set<Integer> set = new HashSet<Integer>();
+            List<Integer> list = new ArrayList<Integer>();
+            Map<String,Integer> map = new HashMap<String, Integer>();
+            set.add(1);
+            list.add(1);
+            map.put("key1", 1);
+            Assert.assertEquals(set, row.getSet("set1", Integer.class));
+            Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
+            Assert.assertEquals(list,row.getList("list1", Integer.class));
+          }
+          if(count == 2)
+          {
+            Assert.assertEquals(0, row.getInt("age"));
+            Assert.assertEquals(0.0, row.getFloat("floatValue"),2);
+            Set<Integer> set = new HashSet<Integer>();
+            List<Integer> list = new ArrayList<Integer>();
+            Map<String,Integer> map = new HashMap<String, Integer>();
+            set.add(0);
+            list.add(0);
+            map.put("key0", 0);
+            Assert.assertEquals(set, row.getSet("set1", Integer.class));
+            Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
+            Assert.assertEquals(list,row.getList("list1", Integer.class));
+          }
+          count++;
         }
-        return null;
-      }
-      catch (DriverException e) {
-        throw new RuntimeException("fetching records", e);
-      }
+
+
     }
+
   }
 
   private static class TestInputOperator extends AbstractCassandraInputOperator<TestEvent>
   {
 
-    private static final String retrieveQuery = "SELECT * FROM " +KEYSPACE +"."+TABLE_NAME + ";";
-
-    TestInputOperator()
-    {
-      cleanTable();
-    }
+    private static final String retrieveQuery = "SELECT * FROM " + KEYSPACE + "." + TABLE_NAME + ";";
 
     @Override
     public TestEvent getTuple(Row row)
@@ -188,10 +202,10 @@
     {
       try {
         Cluster cluster = Cluster.builder()
-            .addContactPoint(NODE).build();
+                .addContactPoint(NODE).build();
         Session session = cluster.connect(KEYSPACE);
 
-        String insert = "INSERT INTO " + TABLE_NAME +" (ID)"+ " VALUES (?);";
+        String insert = "INSERT INTO " + TABLE_NAME + " (ID)" + " VALUES (?);";
         PreparedStatement stmt = session.prepare(insert);
         BoundStatement boundStatement = new BoundStatement(stmt);
         Statement statement;
@@ -204,8 +218,8 @@
         throw new RuntimeException(e);
       }
     }
-  }
 
+  }
 
   @Test
   public void testCassandraOutputOperator()
@@ -224,50 +238,40 @@
     ArrayList<String> columns = new ArrayList<String>();
     columns.add("id");
     columns.add("age");
-   // columns.add("lastname");
-  //  columns.add("test");
-  //  columns.add("floatValue");
     columns.add("doubleValue");
     columns.add("floatValue");
-     columns.add("lastname");
-   // columns.add("date");
-   // columns.add("set1");
+    columns.add("last_visited");
+    columns.add("lastname");
     columns.add("list1");
     columns.add("map1");
     columns.add("set1");
     columns.add("test");
-    // columns.add("age");
     outputOperator.setColumns(columns);
     ArrayList<String> expressions = new ArrayList<String>();
     expressions.add("id");
     expressions.add("age");
- //   expressions.add("lastname");
- //   expressions.add("test");
-  //  expressions.add("floatValue");
     expressions.add("doubleValue");
     expressions.add("floatValue");
+    expressions.add("last_visited");
     expressions.add("lastname");
-    //expressions.add("date");
-    //expressions.add("set1");
     expressions.add("list1");
     expressions.add("map1");
     expressions.add("set1");
-   expressions.add("test");
-    // expressions.add("age");
+    expressions.add("test");
     outputOperator.setExpressions(expressions);
     outputOperator.setStore(transactionalStore);
 
     outputOperator.setup(context);
 
     List<TestPojo> events = Lists.newArrayList();
-    for (int i = 0; i < 10; i++) {
-      Set<Integer> set  = new HashSet<Integer>();
+    for (int i = 0; i < 3; i++) {
+      Set<Integer> set = new HashSet<Integer>();
       set.add(i);
       List<Integer> list = new ArrayList<Integer>();
       list.add(i);
-      Map<String,Integer> map = new HashMap<String, Integer>();
-      map.put("key"+i, i);
-      events.add(new TestPojo(UUID.randomUUID(), i, "abclast" + i,true,i,2.0,set,list,map));
+      Map<String, Integer> map = new HashMap<String, Integer>();
+      map.put("key" + i, i);
+      events.add(new TestPojo(UUID.randomUUID(), i, "abclast", true, i, 2.0, set, list, map, new Date(System.currentTimeMillis())));
     }
 
     outputOperator.beginWindow(0);
@@ -276,17 +280,11 @@
     }
     outputOperator.endWindow();
 
-    Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore());
+    Assert.assertEquals("rows in db", 3, outputOperator.getNumOfEventsInStore());
     outputOperator.getEventsInStore();
-    for (int i = 0; i < 10; i++) {
-
-      //StringBuilder firstlastname = new StringBuilder("abc"+i+","+"abclast"+i);
-      //Assert.assertEquals(firstlastname.toString(),outputOperator.getEventsInStore());
-     }
-
   }
 
- //@Test
+  //@Test
   public void TestCassandraInputOperator()
   {
     CassandraStore store = new CassandraStore();
@@ -314,9 +312,7 @@
 
   public static class TestPojo
   {
-    private int age = 2;
-
-    private TestPojo(UUID randomUUID, int i, String string, boolean b, float d, double d0, Set<Integer> set1, List<Integer> list1, Map<String, Integer> map1)
+    private TestPojo(UUID randomUUID, int i, String string, boolean b, float d, double d0, Set<Integer> set1, List<Integer> list1, Map<String, Integer> map1, Date date)
     {
       this.id = randomUUID;
       this.age = i;
@@ -324,10 +320,10 @@
       this.test = b;
       this.floatValue = d;
       this.doubleValue = d0;
-     // this.date = date;
       this.set1 = set1;
       this.list1 = list1;
       this.map1 = map1;
+      this.last_visited = date;
     }
 
     public int getAge()
@@ -350,22 +346,22 @@
       this.test = test;
     }
 
-    public Set getSet1()
+    public Set<Integer> getSet1()
     {
       return set1;
     }
 
-    public void setSet1(Set set)
+    public void setSet1(Set<Integer> set)
     {
       this.set1 = set;
     }
 
-    public List getList1()
+    public List<Integer> getList1()
     {
       return list1;
     }
 
-    public void setList1(List list)
+    public void setList1(List<Integer> list)
     {
       this.list1 = list;
     }
@@ -380,16 +376,6 @@
       this.map1 = map;
     }
 
-    public Date getDate()
-    {
-      return date;
-    }
-
-    public void setDate(Date date)
-    {
-      this.date = date;
-    }
-
     public Double getDoubleValue()
     {
       return doubleValue;
@@ -409,15 +395,27 @@
     {
       this.floatValue = floatValue;
     }
+
     private String lastname = "hello";
     private UUID id;
     private boolean test;
-    private Set set1;
-    private List list1;
-    private Map<String,Integer> map1;
-    private Date date;
+    private Set<Integer> set1;
+    private List<Integer> list1;
+    private Map<String, Integer> map1;
     private Double doubleValue;
-    private Float  floatValue;
+    private Float floatValue;
+    private Date last_visited;
+    private int age = 2;
+
+    public Date getLast_visited()
+    {
+      return last_visited;
+    }
+
+    public void setLast_visited(Date last_visited)
+    {
+      this.last_visited = last_visited;
+    }
 
     public UUID getId()
     {
@@ -439,10 +437,9 @@
       this.lastname = lastname;
     }
 
-
-
   }
 
-}
+  private static transient final Logger LOG = LoggerFactory.getLogger(CassandraOperatorTest.class);
 
+}