changes
diff --git a/contrib/src/main/java/com/datatorrent/contrib/accumulo/AccumuloOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/accumulo/AccumuloOutputOperator.java
new file mode 100644
index 0000000..edf78b4
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/accumulo/AccumuloOutputOperator.java
@@ -0,0 +1,16 @@
+
+package com.datatorrent.contrib.accumulo;
+
+import org.apache.accumulo.core.data.Mutation;
+
+
+public class AccumuloOutputOperator extends AbstractAccumuloOutputOperator<Object>
+{
+
+ @Override
+ public Mutation operationMutation(Object t)
+ {
+ return null;
+ }
+
+}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java
index 4c4e3c5..9bb5d38 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java
@@ -64,6 +64,7 @@
@Override
public void processBatch(Collection<T> tuples)
{
+ System.out.println("in process Batch");
BatchStatement batchCommand = store.getBatchCommand();
for(T tuple: tuples)
{
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
index b9a3f69..05162a1 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
@@ -46,7 +46,7 @@
private ArrayList<DataType> columnDataTypes;
private ArrayList<String> expressions;
private transient ArrayList<Object> getters;
-
+ private boolean isfirstTuple;
/*
* An ArrayList of Java expressions that will yield the field value from the POJO.
* Each expression corresponds to one column in the Cassandra table.
@@ -91,6 +91,7 @@
public CassandraOutputOperator()
{
super();
+ isfirstTuple = true;
columnDataTypes = new ArrayList<DataType>();
getters = new ArrayList<Object>();
}
@@ -99,9 +100,10 @@
@Override
public void processTuple(Object tuple)
{
- if (getters.isEmpty()) {
+ if (isfirstTuple) {
processFirstTuple(tuple);
}
+ isfirstTuple = false;
super.processTuple(tuple);
}
@@ -109,8 +111,10 @@
{
Class<?> fqcn = tuple.getClass();
int size = columnDataTypes.size();
+ LOG.debug("size is {}",size);
for (int i = 0; i < size; i++) {
DataType type = columnDataTypes.get(i);
+ LOG.debug("type is {}",type.getClass());
String getterExpression = expressions.get(i);
if (type.equals(DataType.Name.ASCII) || type.equals(DataType.Name.TEXT) || type.equals(DataType.Name.VARCHAR)) {
GetterString getVarchar = PojoUtils.createGetterString(fqcn, getterExpression);
@@ -144,6 +148,10 @@
GetterObject getObject = PojoUtils.createGetterObject(fqcn, getterExpression);
getters.add(getObject);
}
+ else
+ {
+ throw new UnsupportedOperationException("this operation is not supported"+type);
+ }
}
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
index e75a0ab..aec3e71 100644
--- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
+++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
@@ -191,13 +191,13 @@
outputOperator.setup(context);
- List<TestEvent> events = Lists.newArrayList();
+ List<InnerObj> events = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
- events.add(new TestEvent(i));
+ events.add(new InnerObj(i));
}
outputOperator.beginWindow(0);
- for (TestEvent event : events) {
+ for (InnerObj event : events) {
outputOperator.input.process(event);
}
outputOperator.endWindow();
@@ -205,7 +205,7 @@
Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore());
}
- @Test
+ //@Test
public void TestCassandraInputOperator()
{
CassandraStore store = new CassandraStore();
@@ -257,6 +257,11 @@
private int ID=11;
+ private InnerObj(int i)
+ {
+ ID = i;
+ }
+
/**
* @return the int ID
*/
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java
index ce05bbe..923a719 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java
@@ -41,6 +41,7 @@
@Override
public void processTuple(T tuple)
{
+ System.out.println("in process tuple");
tuples.add(tuple);
}
@@ -54,6 +55,7 @@
/**
* Processes the whole batch at the end window and writes to the store.
*
+ * @param tuples
*/
public abstract void processBatch(Collection<T> tuples);
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java
index c3cf5b7..48ff88e 100644
--- a/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java
@@ -56,6 +56,9 @@
@Override
public void process(T t)
{
+ System.out.println("in process");
+ System.out.println("committedWindowId is " + committedWindowId);
+ System.out.println("currentWindowId is " + currentWindowId);
if (committedWindowId < currentWindowId) {
processTuple(t);
}
@@ -91,6 +94,7 @@
appId = context.getValue(DAG.APPLICATION_ID);
operatorId = context.getId();
committedWindowId = store.getCommittedWindowId(appId, operatorId);
+ System.out.println("committedWindowId is "+ committedWindowId);
}
catch (IOException ex) {
throw new RuntimeException(ex);