Created an implementation of AbstractCassandraInputOperator which gets field values from Cassandra database columns and sets in a POJO.
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
index 786f24f..94fd64c 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOInputOperator.java
@@ -1,6 +1,17 @@
/*
- * Copyright (c) 2012-2015 Malhar, Inc.
- * All Rights Reserved.
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package com.datatorrent.contrib.cassandra;
@@ -18,20 +29,32 @@
import java.util.*;
import javax.validation.constraints.NotNull;
+/**
+ * <p>
+ * CassandraInputOperator</p>
+ * A Generic implementation of AbstractCassandraInputOperator which gets field values from Cassandra database columns and sets in a POJO.
+ * @displayName Cassandra Input Operator
+ * @category Input
+ * @tags input operator
+ */
public class CassandraPOJOInputOperator extends AbstractCassandraInputOperator<Object>
{
@NotNull
private ArrayList<String> columns;
- private final ArrayList<DataType> columnDataTypes;
+ private final transient ArrayList<DataType> columnDataTypes;
+
@NotNull
private ArrayList<String> expressions;
@NotNull
private String tablename;
- private transient ArrayList<Object> setters;
+ private final transient ArrayList<Object> setters;
private String retrieveQuery;
/*
- * Output POJO being input by the user.
+ * POJO class which is generated as output from this operator.
+ * Example:
+ * public class TestPOJO{ int intfield; public int getInt(){} public void setInt(){} }
+ * outputClass = TestPOJO
* POJOs will be generated on fly in later implementation.
*/
private String outputClass;
@@ -46,6 +69,9 @@
this.outputClass = outputClass;
}
+ /*
+ * Query input by user: Example: select * from keyspace.tablename;
+ */
public String getRetrieveQuery()
{
return retrieveQuery;
@@ -56,6 +82,10 @@
this.retrieveQuery = retrieveQuery;
}
+ /*
+ * An ArrayList of Java expressions that will yield the cassandra column value to be set in output object.
+ * Each expression corresponds to one column in the Cassandra table.
+ */
public ArrayList<String> getExpressions()
{
return expressions;
@@ -107,13 +137,8 @@
Class<?> className = null;
Object obj = null;
- com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename);
- final ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
-
- final int numberOfColumns = rsMetaData.size();
- if (setters.isEmpty()) {
- System.out.println("create setters");
try {
+ // This code will be replaced after integration of creating POJOs on the fly utility.
className = Class.forName(outputClass);
obj = className.newInstance();
}
@@ -126,11 +151,16 @@
catch (IllegalAccessException ex) {
throw new RuntimeException(ex);
}
+ if (setters.isEmpty()) {
+ com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace + "." + tablename);
+ final ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
+ final int numberOfColumns = rsMetaData.size();
+
for (int i = 0; i < numberOfColumns; i++) {
- // get the designated column's data type.
+ // Get the designated column's data type.
final DataType type = rsMetaData.getType(i);
columnDataTypes.add(type);
- Object setter = null;
+ Object setter;
final String setterExpr = expressions.get(i);
switch (type.getName()) {
case ASCII:
@@ -180,19 +210,20 @@
}
}
- for (int i = 0; i < numberOfColumns; i++) {
+ final int size = columnDataTypes.size();
+ for (int i = 0; i < size; i++) {
final DataType type = columnDataTypes.get(i);
switch (type.getName()) {
case UUID:
final UUID id = row.getUUID(i);
- System.out.println("id is "+id);
+ System.out.println("id is " + id);
((Setter<Object, UUID>)setters.get(i)).set(obj, id);
break;
case ASCII:
case VARCHAR:
case TEXT:
final String ascii = row.getString(i);
- System.out.println("ascii is "+ascii);
+ System.out.println("ascii is " + ascii);
((Setter<Object, String>)setters.get(i)).set(obj, ascii);
break;
case BOOLEAN:
@@ -201,7 +232,7 @@
break;
case INT:
final int intValue = row.getInt(i);
- System.out.println("age is "+intValue);
+ System.out.println("age is " + intValue);
((SetterInt)setters.get(i)).set(obj, intValue);
break;
case BIGINT:
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
index 0e4dcff..05441a1 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraPOJOOutputOperator.java
@@ -46,10 +46,10 @@
{
@NotNull
private ArrayList<String> columns;
- private final ArrayList<DataType> columnDataTypes;
+ private final transient ArrayList<DataType> columnDataTypes;
@NotNull
private ArrayList<String> expressions;
- private transient ArrayList<Object> getters;
+ private final transient ArrayList<Object> getters;
/*
* An ArrayList of Java expressions that will yield the field value from the POJO.
@@ -170,13 +170,13 @@
{
StringBuilder queryfields = new StringBuilder("");
StringBuilder values = new StringBuilder("");
- for (int i = 0; i < columns.size(); i++) {
+ for (String column: columns) {
if (queryfields.length() == 0) {
- queryfields.append(columns.get(i));
+ queryfields.append(column);
values.append("?");
}
else {
- queryfields.append(",").append(columns.get(i));
+ queryfields.append(",").append(column);
values.append(",").append("?");
}
}
@@ -259,5 +259,5 @@
return boundStmnt;
}
- private static transient final Logger LOG = LoggerFactory.getLogger(CassandraPOJOOutputOperator.class);
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraPOJOOutputOperator.class);
}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
index a412cf7..68e28cc 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
@@ -41,11 +41,13 @@
public static final String KEYSPACE = "demo";
private static final String TABLE_NAME = "test";
+ private static final String TABLE_NAME_INPUT = "testinput";
private static String APP_ID = "CassandraOperatorTest";
private static int OPERATOR_ID = 0;
private static Cluster cluster = null;
private static Session session = null;
+ @SuppressWarnings("unused")
private static class TestEvent
{
int id;
@@ -74,6 +76,8 @@
session.execute(createMetaTable);
String createTable = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + TABLE_NAME + " (id uuid PRIMARY KEY,age int,lastname text,test boolean,floatvalue float,doubleValue double,set1 set<int>,list1 list<int>,map1 map<text,int>,last_visited timestamp);";
session.execute(createTable);
+ createTable = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + TABLE_NAME_INPUT + " (id uuid PRIMARY KEY,lastname text,age int);";
+ session.execute(createTable);
}
catch (Throwable e) {
DTThrowable.rethrow(e);
@@ -93,79 +97,84 @@
}
}
+
private static class TestOutputOperator extends CassandraPOJOOutputOperator
{
public long getNumOfEventsInStore()
{
- String countQuery = "SELECT count(*) from " + TABLE_NAME + ";";
- ResultSet resultSetCount = session.execute(countQuery);
- for (Row row: resultSetCount) {
- return row.getLong(0);
- }
- return 0;
+ String countQuery = "SELECT count(*) from " + TABLE_NAME + ";";
+ ResultSet resultSetCount = session.execute(countQuery);
+ for (Row row: resultSetCount) {
+ return row.getLong(0);
+ }
+ return 0;
}
public void getEventsInStore()
{
- String recordsQuery = "SELECT * from " + TABLE_NAME + ";";
- ResultSet resultSetRecords = session.execute(recordsQuery);
- int count = 0;
- for (Row row: resultSetRecords) {
- LOG.debug("Boolean value is {}", row.getBool("test"));
- Assert.assertEquals(true, row.getBool("test"));
- LOG.debug("lastname returned is {}", row.getString("lastname"));
- Assert.assertEquals("abclast", row.getString("lastname"));
- LOG.debug("Double value returned is {}", row.getDouble("doubleValue"));
- Assert.assertEquals("Double value is", 2.0, row.getDouble("doubleValue"), 2);
- LOG.debug("Float value returned is {}", row.getFloat("floatValue"));
- LOG.debug("age returned is {}", row.getInt("age"));
- LOG.debug("set returned is {} ", row.getSet("set1", Integer.class));
- LOG.debug("list returned is {}", row.getList("list1", Integer.class));
- LOG.debug("map returned is {}", row.getMap("map1", String.class, Integer.class));
- LOG.debug("date returned is {}", row.getDate("last_visited"));
- Assert.assertNotEquals(new Date(System.currentTimeMillis()), row.getDate("last_visited"));
- if (count == 0) {
- Assert.assertEquals(2, row.getInt("age"));
- Assert.assertEquals(2.0, row.getFloat("floatValue"), 2);
- Set<Integer> set = new HashSet<Integer>();
- List<Integer> list = new ArrayList<Integer>();
- Map<String, Integer> map = new HashMap<String, Integer>();
- set.add(2);
- list.add(2);
- map.put("key2", 2);
- Assert.assertEquals(set, row.getSet("set1", Integer.class));
- Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
- Assert.assertEquals(list, row.getList("list1", Integer.class));
+ String recordsQuery = "SELECT * from " + TABLE_NAME + ";";
+ ResultSet resultSetRecords = session.execute(recordsQuery);
+ int count =0;
+ for (Row row: resultSetRecords) {
+ LOG.debug("Boolean value is {}", row.getBool("test"));
+ Assert.assertEquals(true, row.getBool("test"));
+ LOG.debug("lastname returned is {}", row.getString("lastname"));
+ Assert.assertEquals("abclast", row.getString("lastname"));
+ LOG.debug("Double value returned is {}", row.getDouble("doubleValue"));
+ Assert.assertEquals("Double value is",2.0,row.getDouble("doubleValue"),2);
+ LOG.debug("Float value returned is {}", row.getFloat("floatValue"));
+ LOG.debug("age returned is {}", row.getInt("age"));
+ LOG.debug("set returned is {} ", row.getSet("set1", Integer.class));
+ LOG.debug("list returned is {}", row.getList("list1", Integer.class));
+ LOG.debug("map returned is {}", row.getMap("map1", String.class, Integer.class));
+ LOG.debug("date returned is {}", row.getDate("last_visited"));
+ Assert.assertNotEquals(new Date(System.currentTimeMillis()),row.getDate("last_visited"));
+ if(count == 0)
+ {
+ Assert.assertEquals(2, row.getInt("age"));
+ Assert.assertEquals(2.0, row.getFloat("floatValue"),2);
+ Set<Integer> set = new HashSet<Integer>();
+ List<Integer> list = new ArrayList<Integer>();
+ Map<String,Integer> map = new HashMap<String, Integer>();
+ set.add(2);
+ list.add(2);
+ map.put("key2", 2);
+ Assert.assertEquals(set, row.getSet("set1", Integer.class));
+ Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
+ Assert.assertEquals(list,row.getList("list1", Integer.class));
+ }
+ if(count == 1)
+ {
+ Assert.assertEquals(0, row.getInt("age"));
+ Assert.assertEquals(0.0, row.getFloat("floatValue"),2);
+ Set<Integer> set = new HashSet<Integer>();
+ List<Integer> list = new ArrayList<Integer>();
+ Map<String,Integer> map = new HashMap<String, Integer>();
+ set.add(0);
+ list.add(0);
+ map.put("key0", 0);
+ Assert.assertEquals(set, row.getSet("set1", Integer.class));
+ Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
+ Assert.assertEquals(list,row.getList("list1", Integer.class));
+ }
+ if(count == 2)
+ {
+ Assert.assertEquals(1, row.getInt("age"));
+ Assert.assertEquals(1.0, row.getFloat("floatValue"),2);
+ Set<Integer> set = new HashSet<Integer>();
+ List<Integer> list = new ArrayList<Integer>();
+ Map<String,Integer> map = new HashMap<String, Integer>();
+ set.add(1);
+ list.add(1);
+ map.put("key1", 1);
+ Assert.assertEquals(set, row.getSet("set1", Integer.class));
+ Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
+ Assert.assertEquals(list,row.getList("list1", Integer.class));
+ }
+ count++;
}
- if (count == 1) {
- Assert.assertEquals(0, row.getInt("age"));
- Assert.assertEquals(0.0, row.getFloat("floatValue"), 2);
- Set<Integer> set = new HashSet<Integer>();
- List<Integer> list = new ArrayList<Integer>();
- Map<String, Integer> map = new HashMap<String, Integer>();
- set.add(0);
- list.add(0);
- map.put("key0", 0);
- Assert.assertEquals(set, row.getSet("set1", Integer.class));
- Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
- Assert.assertEquals(list, row.getList("list1", Integer.class));
- }
- if (count == 2) {
- Assert.assertEquals(1, row.getInt("age"));
- Assert.assertEquals(1.0, row.getFloat("floatValue"), 2);
- Set<Integer> set = new HashSet<Integer>();
- List<Integer> list = new ArrayList<Integer>();
- Map<String, Integer> map = new HashMap<String, Integer>();
- set.add(1);
- list.add(1);
- map.put("key1", 1);
- Assert.assertEquals(set, row.getSet("set1", Integer.class));
- Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
- Assert.assertEquals(list, row.getList("list1", Integer.class));
- }
- count++;
- }
+
}
@@ -197,45 +206,6 @@
}
- public static class TestInputPojo
- {
- private UUID id;
- private String lastname;
- private int age;
-
- public int getAge()
- {
- return age;
- }
-
- public void setAge(int age)
- {
- this.age = age;
- }
-
- public UUID getId()
- {
- return id;
- }
-
- public void setId(UUID id)
- {
- this.id = id;
- }
-
- public String getLastname()
- {
- return lastname;
- }
-
- public void setLastname(String lastname)
- {
- this.lastname = lastname;
- }
-
-
- }
-
@Test
public void testCassandraOutputOperator()
{
@@ -299,11 +269,10 @@
outputOperator.getEventsInStore();
}
- //This Test needs to be improved.
@Test
public void TestCassandraInputOperator()
{
- String retrieveQuery = "SELECT * FROM " + KEYSPACE + "." + TABLE_NAME + ";";
+ String retrieveQuery = "SELECT * FROM " + KEYSPACE + "." + TABLE_NAME_INPUT + ";";
CassandraStore store = new CassandraStore();
store.setNode(NODE);
store.setKeyspace(KEYSPACE);
@@ -314,9 +283,8 @@
TestInputOperator inputOperator = new TestInputOperator();
inputOperator.setStore(store);
- inputOperator.insertEventsInTable(10);
- inputOperator.setOutputClass("TestInputPojo");
- inputOperator.setTablename(TABLE_NAME);
+ inputOperator.setOutputClass("com.datatorrent.contrib.cassandra.TestInputPojo");
+ inputOperator.setTablename(TABLE_NAME_INPUT);
inputOperator.setRetrieveQuery(retrieveQuery);
ArrayList<String> columns = new ArrayList<String>();
columns.add("id");
@@ -331,7 +299,8 @@
inputOperator.setExpressions(expressions);
inputOperator.setStore(store);
- inputOperator.insertEventsInTable(10);
+ //Inserting events in cassandra table through shell to check the unique id generated.
+ // inputOperator.insertEventsInTable(2);
CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
inputOperator.outputPort.setSink(sink);
@@ -340,8 +309,23 @@
inputOperator.beginWindow(0);
inputOperator.emitTuples();
inputOperator.endWindow();
+ Assert.assertEquals("rows from db", 2, sink.collectedTuples.size());
+ int count =0;
+ for (Object o : sink.collectedTuples) {
+ count++;
+ TestInputPojo object = (TestInputPojo)o;
+ if(count == 1){
+ Assert.assertEquals("id set in testpojo", "1878a0a0-139d-11e5-87bf-dd30f8d32bb8", object.getId().toString());
+ Assert.assertEquals("name set in testpojo", "test2", object.getLastname());
+ Assert.assertEquals("age set in testpojo", 13, object.getAge());
+ }
+ if(count == 2){
+ Assert.assertEquals("id set in testpojo", "2196c3b0-139d-11e5-87bf-dd30f8d32bb8", object.getId().toString());
+ Assert.assertEquals("name set in testpojo", "test4", object.getLastname());
+ Assert.assertEquals("age set in testpojo", 15, object.getAge());
+ }
+ }
- Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
}
public static class TestPojo
@@ -472,7 +456,7 @@
}
}
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraOperatorTest.class);
- private static transient final Logger LOG = LoggerFactory.getLogger(CassandraOperatorTest.class);
}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/TestInputPojo.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/TestInputPojo.java
new file mode 100644
index 0000000..37a67df
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/TestInputPojo.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import java.util.UUID;
+
+public class TestInputPojo
+{
+ private UUID id;
+ private String lastname;
+ private int age;
+
+ public UUID getId()
+ {
+ return id;
+ }
+
+ public void setId(UUID id)
+ {
+ this.id = id;
+ }
+
+ public int getAge()
+ {
+ return age;
+ }
+
+ public void setAge(int age)
+ {
+ this.age = age;
+ }
+
+ public String getLastname()
+ {
+ return lastname;
+ }
+
+ public void setLastname(String lastname)
+ {
+ this.lastname = lastname;
+ }
+
+}