blob: e75a0abd4dd40838d7a04c28d772cee2f223d88d [file] [log] [blame]
/*
* Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datatorrent.contrib.cassandra;
import com.datastax.driver.core.*;
import com.datastax.driver.core.exceptions.DriverException;
import com.datatorrent.api.Attribute.AttributeMap;
import com.datatorrent.api.DAG;
import com.datatorrent.common.util.DTThrowable;
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.List;
/**
* Tests for {@link AbstractCassandraTransactionableOutputOperator} and {@link AbstractCassandraInputOperator}
*/
public class CassandraOperatorTest
{
public static final String NODE = "localhost";
public static final String KEYSPACE = "demo";
private static final String TABLE_NAME = "test_event_table";
private static String APP_ID = "CassandraOperatorTest";
private static int OPERATOR_ID = 0;
private static class TestEvent
{
int id;
TestEvent(int id)
{
this.id = id;
}
}
@BeforeClass
public static void setup()
{
try {
Cluster cluster = Cluster.builder()
.addContactPoint(NODE).build();
Session session = cluster.connect(KEYSPACE);
String createMetaTable = "CREATE TABLE IF NOT EXISTS " + CassandraTransactionalStore.DEFAULT_META_TABLE + " ( " +
CassandraTransactionalStore.DEFAULT_APP_ID_COL + " TEXT, " +
CassandraTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT, " +
CassandraTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT, " +
"PRIMARY KEY (" + CassandraTransactionalStore.DEFAULT_APP_ID_COL + ", " + CassandraTransactionalStore.DEFAULT_OPERATOR_ID_COL + ") " +
");";
session.execute(createMetaTable);
String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME + " (ID INT PRIMARY KEY);";
session.execute(createTable);
}
catch (Throwable e) {
DTThrowable.rethrow(e);
}
}
private static void cleanTable()
{
try {
Cluster cluster = Cluster.builder()
.addContactPoint(NODE).build();
Session session = cluster.connect(KEYSPACE);
String cleanTable = "TRUNCATE " + TABLE_NAME + ";";
session.execute(cleanTable);
}
catch (DriverException e) {
throw new RuntimeException(e);
}
}
private static class TestOutputOperator extends CassandraOutputOperator
{
public long getNumOfEventsInStore()
{
try {
Cluster cluster = Cluster.builder()
.addContactPoint(NODE).build();
Session session = cluster.connect(KEYSPACE);
String countQuery = "SELECT count(*) from " + TABLE_NAME + ";";
ResultSet resultSet = session.execute(countQuery);
for(Row row: resultSet)
{
return row.getLong(0);
}
return 0;
}
catch (DriverException e) {
throw new RuntimeException("fetching count", e);
}
}
}
private static class TestInputOperator extends AbstractCassandraInputOperator<TestEvent>
{
private static final String retrieveQuery = "SELECT * FROM " +KEYSPACE +"."+TABLE_NAME + ";";
TestInputOperator()
{
cleanTable();
}
@Override
public TestEvent getTuple(Row row)
{
try {
return new TestEvent(row.getInt(0));
}
catch (DriverException e) {
throw new RuntimeException(e);
}
}
@Override
public String queryToRetrieveData()
{
return retrieveQuery;
}
public void insertEventsInTable(int numEvents)
{
try {
Cluster cluster = Cluster.builder()
.addContactPoint(NODE).build();
Session session = cluster.connect(KEYSPACE);
String insert = "INSERT INTO " + TABLE_NAME +" (ID)"+ " VALUES (?);";
PreparedStatement stmt = session.prepare(insert);
BoundStatement boundStatement = new BoundStatement(stmt);
Statement statement;
for (int i = 0; i < numEvents; i++) {
statement = boundStatement.bind(i);
session.execute(statement);
}
}
catch (DriverException e) {
throw new RuntimeException(e);
}
}
}
@Test
public void testCassandraOutputOperator()
{
CassandraTransactionalStore transactionalStore = new CassandraTransactionalStore();
transactionalStore.setNode(NODE);
transactionalStore.setKeyspace(KEYSPACE);
AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
attributeMap.put(DAG.APPLICATION_ID, APP_ID);
OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
TestOutputOperator outputOperator = new TestOutputOperator();
outputOperator.setTablename(TABLE_NAME);
ArrayList<String> columns = new ArrayList<String>();
columns.add("ID");
outputOperator.setColumns(columns);
ArrayList<String> expressions = new ArrayList<String>();
expressions.add("getID()");
outputOperator.setExpressions(expressions);
outputOperator.setStore(transactionalStore);
outputOperator.setup(context);
List<TestEvent> events = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
events.add(new TestEvent(i));
}
outputOperator.beginWindow(0);
for (TestEvent event : events) {
outputOperator.input.process(event);
}
outputOperator.endWindow();
Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore());
}
@Test
public void TestCassandraInputOperator()
{
CassandraStore store = new CassandraStore();
store.setNode(NODE);
store.setKeyspace(KEYSPACE);
AttributeMap.DefaultAttributeMap attributeMap = new AttributeMap.DefaultAttributeMap();
attributeMap.put(DAG.APPLICATION_ID, APP_ID);
OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(OPERATOR_ID, attributeMap);
TestInputOperator inputOperator = new TestInputOperator();
inputOperator.setStore(store);
inputOperator.insertEventsInTable(10);
CollectorTestSink<Object> sink = new CollectorTestSink<Object>();
inputOperator.outputPort.setSink(sink);
inputOperator.setup(context);
inputOperator.beginWindow(0);
inputOperator.emitTuples();
inputOperator.endWindow();
Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
}
public InnerObj innerObj = new InnerObj();
/**
* @return the innerObj
*/
public InnerObj getInnerObj()
{
return innerObj;
}
/**
* @param innerObj the innerObj to set
*/
public void setInnerObj(InnerObj innerObj)
{
this.innerObj = innerObj;
}
public class InnerObj
{
public InnerObj()
{
}
private int ID=11;
/**
* @return the int ID
*/
public int getID()
{
return ID;
}
/**
* @param ID the intVal to set
*/
public void setID(int ID)
{
this.ID = ID;
}
}
}