changes to operator.
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
index 31bf428..25da032 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
@@ -19,7 +19,6 @@
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Statement;
import com.datastax.driver.core.exceptions.DriverException;
-import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.lib.util.PojoUtils;
import com.datatorrent.lib.util.PojoUtils.GetterBoolean;
import com.datatorrent.lib.util.PojoUtils.GetterDouble;
@@ -28,6 +27,7 @@
import com.datatorrent.lib.util.PojoUtils.GetterLong;
import com.datatorrent.lib.util.PojoUtils.GetterObject;
import com.datatorrent.lib.util.PojoUtils.GetterString;
+import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.UUID;
@@ -39,8 +39,9 @@
* <p>
* CassandraOutputOperator class.</p>
* A Generic implementation of AbstractCassandraTransactionableOutputOperatorPS which takes in any POJO.
- *
- * @since 1.0.3
+ * @displayName Cassandra Output Operator
+ * @category Output
+ * @tags output operator
*/
public class CassandraOutputOperator extends AbstractCassandraTransactionableOutputOperatorPS<Object>
{
@@ -117,7 +118,7 @@
int size = columnDataTypes.size();
for (int i = 0; i < size; i++) {
DataType type = columnDataTypes.get(i);
- LOG.debug("type is {}",type.getName());
+ LOG.debug("type is {}",type);
String getterExpression = PojoUtils.getSingleFieldExpression(fqcn, expressions.get(i));
if (type.equals(DataType.ascii()) || type.equals(DataType.text()) || type.equals(DataType.varchar())) {
GetterString getVarchar = PojoUtils.createGetterString(fqcn, getterExpression);
@@ -147,13 +148,10 @@
GetterDouble getDouble = PojoUtils.createGetterDouble(fqcn, getterExpression);
getters.add(getDouble);
}
- else if (type.equals(DataType.timestamp())) {
- GetterObject getObject = PojoUtils.createGetterObject(fqcn, getterExpression);
- getters.add(getObject);
- }
else
{
- throw new UnsupportedOperationException("this type is not supported "+type);
+ GetterObject getObject = PojoUtils.createGetterObject(fqcn, getterExpression);
+ getters.add(getObject);
}
}
@@ -175,8 +173,6 @@
values.append(",").append("?");
}
}
- LOG.debug("queryfields are", queryfields.toString());
- LOG.debug("values are ",values.toString());
String statement
= "INSERT INTO " + store.keyspace + "."
+ tablename
@@ -194,59 +190,67 @@
processFirstTuple(tuple);
}
BoundStatement boundStmnt = new BoundStatement(updateCommand);
+ //BatchStatement batchStmt = new BatchStatement();
int size = columnDataTypes.size();
- Object getter = new Object();
- UUID id = (UUID)(((GetterObject)getters.get(0)).get(tuple));;
- for (int i = 1; i < size; i++) {
+ //Object getter = new Object();
+ // Object[] getter= new Object[size];
+ // UUID id = (UUID)(((GetterObject)getters.get(0)).get(tuple));
+ // Object id = null;
+ for (int i = 0; i < size; i++) {
DataType type = columnDataTypes.get(i);
- LOG.debug("type before switch is {}",type.getName());
+ LOG.debug("name of type is {}",type.getName());
switch (type.getName()) {
case UUID:
- id = (UUID)(((GetterObject)getters.get(i)).get(tuple));
+ UUID id = (UUID)(((GetterObject)getters.get(i)).get(tuple));
+ boundStmnt.setUUID(i, id);
break;
case ASCII:
- getter = ((GetterString)getters.get(i)).get(tuple);
+ String ascii= ((GetterString)getters.get(i)).get(tuple);
+ boundStmnt.setString(i, ascii);
break;
case VARCHAR:
- getter = ((GetterString)getters.get(i)).get(tuple);
+ String varchar = ((GetterString)getters.get(i)).get(tuple);
+ boundStmnt.setString(i, varchar);
break;
case TEXT:
- getter = ((GetterString)getters.get(i)).get(tuple);
+ String text= ((GetterString)getters.get(i)).get(tuple);
+ boundStmnt.setString(i, text);
break;
case BOOLEAN:
- getter = ((GetterBoolean)getters.get(i)).get(tuple);
+ Boolean bool = ((GetterBoolean)getters.get(i)).get(tuple);
+ boundStmnt.setBool(i, bool);
break;
case INT:
- getter = ((GetterInt)getters.get(i)).get(tuple);
+ Integer intValue = ((GetterInt)getters.get(i)).get(tuple);
+ boundStmnt.setInt(i, intValue);
break;
case BIGINT:
- getter = ((GetterLong)getters.get(i)).get(tuple);
+ Long longValue = ((GetterLong)getters.get(i)).get(tuple);
+ boundStmnt.setLong(i, longValue);
break;
case COUNTER:
- getter = ((GetterLong)getters.get(i)).get(tuple);
+ Long counter = ((GetterLong)getters.get(i)).get(tuple);
+ boundStmnt.setLong(i, counter);
break;
case FLOAT:
- getter = ((GetterFloat)getters.get(i)).get(tuple);
+ Float floatValue = ((GetterFloat)getters.get(i)).get(tuple);
+ boundStmnt.setFloat(i, floatValue);
break;
case DOUBLE:
- getter = ((GetterDouble)getters.get(i)).get(tuple);
+ LOG.debug("double value");
+ Double doubleValue = ((GetterDouble)getters.get(i)).get(tuple);
+ boundStmnt.setDouble(i, doubleValue);
break;
- case TIMESTAMP:
- getter = (Date)((GetterObject)getters.get(i)).get(tuple);
- break;
- case CUSTOM:
- getter = ((GetterObject)getters.get(i)).get(tuple);
- break;
- default:
- getter = (((GetterObject)getters.get(i)).get(tuple));
+ case DECIMAL:
+ BigDecimal decimal = (BigDecimal)((GetterObject)getters.get(i)).get(tuple);
+ boundStmnt.setDecimal(i, decimal);
break;
}
- /*if(i==0)
- {
- id = getter;
- }*/
- boundStmnt.bind(id,getter);
}
+ //batchStmt.add(updateCommand.bind(id, getter, title1, body1));
+//batch.add(ps2.bind(uid, mid2));
+ // boundStmnt.bind(id,getter1[0],getter1[1]);
+
return boundStmnt;
}
@@ -254,3 +258,4 @@
private static transient final Logger LOG = LoggerFactory.getLogger(CassandraOutputOperator.class);
}
+