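Cassandra output operator: add SET, MAP, LIST and TIMESTAMP column support, remove debug logging and commented-out code, and extend CassandraOperatorTest to cover the new column types.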
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
index 25da032..e30e7c6 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
@@ -1,5 +1,4 @@
-/*
- * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+/* Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,9 +27,7 @@
import com.datatorrent.lib.util.PojoUtils.GetterObject;
import com.datatorrent.lib.util.PojoUtils.GetterString;
import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.UUID;
+import java.util.*;
import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,6 +63,9 @@
this.expressions = expressions;
}
+ /*
+ * Returns the columns of the Cassandra table as an ArrayList of column names.
+ */
public ArrayList<String> getColumns()
{
return columns;
@@ -118,7 +118,6 @@
int size = columnDataTypes.size();
for (int i = 0; i < size; i++) {
DataType type = columnDataTypes.get(i);
- LOG.debug("type is {}",type);
String getterExpression = PojoUtils.getSingleFieldExpression(fqcn, expressions.get(i));
if (type.equals(DataType.ascii()) || type.equals(DataType.text()) || type.equals(DataType.varchar())) {
GetterString getVarchar = PojoUtils.createGetterString(fqcn, getterExpression);
@@ -179,7 +178,6 @@
+ " (" + queryfields.toString() + ") "
+ "VALUES (" + values.toString() + ");";
LOG.debug("statement is {}", statement);
-
return store.getSession().prepare(statement);
}
@@ -190,15 +188,9 @@
processFirstTuple(tuple);
}
BoundStatement boundStmnt = new BoundStatement(updateCommand);
- //BatchStatement batchStmt = new BatchStatement();
int size = columnDataTypes.size();
- //Object getter = new Object();
- // Object[] getter= new Object[size];
- // UUID id = (UUID)(((GetterObject)getters.get(0)).get(tuple));
- // Object id = null;
for (int i = 0; i < size; i++) {
DataType type = columnDataTypes.get(i);
- LOG.debug("name of type is {}",type.getName());
switch (type.getName()) {
case UUID:
UUID id = (UUID)(((GetterObject)getters.get(i)).get(tuple));
@@ -237,7 +229,6 @@
boundStmnt.setFloat(i, floatValue);
break;
case DOUBLE:
- LOG.debug("double value");
Double doubleValue = ((GetterDouble)getters.get(i)).get(tuple);
boundStmnt.setDouble(i, doubleValue);
break;
@@ -245,12 +236,24 @@
BigDecimal decimal = (BigDecimal)((GetterObject)getters.get(i)).get(tuple);
boundStmnt.setDecimal(i, decimal);
break;
+ case SET:
+ Set set = (Set)((GetterObject)getters.get(i)).get(tuple);
+ boundStmnt.setSet(i, set);
+ break;
+ case MAP:
+ Map map = (Map)((GetterObject)getters.get(i)).get(tuple);
+ boundStmnt.setMap(i, map);
+ break;
+ case LIST:
+ List list = (List)((GetterObject)getters.get(i)).get(tuple);
+ boundStmnt.setList(i, list);
+ break;
+ case TIMESTAMP:
+ Date date = (Date)((GetterObject)getters.get(i)).get(tuple);
+ boundStmnt.setDate(i, date);
+ break;
}
}
- //batchStmt.add(updateCommand.bind(id, getter, title1, body1));
-//batch.add(ps2.bind(uid, mid2));
- // boundStmnt.bind(id,getter1[0],getter1[1]);
-
return boundStmnt;
}
@@ -258,4 +261,3 @@
private static transient final Logger LOG = LoggerFactory.getLogger(CassandraOutputOperator.class);
}
-
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
index 25d389d..77bcddb 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -29,6 +29,8 @@
import org.junit.Test;
import org.junit.AfterClass;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Tests for {@link AbstractCassandraTransactionableOutputOperator} and {@link AbstractCassandraInputOperator}
@@ -42,7 +44,7 @@
private static String APP_ID = "CassandraOperatorTest";
private static int OPERATOR_ID = 0;
private static Cluster cluster = null;
- private static Session session=null;
+ private static Session session = null;
private static class TestEvent
{
@@ -52,14 +54,15 @@
{
this.id = id;
}
+
}
@BeforeClass
public static void setup()
{
try {
- cluster = Cluster.builder()
- .addContactPoint(NODE).build();
+ cluster = Cluster.builder()
+ .addContactPoint(NODE).build();
session = cluster.connect(KEYSPACE);
String createMetaTable = "CREATE TABLE IF NOT EXISTS " + CassandraTransactionalStore.DEFAULT_META_TABLE + " ( "
@@ -69,7 +72,7 @@
+ "PRIMARY KEY (" + CassandraTransactionalStore.DEFAULT_APP_ID_COL + ", " + CassandraTransactionalStore.DEFAULT_OPERATOR_ID_COL + ") "
+ ");";
session.execute(createMetaTable);
- String createTable = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + TABLE_NAME + " (id uuid PRIMARY KEY,age int,lastname text,test boolean,floatvalue float,doubleValue double,set1 set<int>,list1 list<int>,map1 map<text,int>);";
+ String createTable = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + TABLE_NAME + " (id uuid PRIMARY KEY,age int,lastname text,test boolean,floatvalue float,doubleValue double,set1 set<int>,list1 list<int>,map1 map<text,int>,last_visited timestamp);";
session.execute(createTable);
}
catch (Throwable e) {
@@ -80,92 +83,103 @@
@AfterClass
public static void cleanup()
{
- if(session!=null)
- {
- session.execute("DROP TABLE " + CassandraTransactionalStore.DEFAULT_META_TABLE);
- session.execute("DROP TABLE " + KEYSPACE + "." + TABLE_NAME);
- session.close();
- }
- if(cluster!=null)
- {
+ if (session != null) {
+ session.execute("DROP TABLE " + CassandraTransactionalStore.DEFAULT_META_TABLE);
+ session.execute("DROP TABLE " + KEYSPACE + "." + TABLE_NAME);
+ session.close();
+ }
+ if (cluster != null) {
cluster.close();
}
}
- private static void cleanTable()
- {
- try {
- Cluster cluster = Cluster.builder()
- .addContactPoint(NODE).build();
- Session session = cluster.connect(KEYSPACE);
-
- String cleanTable = "TRUNCATE " + TABLE_NAME + ";";
- session.execute(cleanTable);
- }
- catch (DriverException e) {
- throw new RuntimeException(e);
- }
- }
private static class TestOutputOperator extends CassandraOutputOperator
{
-
- TestOutputOperator()
- {
- //cleanTable();
- }
-
public long getNumOfEventsInStore()
{
-
- try {
- Cluster cluster = Cluster.builder()
- .addContactPoint(NODE).build();
- Session session = cluster.connect(KEYSPACE);
-
String countQuery = "SELECT count(*) from " + TABLE_NAME + ";";
ResultSet resultSetCount = session.execute(countQuery);
- for(Row row: resultSetCount)
- {
+ for (Row row: resultSetCount) {
return row.getLong(0);
}
return 0;
- }
- catch (DriverException e) {
- throw new RuntimeException("fetching count", e);
- }
+
}
- public String getEventsInStore()
+ public void getEventsInStore()
{
- try {
- Cluster cluster = Cluster.builder()
- .addContactPoint(NODE).build();
- Session session = cluster.connect(KEYSPACE);
-
String recordsQuery = "SELECT * from " + TABLE_NAME + ";";
ResultSet resultSetRecords = session.execute(recordsQuery);
- for(Row row: resultSetRecords)
- {
- System.out.println("result is "+ row.getBool("test") + "," + row.getString("lastname"));
+ int count =0;
+ for (Row row: resultSetRecords) {
+ LOG.debug("Boolean value is {}", row.getBool("test"));
+ Assert.assertEquals(true, row.getBool("test"));
+ LOG.debug("lastname returned is {}", row.getString("lastname"));
+ Assert.assertEquals("abclast", row.getString("lastname"));
+ LOG.debug("Double value returned is {}", row.getDouble("doubleValue"));
+ Assert.assertEquals("Double value is",2.0,row.getDouble("doubleValue"),2);
+ LOG.debug("Float value returned is {}", row.getFloat("floatValue"));
+ LOG.debug("age returned is {}", row.getInt("age"));
+ LOG.debug("set returned is {} ", row.getSet("set1", Integer.class));
+ LOG.debug("list returned is {}", row.getList("list1", Integer.class));
+ LOG.debug("map returned is {}", row.getMap("map1", String.class, Integer.class));
+ LOG.debug("date returned is {}", row.getDate("last_visited"));
+ Assert.assertNotEquals(new Date(System.currentTimeMillis()),row.getDate("last_visited"));
+ if(count == 0)
+ {
+ Assert.assertEquals(2, row.getInt("age"));
+ Assert.assertEquals(2.0, row.getFloat("floatValue"),2);
+ Set<Integer> set = new HashSet<Integer>();
+ List<Integer> list = new ArrayList<Integer>();
+ Map<String,Integer> map = new HashMap<String, Integer>();
+ set.add(2);
+ list.add(2);
+ map.put("key2", 2);
+ Assert.assertEquals(set, row.getSet("set1", Integer.class));
+ Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
+ Assert.assertEquals(list,row.getList("list1", Integer.class));
+ }
+ if(count == 1)
+ {
+ Assert.assertEquals(1, row.getInt("age"));
+ Assert.assertEquals(1.0, row.getFloat("floatValue"),2);
+ Set<Integer> set = new HashSet<Integer>();
+ List<Integer> list = new ArrayList<Integer>();
+ Map<String,Integer> map = new HashMap<String, Integer>();
+ set.add(1);
+ list.add(1);
+ map.put("key1", 1);
+ Assert.assertEquals(set, row.getSet("set1", Integer.class));
+ Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
+ Assert.assertEquals(list,row.getList("list1", Integer.class));
+ }
+ if(count == 2)
+ {
+ Assert.assertEquals(0, row.getInt("age"));
+ Assert.assertEquals(0.0, row.getFloat("floatValue"),2);
+ Set<Integer> set = new HashSet<Integer>();
+ List<Integer> list = new ArrayList<Integer>();
+ Map<String,Integer> map = new HashMap<String, Integer>();
+ set.add(0);
+ list.add(0);
+ map.put("key0", 0);
+ Assert.assertEquals(set, row.getSet("set1", Integer.class));
+ Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
+ Assert.assertEquals(list,row.getList("list1", Integer.class));
+ }
+ count++;
}
- return null;
- }
- catch (DriverException e) {
- throw new RuntimeException("fetching records", e);
- }
+
+
}
+
}
private static class TestInputOperator extends AbstractCassandraInputOperator<TestEvent>
{
- private static final String retrieveQuery = "SELECT * FROM " +KEYSPACE +"."+TABLE_NAME + ";";
-
- TestInputOperator()
- {
- cleanTable();
- }
+ private static final String retrieveQuery = "SELECT * FROM " + KEYSPACE + "." + TABLE_NAME + ";";
@Override
public TestEvent getTuple(Row row)
@@ -188,10 +202,10 @@
{
try {
Cluster cluster = Cluster.builder()
- .addContactPoint(NODE).build();
+ .addContactPoint(NODE).build();
Session session = cluster.connect(KEYSPACE);
- String insert = "INSERT INTO " + TABLE_NAME +" (ID)"+ " VALUES (?);";
+ String insert = "INSERT INTO " + TABLE_NAME + " (ID)" + " VALUES (?);";
PreparedStatement stmt = session.prepare(insert);
BoundStatement boundStatement = new BoundStatement(stmt);
Statement statement;
@@ -204,8 +218,8 @@
throw new RuntimeException(e);
}
}
- }
+ }
@Test
public void testCassandraOutputOperator()
@@ -224,50 +238,40 @@
ArrayList<String> columns = new ArrayList<String>();
columns.add("id");
columns.add("age");
- // columns.add("lastname");
- // columns.add("test");
- // columns.add("floatValue");
columns.add("doubleValue");
columns.add("floatValue");
- columns.add("lastname");
- // columns.add("date");
- // columns.add("set1");
+ columns.add("last_visited");
+ columns.add("lastname");
columns.add("list1");
columns.add("map1");
columns.add("set1");
columns.add("test");
- // columns.add("age");
outputOperator.setColumns(columns);
ArrayList<String> expressions = new ArrayList<String>();
expressions.add("id");
expressions.add("age");
- // expressions.add("lastname");
- // expressions.add("test");
- // expressions.add("floatValue");
expressions.add("doubleValue");
expressions.add("floatValue");
+ expressions.add("last_visited");
expressions.add("lastname");
- //expressions.add("date");
- //expressions.add("set1");
expressions.add("list1");
expressions.add("map1");
expressions.add("set1");
- expressions.add("test");
- // expressions.add("age");
+ expressions.add("test");
outputOperator.setExpressions(expressions);
outputOperator.setStore(transactionalStore);
outputOperator.setup(context);
List<TestPojo> events = Lists.newArrayList();
- for (int i = 0; i < 10; i++) {
- Set<Integer> set = new HashSet<Integer>();
+ for (int i = 0; i < 3; i++) {
+ Set<Integer> set = new HashSet<Integer>();
set.add(i);
List<Integer> list = new ArrayList<Integer>();
list.add(i);
- Map<String,Integer> map = new HashMap<String, Integer>();
- map.put("key"+i, i);
- events.add(new TestPojo(UUID.randomUUID(), i, "abclast" + i,true,i,2.0,set,list,map));
+ Map<String, Integer> map = new HashMap<String, Integer>();
+ map.put("key" + i, i);
+ events.add(new TestPojo(UUID.randomUUID(), i, "abclast", true, i, 2.0, set, list, map, new Date(System.currentTimeMillis())));
}
outputOperator.beginWindow(0);
@@ -276,17 +280,11 @@
}
outputOperator.endWindow();
- Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore());
+ Assert.assertEquals("rows in db", 3, outputOperator.getNumOfEventsInStore());
outputOperator.getEventsInStore();
- for (int i = 0; i < 10; i++) {
-
- //StringBuilder firstlastname = new StringBuilder("abc"+i+","+"abclast"+i);
- //Assert.assertEquals(firstlastname.toString(),outputOperator.getEventsInStore());
- }
-
}
- //@Test
+ //@Test
public void TestCassandraInputOperator()
{
CassandraStore store = new CassandraStore();
@@ -314,9 +312,7 @@
public static class TestPojo
{
- private int age = 2;
-
- private TestPojo(UUID randomUUID, int i, String string, boolean b, float d, double d0, Set<Integer> set1, List<Integer> list1, Map<String, Integer> map1)
+ private TestPojo(UUID randomUUID, int i, String string, boolean b, float d, double d0, Set<Integer> set1, List<Integer> list1, Map<String, Integer> map1, Date date)
{
this.id = randomUUID;
this.age = i;
@@ -324,10 +320,10 @@
this.test = b;
this.floatValue = d;
this.doubleValue = d0;
- // this.date = date;
this.set1 = set1;
this.list1 = list1;
this.map1 = map1;
+ this.last_visited = date;
}
public int getAge()
@@ -350,22 +346,22 @@
this.test = test;
}
- public Set getSet1()
+ public Set<Integer> getSet1()
{
return set1;
}
- public void setSet1(Set set)
+ public void setSet1(Set<Integer> set)
{
this.set1 = set;
}
- public List getList1()
+ public List<Integer> getList1()
{
return list1;
}
- public void setList1(List list)
+ public void setList1(List<Integer> list)
{
this.list1 = list;
}
@@ -380,16 +376,6 @@
this.map1 = map;
}
- public Date getDate()
- {
- return date;
- }
-
- public void setDate(Date date)
- {
- this.date = date;
- }
-
public Double getDoubleValue()
{
return doubleValue;
@@ -409,15 +395,27 @@
{
this.floatValue = floatValue;
}
+
private String lastname = "hello";
private UUID id;
private boolean test;
- private Set set1;
- private List list1;
- private Map<String,Integer> map1;
- private Date date;
+ private Set<Integer> set1;
+ private List<Integer> list1;
+ private Map<String, Integer> map1;
private Double doubleValue;
- private Float floatValue;
+ private Float floatValue;
+ private Date last_visited;
+ private int age = 2;
+
+ public Date getLast_visited()
+ {
+ return last_visited;
+ }
+
+ public void setLast_visited(Date last_visited)
+ {
+ this.last_visited = last_visited;
+ }
public UUID getId()
{
@@ -439,10 +437,9 @@
this.lastname = lastname;
}
-
-
}
-}
+ private static transient final Logger LOG = LoggerFactory.getLogger(CassandraOperatorTest.class);
+}