blob: a6d0d21b223a799629fe789e291d6d1417b9680d [file] [log] [blame]
/*
* Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.contrib.cassandra;
import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.DriverException;
import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.DAG;
import com.datatorrent.common.util.DTThrowable;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.google.common.collect.Lists;
import java.util.*;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.AfterClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests for {@link AbstractCassandraTransactionableOutputOperator} and {@link AbstractCassandraInputOperator}
*/
public class CassandraOperatorTest
{
public static final String NODE = "localhost";
public static final String KEYSPACE = "demo";
private static final String TABLE_NAME = "test";
private static final String TABLE_NAME_INPUT = "testinput";
private static final String APP_ID = "CassandraOperatorTest";
private static final int OPERATOR_ID = 0;
private static Cluster cluster = null;
private static Session session = null;
@SuppressWarnings("unused")
private static class TestEvent
{
int id;
TestEvent(int id)
{
this.id = id;
}
}
@BeforeClass
public static void setup()
{
try {
cluster = Cluster.builder()
.addContactPoint(NODE).build();
session = cluster.connect(KEYSPACE);
String createMetaTable = "CREATE TABLE IF NOT EXISTS " + CassandraTransactionalStore.DEFAULT_META_TABLE + " ( "
+ CassandraTransactionalStore.DEFAULT_APP_ID_COL + " TEXT, "
+ CassandraTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT, "
+ CassandraTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT, "
+ "PRIMARY KEY (" + CassandraTransactionalStore.DEFAULT_APP_ID_COL + ", " + CassandraTransactionalStore.DEFAULT_OPERATOR_ID_COL + ") "
+ ");";
session.execute(createMetaTable);
String createTable = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + TABLE_NAME + " (id uuid PRIMARY KEY,age int,lastname text,test boolean,floatvalue float,doubleValue double,set1 set<int>,list1 list<int>,map1 map<text,int>,last_visited timestamp);";
session.execute(createTable);
createTable = "CREATE TABLE IF NOT EXISTS " + KEYSPACE + "." + TABLE_NAME_INPUT + " (id int PRIMARY KEY,lastname text,age int);";
session.execute(createTable);
}
catch (Throwable e) {
DTThrowable.rethrow(e);
}
}
@AfterClass
public static void cleanup()
{
if (session != null) {
session.execute("DROP TABLE " + CassandraTransactionalStore.DEFAULT_META_TABLE);
session.execute("DROP TABLE " + KEYSPACE + "." + TABLE_NAME);
session.execute("DROP TABLE " + KEYSPACE + "." + TABLE_NAME_INPUT);
session.close();
}
if (cluster != null) {
cluster.close();
}
}
private static class TestOutputOperator extends CassandraPOJOOutputOperator
{
private static final long serialVersionUID = 201506181038L;
public long getNumOfEventsInStore()
{
String countQuery = "SELECT count(*) from " + TABLE_NAME + ";";
ResultSet resultSetCount = session.execute(countQuery);
for (Row row: resultSetCount) {
return row.getLong(0);
}
return 0;
}
public void getEventsInStore()
{
String recordsQuery = "SELECT * from " + TABLE_NAME + ";";
ResultSet resultSetRecords = session.execute(recordsQuery);
int count = 0;
for (Row row: resultSetRecords) {
LOG.debug("Boolean value is {}", row.getBool("test"));
Assert.assertEquals(true, row.getBool("test"));
LOG.debug("lastname returned is {}", row.getString("lastname"));
Assert.assertEquals("abclast", row.getString("lastname"));
LOG.debug("Double value returned is {}", row.getDouble("doubleValue"));
Assert.assertEquals("Double value is", 2.0, row.getDouble("doubleValue"), 2);
LOG.debug("Float value returned is {}", row.getFloat("floatValue"));
LOG.debug("age returned is {}", row.getInt("age"));
LOG.debug("set returned is {} ", row.getSet("set1", Integer.class));
LOG.debug("list returned is {}", row.getList("list1", Integer.class));
LOG.debug("map returned is {}", row.getMap("map1", String.class, Integer.class));
LOG.debug("date returned is {}", row.getDate("last_visited"));
Assert.assertNotEquals(new Date(System.currentTimeMillis()), row.getDate("last_visited"));
if (count == 0) {
Assert.assertEquals(2, row.getInt("age"));
Assert.assertEquals(2.0, row.getFloat("floatValue"), 2);
Set<Integer> set = new HashSet<Integer>();
List<Integer> list = new ArrayList<Integer>();
Map<String, Integer> map = new HashMap<String, Integer>();
set.add(2);
list.add(2);
map.put("key2", 2);
Assert.assertEquals(set, row.getSet("set1", Integer.class));
Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
Assert.assertEquals(list, row.getList("list1", Integer.class));
}
if (count == 1) {
Assert.assertEquals(0, row.getInt("age"));
Assert.assertEquals(0.0, row.getFloat("floatValue"), 2);
Set<Integer> set = new HashSet<Integer>();
List<Integer> list = new ArrayList<Integer>();
Map<String, Integer> map = new HashMap<String, Integer>();
set.add(0);
list.add(0);
map.put("key0", 0);
Assert.assertEquals(set, row.getSet("set1", Integer.class));
Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
Assert.assertEquals(list, row.getList("list1", Integer.class));
}
if (count == 2) {
Assert.assertEquals(1, row.getInt("age"));
Assert.assertEquals(1.0, row.getFloat("floatValue"), 2);
Set<Integer> set = new HashSet<Integer>();
List<Integer> list = new ArrayList<Integer>();
Map<String, Integer> map = new HashMap<String, Integer>();
set.add(1);
list.add(1);
map.put("key1", 1);
Assert.assertEquals(set, row.getSet("set1", Integer.class));
Assert.assertEquals(map, row.getMap("map1", String.class, Integer.class));
Assert.assertEquals(list, row.getList("list1", Integer.class));
}
count++;
}
}
}
private static class TestInputOperator extends CassandraPOJOInputOperator
{
private final ArrayList<Integer> ids = new ArrayList<Integer>();
private final HashMap<Integer, String> mapNames = new HashMap<Integer, String>();
private final HashMap<Integer, Integer> mapAge = new HashMap<Integer, Integer>();
public void insertEventsInTable(int numEvents)
{
try {
Cluster cluster = Cluster.builder()
.addContactPoint(NODE).build();
Session session = cluster.connect(KEYSPACE);
String insert = "INSERT INTO " + TABLE_NAME_INPUT + " (ID,lastname,age)" + " VALUES (?,?,?);";
PreparedStatement stmt = session.prepare(insert);
BoundStatement boundStatement = new BoundStatement(stmt);
for (int i = 0; i < numEvents; i++) {
ids.add(i);
mapNames.put(i, "test" + i);
mapAge.put(i, i + 10);
session.execute(boundStatement.bind(i, "test" + i, i + 10));
}
}
catch (DriverException e) {
throw new RuntimeException(e);
}
}
public ArrayList<Integer> getIds()
{
return ids;
}
public HashMap<Integer, String> getNames()
{
return mapNames;
}
public HashMap<Integer, Integer> getAge()
{
return mapAge;
}
}
@Test
public void testCassandraOutputOperator()
{
CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore();
transactionalStore.setNode(NODE);
transactionalStore.setKeyspace(KEYSPACE);
AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
attributeMap.put(DAG.APPLICATION_ID, APP_ID);
OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
TestOutputOperator outputOperator = new TestOutputOperator();
outputOperator.setTablename(TABLE_NAME);
ArrayList<String> columns = new ArrayList<String>();
columns.add("id");
columns.add("age");
columns.add("doubleValue");
columns.add("floatValue");
columns.add("last_visited");
columns.add("lastname");
columns.add("list1");
columns.add("map1");
columns.add("set1");
columns.add("test");
outputOperator.setColumns(columns);
ArrayList<String> expressions = new ArrayList<String>();
expressions.add("id");
expressions.add("age");
expressions.add("doubleValue");
expressions.add("floatValue");
expressions.add("last_visited");
expressions.add("lastname");
expressions.add("list1");
expressions.add("map1");
expressions.add("set1");
expressions.add("test");
outputOperator.setExpressions(expressions);
outputOperator.setStore(transactionalStore);
outputOperator.setup(context);
List<TestPojo> events = Lists.newArrayList();
for (int i = 0; i < 3; i++) {
Set<Integer> set = new HashSet<Integer>();
set.add(i);
List<Integer> list = new ArrayList<Integer>();
list.add(i);
Map<String, Integer> map = new HashMap<String, Integer>();
map.put("key" + i, i);
events.add(new TestPojo(UUID.randomUUID(), i, "abclast", true, i, 2.0, set, list, map, new Date(System.currentTimeMillis())));
}
outputOperator.beginWindow(0);
for (TestPojo event: events) {
outputOperator.input.process(event);
}
outputOperator.endWindow();
Assert.assertEquals("rows in db", 3, outputOperator.getNumOfEventsInStore());
outputOperator.getEventsInStore();
}
/*
* This test can be run on cassandra server installed on node17.
*/
@Test
public void TestCassandraInputOperator()
{
String retrieveQuery = "SELECT * FROM " + KEYSPACE + "." + TABLE_NAME_INPUT;
CassandraStore store = new CassandraStore();
store.setNode(NODE);
store.setKeyspace(KEYSPACE);
AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
attributeMap.put(DAG.APPLICATION_ID, APP_ID);
OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
TestInputOperator inputOperator = new TestInputOperator();
inputOperator.setStore(store);
inputOperator.setOutputClass("com.datatorrent.contrib.cassandra.TestInputPojo");
inputOperator.setTablename(TABLE_NAME_INPUT);
inputOperator.setRetrieveQuery(retrieveQuery);
ArrayList<String> columns = new ArrayList<String>();
columns.add("id");
columns.add("age");
columns.add("lastname");
inputOperator.setColumns(columns);
ArrayList<String> expressions = new ArrayList<String>();
expressions.add("id");
expressions.add("age");
expressions.add("lastname");
inputOperator.setExpressions(expressions);
inputOperator.insertEventsInTable(30);
inputOperator.setPrimaryKeyColumn("id");
CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
inputOperator.outputPort.setSink(sink);
inputOperator.setup(context);
inputOperator.beginWindow(0);
inputOperator.insertEventsInTable(10);
inputOperator.emitTuples();
inputOperator.endWindow();
Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
inputOperator.beginWindow(1);
inputOperator.emitTuples();
inputOperator.endWindow();
Assert.assertEquals("rows from db", 20, sink.collectedTuples.size());
ArrayList<Integer> listOfIDs = inputOperator.getIds();
// Rows are not stored in the same order in cassandra table in which they are inserted.
for (int i = 0; i < 10; i++) {
TestInputPojo object = (TestInputPojo)sink.collectedTuples.get(i);
Assert.assertTrue("id set in testpojo", listOfIDs.contains(object.getId()));
Assert.assertEquals("name set in testpojo", inputOperator.getNames().get(object.getId()), object.getLastname());
Assert.assertEquals("age set in testpojo", inputOperator.getAge().get(object.getId()).intValue(), object.getAge());
}
}
public static class TestPojo
{
public TestPojo(UUID randomUUID, int i, String string, boolean b, float d, double d0, Set<Integer> set1, List<Integer> list1, Map<String, Integer> map1, Date date)
{
this.id = randomUUID;
this.age = i;
this.lastname = string;
this.test = b;
this.floatValue = d;
this.doubleValue = d0;
this.set1 = set1;
this.list1 = list1;
this.map1 = map1;
this.last_visited = date;
}
public int getAge()
{
return age;
}
public void setAge(int age)
{
this.age = age;
}
public boolean isTest()
{
return test;
}
public void setTest(boolean test)
{
this.test = test;
}
public Set<Integer> getSet1()
{
return set1;
}
public void setSet1(Set<Integer> set)
{
this.set1 = set;
}
public List<Integer> getList1()
{
return list1;
}
public void setList1(List<Integer> list)
{
this.list1 = list;
}
public Map<String, Integer> getMap1()
{
return map1;
}
public void setMap1(Map<String, Integer> map)
{
this.map1 = map;
}
public Double getDoubleValue()
{
return doubleValue;
}
public void setDoubleValue(Double doubleValue)
{
this.doubleValue = doubleValue;
}
public Float getFloatValue()
{
return floatValue;
}
public void setFloatValue(Float floatValue)
{
this.floatValue = floatValue;
}
private String lastname = "hello";
private UUID id;
private boolean test;
private Set<Integer> set1;
private List<Integer> list1;
private Map<String, Integer> map1;
private Double doubleValue;
private Float floatValue;
private Date last_visited;
private int age = 2;
public Date getLast_visited()
{
return last_visited;
}
public void setLast_visited(Date last_visited)
{
this.last_visited = last_visited;
}
public UUID getId()
{
return id;
}
public void setId(UUID id)
{
this.id = id;
}
public String getLastname()
{
return lastname;
}
public void setLastname(String lastname)
{
this.lastname = lastname;
}
}
private static final Logger LOG = LoggerFactory.getLogger(CassandraOperatorTest.class);
}