blob: 68e28cc521ebf8f17b076c88d8907edb27aff5ec [file] [log] [blame]
/*
* Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.contrib.cassandra;
import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.DriverException;
import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.DAG;
import com.datatorrent.common.util.DTThrowable;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.google.common.collect.Lists;
import java.util.*;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.AfterClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests for {@link AbstractCassandraTransactionableOutputOperator} and {@link AbstractCassandraInputOperator}
*/
public class CassandraOperatorTest
{
public static final String NODE = "localhost";
public static final String KEYSPACE = "demo";
private static final String TABLE_NAME = "test";
private static final String TABLE_NAME_INPUT = "testinput";
private static String APP_ID = "CassandraOperatorTest";
private static int OPERATOR_ID = 0;
private static Cluster cluster = null;
private static Session session = null;
@SuppressWarnings("unused")
private static class TestEvent
{
int id;
TestEvent(int id)
{
this.id = id;
}
}
@BeforeClass
public static void setup()
{
try {
cluster = Cluster.builder()
.addContactPoint(NODE).build();
session = cluster.connect(KEYSPACE);
String createMetaTable = "CREATE TABLE IF NOT EXISTS " + CassandraTransactionalStore.DEFAULT_META_TABLE + " ( "
+ CassandraTransactionalStore.DEFAULT_APP_ID_COL + " TEXT, "
+ CassandraTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT, "
+ CassandraTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT, "
+ "PRIMARY KEY (" + CassandraTransactionalStore.DEFAULT_APP_ID_COL + ", " + CassandraTransactionalStore.DEFAULT_OPERATOR_ID_COL + ") "
+ ");";
session.execute(createMetaTable);
String createTable = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + TABLE_NAME + " (id uuid PRIMARY KEY,age int,lastname text,test boolean,floatvalue float,doubleValue double,set1 set<int>,list1 list<int>,map1 map<text,int>,last_visited timestamp);";
session.execute(createTable);
createTable = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + TABLE_NAME_INPUT + " (id uuid PRIMARY KEY,lastname text,age int);";
session.execute(createTable);
}
catch (Throwable e) {
DTThrowable.rethrow(e);
}
}
@AfterClass
public static void cleanup()
{
if (session != null) {
session.execute("DROP TABLE " + CassandraTransactionalStore.DEFAULT_META_TABLE);
session.execute("DROP TABLE " + KEYSPACE + "." + TABLE_NAME);
session.close();
}
if (cluster != null) {
cluster.close();
}
}
private static class TestOutputOperator extends CassandraPOJOOutputOperator
{
public long getNumOfEventsInStore()
{
String countQuery = "SELECT count(*) from " + TABLE_NAME + ";";
ResultSet resultSetCount = session.execute(countQuery);
for (Row row: resultSetCount) {
return row.getLong(0);
}
return 0;
}
public void getEventsInStore()
{
String recordsQuery = "SELECT * from " + TABLE_NAME + ";";
ResultSet resultSetRecords = session.execute(recordsQuery);
int count =0;
for (Row row: resultSetRecords) {
LOG.debug("Boolean value is {}", row.getBool("test"));
Assert.assertEquals(true, row.getBool("test"));
LOG.debug("lastname returned is {}", row.getString("lastname"));
Assert.assertEquals("abclast", row.getString("lastname"));
LOG.debug("Double value returned is {}", row.getDouble("doubleValue"));
Assert.assertEquals("Double value is",2.0,row.getDouble("doubleValue"),2);
LOG.debug("Float value returned is {}", row.getFloat("floatValue"));
LOG.debug("age returned is {}", row.getInt("age"));
LOG.debug("set returned is {} ", row.getSet("set1", Integer.class));
LOG.debug("list returned is {}", row.getList("list1", Integer.class));
LOG.debug("map returned is {}", row.getMap("map1", String.class, Integer.class));
LOG.debug("date returned is {}", row.getDate("last_visited"));
Assert.assertNotEquals(new Date(System.currentTimeMillis()),row.getDate("last_visited"));
if(count == 0)
{
Assert.assertEquals(2, row.getInt("age"));
Assert.assertEquals(2.0, row.getFloat("floatValue"),2);
Set<Integer> set = new HashSet<Integer>();
List<Integer> list = new ArrayList<Integer>();
Map<String,Integer> map = new HashMap<String, Integer>();
set.add(2);
list.add(2);
map.put("key2", 2);
Assert.assertEquals(set, row.getSet("set1", Integer.class));
Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
Assert.assertEquals(list,row.getList("list1", Integer.class));
}
if(count == 1)
{
Assert.assertEquals(0, row.getInt("age"));
Assert.assertEquals(0.0, row.getFloat("floatValue"),2);
Set<Integer> set = new HashSet<Integer>();
List<Integer> list = new ArrayList<Integer>();
Map<String,Integer> map = new HashMap<String, Integer>();
set.add(0);
list.add(0);
map.put("key0", 0);
Assert.assertEquals(set, row.getSet("set1", Integer.class));
Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
Assert.assertEquals(list,row.getList("list1", Integer.class));
}
if(count == 2)
{
Assert.assertEquals(1, row.getInt("age"));
Assert.assertEquals(1.0, row.getFloat("floatValue"),2);
Set<Integer> set = new HashSet<Integer>();
List<Integer> list = new ArrayList<Integer>();
Map<String,Integer> map = new HashMap<String, Integer>();
set.add(1);
list.add(1);
map.put("key1", 1);
Assert.assertEquals(set, row.getSet("set1", Integer.class));
Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
Assert.assertEquals(list,row.getList("list1", Integer.class));
}
count++;
}
}
}
private static class TestInputOperator extends CassandraPOJOInputOperator
{
public void insertEventsInTable(int numEvents)
{
try {
Cluster cluster = Cluster.builder()
.addContactPoint(NODE).build();
Session session = cluster.connect(KEYSPACE);
String insert = "INSERT INTO " + TABLE_NAME + " (ID)" + " VALUES (?);";
PreparedStatement stmt = session.prepare(insert);
BoundStatement boundStatement = new BoundStatement(stmt);
Statement statement;
for (int i = 0; i < numEvents; i++) {
statement = boundStatement.bind(i);
session.execute(statement);
}
}
catch (DriverException e) {
throw new RuntimeException(e);
}
}
}
@Test
public void testCassandraOutputOperator()
{
CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore();
transactionalStore.setNode(NODE);
transactionalStore.setKeyspace(KEYSPACE);
AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
attributeMap.put(DAG.APPLICATION_ID, APP_ID);
OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
TestOutputOperator outputOperator = new TestOutputOperator();
outputOperator.setTablename(TABLE_NAME);
ArrayList<String> columns = new ArrayList<String>();
columns.add("id");
columns.add("age");
columns.add("doubleValue");
columns.add("floatValue");
columns.add("last_visited");
columns.add("lastname");
columns.add("list1");
columns.add("map1");
columns.add("set1");
columns.add("test");
outputOperator.setColumns(columns);
ArrayList<String> expressions = new ArrayList<String>();
expressions.add("id");
expressions.add("age");
expressions.add("doubleValue");
expressions.add("floatValue");
expressions.add("last_visited");
expressions.add("lastname");
expressions.add("list1");
expressions.add("map1");
expressions.add("set1");
expressions.add("test");
outputOperator.setExpressions(expressions);
outputOperator.setStore(transactionalStore);
outputOperator.setup(context);
List<TestPojo> events = Lists.newArrayList();
for (int i = 0; i < 3; i++) {
Set<Integer> set = new HashSet<Integer>();
set.add(i);
List<Integer> list = new ArrayList<Integer>();
list.add(i);
Map<String, Integer> map = new HashMap<String, Integer>();
map.put("key" + i, i);
events.add(new TestPojo(UUID.randomUUID(), i, "abclast", true, i, 2.0, set, list, map, new Date(System.currentTimeMillis())));
}
outputOperator.beginWindow(0);
for (TestPojo event: events) {
outputOperator.input.process(event);
}
outputOperator.endWindow();
Assert.assertEquals("rows in db", 3, outputOperator.getNumOfEventsInStore());
outputOperator.getEventsInStore();
}
@Test
public void TestCassandraInputOperator()
{
String retrieveQuery = "SELECT * FROM " + KEYSPACE + "." + TABLE_NAME_INPUT + ";";
CassandraStore store = new CassandraStore();
store.setNode(NODE);
store.setKeyspace(KEYSPACE);
AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
attributeMap.put(DAG.APPLICATION_ID, APP_ID);
OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
TestInputOperator inputOperator = new TestInputOperator();
inputOperator.setStore(store);
inputOperator.setOutputClass("com.datatorrent.contrib.cassandra.TestInputPojo");
inputOperator.setTablename(TABLE_NAME_INPUT);
inputOperator.setRetrieveQuery(retrieveQuery);
ArrayList<String> columns = new ArrayList<String>();
columns.add("id");
columns.add("age");
columns.add("lastname");
inputOperator.setColumns(columns);
ArrayList<String> expressions = new ArrayList<String>();
expressions.add("id");
expressions.add("age");
expressions.add("lastname");
inputOperator.setExpressions(expressions);
inputOperator.setStore(store);
//Inserting events in cassandra table through shell to check the unique id generated.
// inputOperator.insertEventsInTable(2);
CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
inputOperator.outputPort.setSink(sink);
inputOperator.setup(context);
inputOperator.beginWindow(0);
inputOperator.emitTuples();
inputOperator.endWindow();
Assert.assertEquals("rows from db", 2, sink.collectedTuples.size());
int count =0;
for (Object o : sink.collectedTuples) {
count++;
TestInputPojo object = (TestInputPojo)o;
if(count == 1){
Assert.assertEquals("id set in testpojo", "1878a0a0-139d-11e5-87bf-dd30f8d32bb8", object.getId().toString());
Assert.assertEquals("name set in testpojo", "test2", object.getLastname());
Assert.assertEquals("age set in testpojo", 13, object.getAge());
}
if(count == 2){
Assert.assertEquals("id set in testpojo", "2196c3b0-139d-11e5-87bf-dd30f8d32bb8", object.getId().toString());
Assert.assertEquals("name set in testpojo", "test4", object.getLastname());
Assert.assertEquals("age set in testpojo", 15, object.getAge());
}
}
}
public static class TestPojo
{
public TestPojo(UUID randomUUID, int i, String string, boolean b, float d, double d0, Set<Integer> set1, List<Integer> list1, Map<String, Integer> map1, Date date)
{
this.id = randomUUID;
this.age = i;
this.lastname = string;
this.test = b;
this.floatValue = d;
this.doubleValue = d0;
this.set1 = set1;
this.list1 = list1;
this.map1 = map1;
this.last_visited = date;
}
public int getAge()
{
return age;
}
public void setAge(int age)
{
this.age = age;
}
public boolean isTest()
{
return test;
}
public void setTest(boolean test)
{
this.test = test;
}
public Set<Integer> getSet1()
{
return set1;
}
public void setSet1(Set<Integer> set)
{
this.set1 = set;
}
public List<Integer> getList1()
{
return list1;
}
public void setList1(List<Integer> list)
{
this.list1 = list;
}
public Map<String, Integer> getMap1()
{
return map1;
}
public void setMap1(Map<String, Integer> map)
{
this.map1 = map;
}
public Double getDoubleValue()
{
return doubleValue;
}
public void setDoubleValue(Double doubleValue)
{
this.doubleValue = doubleValue;
}
public Float getFloatValue()
{
return floatValue;
}
public void setFloatValue(Float floatValue)
{
this.floatValue = floatValue;
}
private String lastname = "hello";
private UUID id;
private boolean test;
private Set<Integer> set1;
private List<Integer> list1;
private Map<String, Integer> map1;
private Double doubleValue;
private Float floatValue;
private Date last_visited;
private int age = 2;
public Date getLast_visited()
{
return last_visited;
}
public void setLast_visited(Date last_visited)
{
this.last_visited = last_visited;
}
public UUID getId()
{
return id;
}
public void setId(UUID id)
{
this.id = id;
}
public String getLastname()
{
return lastname;
}
public void setLastname(String lastname)
{
this.lastname = lastname;
}
}
private static final Logger LOG = LoggerFactory.getLogger(CassandraOperatorTest.class);
}