POJO implementaion for Cassandra,added a test case and minor changes in memsql.
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/MemsqlInputBenchmark.java b/benchmark/src/main/java/com/datatorrent/benchmark/MemsqlInputBenchmark.java
index 5fd3470..0167694 100644
--- a/benchmark/src/main/java/com/datatorrent/benchmark/MemsqlInputBenchmark.java
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/MemsqlInputBenchmark.java
@@ -18,7 +18,6 @@
import com.datatorrent.api.*;
import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.memsql.MemsqlInputOperator;
import com.datatorrent.lib.stream.DevNull;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/MemsqlInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/MemsqlInputOperator.java
new file mode 100644
index 0000000..27878f9
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/MemsqlInputOperator.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright (c) 2014 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.datatorrent.benchmark;
+
+import com.datatorrent.contrib.memsql.*;
+import com.datatorrent.api.Context.OperatorContext;
+import java.sql.Statement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import javax.validation.constraints.NotNull;
+
+public class MemsqlInputOperator extends AbstractMemsqlInputOperator<Object>
+{
+ private int blastSize = 1000;
+ private int currentRow = 1;
+ private int inputSize = 0;
+ @NotNull
+ private String tablename;
+ @NotNull
+ private String primaryKeyColumn;
+
+ /*
+ * Primary Key Column of table.
+ * Gets the primary key column of memsql table.
+ */
+ public String getPrimaryKeyColumn()
+ {
+ return primaryKeyColumn;
+ }
+
+ /*
+ * Primary Key Column of table.
+ * Sets the primary key column of memsql table.
+ */
+ public void setPrimaryKeyCol(String primaryKeyColumn)
+ {
+ this.primaryKeyColumn = primaryKeyColumn;
+ }
+
+ /*
+ * Name of the table in Memsql Database.
+ * Gets the Memsql Tablename.
+ * @return tablename
+ */
+ public String getTablename()
+ {
+ return tablename;
+ }
+
+ public void setTablename(String tablename)
+ {
+ this.tablename = tablename;
+ }
+
+ public MemsqlInputOperator()
+ {
+ }
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ super.setup(context);
+
+ try {
+ Statement statement = store.getConnection().createStatement();
+ ResultSet resultSet = statement.executeQuery("select count(*) from " + tablename);
+ resultSet.next();
+ inputSize = resultSet.getInt(1);
+ statement.close();
+ }
+ catch (SQLException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ @Override
+ public Integer getTuple(ResultSet result)
+ {
+ Integer tuple = null;
+
+ try {
+ tuple = result.getInt(2);
+ }
+ catch (SQLException ex) {
+ throw new RuntimeException(ex);
+ }
+
+ return tuple;
+ }
+
+ @Override
+ public void emitTuples()
+ {
+ if (currentRow >= inputSize) {
+ return;
+ }
+
+ super.emitTuples();
+ }
+
+ @Override
+ public String queryToRetrieveData()
+ {
+ if (currentRow > inputSize) {
+ return null;
+ }
+
+ int endRow = currentRow + blastSize;
+
+ if (endRow > inputSize + 1) {
+ endRow = inputSize + 1;
+ }
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("select * from ");
+ sb.append(tablename);
+ sb.append(" where ");
+ sb.append(primaryKeyColumn);
+ sb.append(" >= ");
+ sb.append(currentRow);
+ sb.append(" and ");
+ sb.append(primaryKeyColumn);
+ sb.append(" < ");
+ sb.append(endRow);
+
+ currentRow = endRow;
+
+ return sb.toString();
+ }
+
+ public void setBlastSize(int blastSize)
+ {
+ this.blastSize = blastSize;
+ }
+
+ /*
+ * Records are read in batches of this size.
+ * Gets the batch size.
+ * @return batchsize
+ */
+ public int getBlastSize()
+ {
+ return blastSize;
+ }
+}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java
index 5b4864f..a51f5a8 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperatorPS.java
@@ -47,7 +47,6 @@
* @param <T>type of tuple</T>
* @since 1.0.2
*/
-@Deprecated
public abstract class AbstractCassandraTransactionableOutputOperatorPS<T> extends AbstractCassandraTransactionableOutputOperator<T>{
private transient PreparedStatement updateCommand;
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
new file mode 100644
index 0000000..b9a3f69
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
@@ -0,0 +1,249 @@
+/*
+ * Copyright (c) 2015 DataTorrent, Inc. ALL Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datatorrent.contrib.cassandra;
+
+import com.datastax.driver.core.*;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.Statement;
+import com.datastax.driver.core.exceptions.DriverException;
+import com.datatorrent.lib.util.PojoUtils;
+import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
+import com.datatorrent.lib.util.PojoUtils.GetterDouble;
+import com.datatorrent.lib.util.PojoUtils.GetterFloat;
+import com.datatorrent.lib.util.PojoUtils.GetterInt;
+import com.datatorrent.lib.util.PojoUtils.GetterLong;
+import com.datatorrent.lib.util.PojoUtils.GetterObject;
+import com.datatorrent.lib.util.PojoUtils.GetterString;
+import java.sql.*;
+import java.util.ArrayList;
+import javax.validation.constraints.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * <p>
+ * CassandraOutputOperator class.</p>
+ * A Generic implementation of AbstractCassandraTransactionableOutputOperatorPS which takes in any POJO.
+ *
+ * @since 1.0.3
+ */
+public class CassandraOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object>
+{
+ private ArrayList<String> columns;
+ private ArrayList<DataType> columnDataTypes;
+ private ArrayList<String> expressions;
+ private transient ArrayList<Object> getters;
+
+ /*
+ * An ArrayList of Java expressions that will yield the field value from the POJO.
+ * Each expression corresponds to one column in the Cassandra table.
+ */
+ public ArrayList<String> getExpressions()
+ {
+ return expressions;
+ }
+
+ public void setExpressions(ArrayList<String> expressions)
+ {
+ this.expressions = expressions;
+ }
+
+ public ArrayList<String> getColumns()
+ {
+ return columns;
+ }
+
+ public void setColumns(ArrayList<String> columns)
+ {
+ this.columns = columns;
+ }
+
+ @NotNull
+ private String tablename;
+
+
+ /*
+ * Tablename in cassandra.
+ */
+ public String getTablename()
+ {
+ return tablename;
+ }
+
+ public void setTablename(String tablename)
+ {
+ this.tablename = tablename;
+ }
+
+ public CassandraOutputOperator()
+ {
+ super();
+ columnDataTypes = new ArrayList<DataType>();
+ getters = new ArrayList<Object>();
+ }
+
+
+ @Override
+ public void processTuple(Object tuple)
+ {
+ if (getters.isEmpty()) {
+ processFirstTuple(tuple);
+ }
+ super.processTuple(tuple);
+ }
+
+ public void processFirstTuple(Object tuple)
+ {
+ Class<?> fqcn = tuple.getClass();
+ int size = columnDataTypes.size();
+ for (int i = 0; i < size; i++) {
+ DataType type = columnDataTypes.get(i);
+ String getterExpression = expressions.get(i);
+ if (type.equals(DataType.Name.ASCII) || type.equals(DataType.Name.TEXT) || type.equals(DataType.Name.VARCHAR)) {
+ GetterString getVarchar = PojoUtils.createGetterString(fqcn, getterExpression);
+ getters.add(getVarchar);
+ }
+ else if (type.equals(DataType.Name.BOOLEAN)) {
+ GetterBoolean getBoolean = PojoUtils.createGetterBoolean(fqcn, getterExpression);
+ getters.add(getBoolean);
+ }
+ else if (type.equals(DataType.Name.INT)) {
+ GetterInt getInt = PojoUtils.createGetterInt(fqcn, getterExpression);
+ getters.add(getInt);
+ }
+ else if (type.equals(DataType.Name.BIGINT) || type.equals(DataType.Name.COUNTER)) {
+ GetterLong getLong = PojoUtils.createExpressionGetterLong(fqcn, getterExpression);
+ getters.add(getLong);
+ }
+ else if (type.equals(DataType.Name.FLOAT)) {
+ GetterFloat getFloat = PojoUtils.createGetterFloat(fqcn, getterExpression);
+ getters.add(getFloat);
+ }
+ else if (type.equals(DataType.Name.DOUBLE)) {
+ GetterDouble getDouble = PojoUtils.createGetterDouble(fqcn, getterExpression);
+ getters.add(getDouble);
+ }
+ else if (type.equals(DataType.Name.TIMESTAMP)) {
+ GetterObject getObject = PojoUtils.createGetterObject(fqcn, getterExpression);
+ getters.add(getObject);
+ }
+ else if (type.equals(DataType.Name.CUSTOM) || type.equals(DataType.Name.LIST) || type.equals(DataType.Name.MAP) || type.equals(DataType.Name.SET) ) {
+ GetterObject getObject = PojoUtils.createGetterObject(fqcn, getterExpression);
+ getters.add(getObject);
+ }
+
+ }
+
+ }
+
+ @Override
+ protected PreparedStatement getUpdateCommand()
+ {
+ com.datastax.driver.core.ResultSet rs = store.getSession().execute("select * from " + store.keyspace +"."+tablename);
+
+ ColumnDefinitions rsMetaData = rs.getColumnDefinitions();
+
+ int numberOfColumns = 0;
+
+ numberOfColumns = rsMetaData.size();
+
+ LOG.debug("resultSet MetaData column Count=" + numberOfColumns);
+
+ for (int i = 0; i < numberOfColumns; i++) {
+ // get the designated column's data type.
+ DataType type = rsMetaData.getType(i);
+ columnDataTypes.add(type);
+ LOG.debug("column type is " + rsMetaData.getType(i));
+ LOG.debug("sql column type is " + type);
+ }
+
+ StringBuilder queryfields = new StringBuilder("");
+ StringBuilder values = new StringBuilder("");
+ for (int i = 0; i < columns.size(); i++) {
+ if (queryfields.length()==0) {
+ queryfields.append(columns.get(i));
+ values.append("?");
+ }
+ else {
+ queryfields.append(",").append(columns.get(i));
+ values.append(",").append(columns.get(i));
+ }
+ }
+ LOG.debug("queryfields are", queryfields.toString());
+ LOG.debug("values are ",values.toString());
+ String statement
+ = "INSERT INTO " + store.keyspace + "."
+ + tablename
+ + "(" + queryfields.toString() + ")"
+ + "VALUES (" + values.toString() + ");";
+ LOG.debug("statement is", statement);
+ return store.getSession().prepare(statement);
+ }
+
+ @Override
+ protected Statement setStatementParameters(PreparedStatement updateCommand, Object tuple) throws DriverException
+ {
+ BoundStatement boundStmnt = new BoundStatement(updateCommand);
+ int size = columnDataTypes.size();
+ Object getter;
+ for (int i = 0; i < size; i++) {
+ DataType type = columnDataTypes.get(i);
+ switch (type.getName()) {
+ case ASCII:
+ getter = ((GetterString)getters.get(i)).get(tuple);
+ break;
+ case VARCHAR:
+ getter = ((GetterString)getters.get(i)).get(tuple);
+ break;
+ case TEXT:
+ getter = ((GetterString)getters.get(i)).get(tuple);
+ break;
+ case BOOLEAN:
+ getter = ((GetterBoolean)getters.get(i)).get(tuple);
+ break;
+ case INT:
+ getter = ((GetterInt)getters.get(i)).get(tuple);
+ break;
+ case BIGINT:
+ getter = ((GetterLong)getters.get(i)).get(tuple);
+ break;
+ case COUNTER:
+ getter = ((GetterLong)getters.get(i)).get(tuple);
+ break;
+ case FLOAT:
+ getter = ((GetterFloat)getters.get(i)).get(tuple);
+ break;
+ case DOUBLE:
+ getter = ((GetterDouble)getters.get(i)).get(tuple);
+ break;
+ case TIMESTAMP:
+ getter = (Date)((GetterObject)getters.get(i)).get(tuple);
+ break;
+ case CUSTOM:
+ getter = ((GetterObject)getters.get(i)).get(tuple);
+ break;
+ default:
+ getter = ((GetterObject)getters.get(i)).get(tuple);
+ break;
+ }
+ boundStmnt.bind(i + 1, getter);
+
+ }
+ return boundStmnt;
+ }
+
+ private static transient final Logger LOG = LoggerFactory.getLogger(CassandraOutputOperator.class);
+}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java
index ee47e35..d121181 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraStore.java
@@ -29,7 +29,7 @@
import com.datatorrent.lib.db.Connectable;
/**
- * A {@link Connectable} that uses cassandra to connect to stores and implements Connectable interface.
+ * A {@link Connectable} that uses cassandra to connect to stores and implements Connectable interface.
* <p>
* @displayName Cassandra Store
* @category Store
@@ -45,7 +45,19 @@
private String node;
protected transient Cluster cluster = null;
protected transient Session session = null;
- protected String keyspace=null;
+
+ @NotNull
+ protected String keyspace;
+
+ /*
+ * The Cassandra keyspace is a namespace that defines how data is replicated on nodes.
+ * Typically, a cluster has one keyspace per application. Replication is controlled on a per-keyspace basis, so data that has different replication requirements typically resides in different keyspaces.
+ * Keyspaces are not designed to be used as a significant map layer within the data model. Keyspaces are designed to control data replication for a set of tables.
+ */
+ public String getKeyspace()
+ {
+ return keyspace;
+ }
/**
* Sets the keyspace.
diff --git a/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java
index c8795af..46f3bbf 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/memsql/MemsqlOutputOperator.java
@@ -49,12 +49,10 @@
private ArrayList<Integer> columnDataTypes;
private transient boolean isFirstTuple;
private transient ArrayList<Object> getters;
- private String primaryKey;
/*
* An ArrayList of Java expressions that will yield the field value from the POJO.
* Each expression corresponds to one column in the memsql table.
- * Example:
*/
public ArrayList<String> getExpression()
{
@@ -108,22 +106,6 @@
this.tablename = tablename;
}
- /*
- * Gets the private key of the memsql table.
- */
- public String getPrimaryKey()
- {
- return primaryKey;
- }
-
- /*
- * Sets the private key in the memsql table.
- */
- public void setPrivateKey(String primaryKey)
- {
- this.primaryKey = primaryKey;
- }
-
@Override
public void setup(OperatorContext context)
{
@@ -172,6 +154,7 @@
public MemsqlOutputOperator()
{
+ super();
columnDataTypes = new ArrayList<Integer>();
getters = new ArrayList<Object>();
}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/parser/AbstractCsvParser.java b/contrib/src/main/java/com/datatorrent/contrib/parser/AbstractCsvParser.java
index eab75ee..5c281a2 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/parser/AbstractCsvParser.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/parser/AbstractCsvParser.java
@@ -245,21 +245,37 @@
String name;
FIELD_TYPE type;
+ /**
+ * Gets the name of the Field.
+ * @return name
+ */
public String getName()
{
return name;
}
+ /**
+ * Sets the name of the Field.
+ * @param name
+ */
public void setName(String name)
{
this.name = name;
}
+ /**
+ * Gets the type of the Field.
+ * @return type
+ */
public FIELD_TYPE getType()
{
return type;
}
+ /**
+ * Sets the type of the Field.
+ * @return type
+ */
public void setType(String type)
{
this.type = FIELD_TYPE.valueOf(type);
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
index 34728b4..e75a0ab 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
@@ -23,11 +23,11 @@
import com.datatorrent.lib.helper.OperatorContextTestHelper;
import com.datatorrent.lib.testbench.CollectorTestSink;
import com.google.common.collect.Lists;
+import java.util.ArrayList;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import javax.annotation.Nonnull;
import java.util.List;
/**
@@ -35,8 +35,8 @@
*/
public class CassandraOperatorTest
{
- public static final String NODE = "127.0.0.1";
- public static final String KEYSPACE = "test";
+ public static final String NODE = "localhost";
+ public static final String KEYSPACE = "demo";
private static final String TABLE_NAME = "test_event_table";
private static String APP_ID = "CassandraOperatorTest";
@@ -92,34 +92,8 @@
}
}
- private static class TestOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<TestEvent>
+ private static class TestOutputOperator extends CassandraOutputOperator
{
- private static final String INSERT_STMT = "INSERT INTO " + KEYSPACE+"." +TABLE_NAME + " (ID) VALUES (?);";
-
- TestOutputOperator()
- {
- cleanTable();
- }
-
- @Nonnull
- @Override
- protected PreparedStatement getUpdateCommand()
- {
- try {
- return store.getSession().prepare(INSERT_STMT);
- }
- catch (DriverException e) {
- throw new RuntimeException("preparing", e);
- }
- }
-
- @Override
- protected Statement setStatementParameters(PreparedStatement statement, TestEvent tuple) throws DriverException
- {
- BoundStatement boundStatement = new BoundStatement(statement);
- Statement stmnt = boundStatement.bind(tuple.id);
- return stmnt;
- }
public long getNumOfEventsInStore()
{
@@ -206,6 +180,13 @@
TestOutputOperator outputOperator = new TestOutputOperator();
+ outputOperator.setTablename(TABLE_NAME);
+ ArrayList<String> columns = new ArrayList<String>();
+ columns.add("ID");
+ outputOperator.setColumns(columns);
+ ArrayList<String> expressions = new ArrayList<String>();
+ expressions.add("getID()");
+ outputOperator.setExpressions(expressions);
outputOperator.setStore(transactionalStore);
outputOperator.setup(context);
@@ -249,5 +230,49 @@
Assert.assertEquals("rows from db", 10, sink.collectedTuples.size());
}
+
+ public InnerObj innerObj = new InnerObj();
+
+ /**
+ * @return the innerObj
+ */
+ public InnerObj getInnerObj()
+ {
+ return innerObj;
+ }
+
+ /**
+ * @param innerObj the innerObj to set
+ */
+ public void setInnerObj(InnerObj innerObj)
+ {
+ this.innerObj = innerObj;
+ }
+
+ public class InnerObj
+ {
+ public InnerObj()
+ {
+ }
+
+ private int ID=11;
+
+ /**
+ * @return the int ID
+ */
+ public int getID()
+ {
+ return ID;
+ }
+
+ /**
+ * @param ID the intVal to set
+ */
+ public void setID(int ID)
+ {
+ this.ID = ID;
+ }
+
+ }
}