changes
diff --git a/contrib/src/main/java/com/datatorrent/contrib/accumulo/AccumuloOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/accumulo/AccumuloOutputOperator.java new file mode 100644 index 0000000..edf78b4 --- /dev/null +++ b/contrib/src/main/java/com/datatorrent/contrib/accumulo/AccumuloOutputOperator.java
@@ -0,0 +1,16 @@ + +package com.datatorrent.contrib.accumulo; + +import org.apache.accumulo.core.data.Mutation; + + +public class AccumuloOutputOperator extends AbstractAccumuloOutputOperator<Object> +{ + + @Override + public Mutation operationMutation(Object t) + { + return null; + } + +}
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java index 4c4e3c5..9bb5d38 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/AbstractCassandraTransactionableOutputOperator.java
@@ -64,6 +64,7 @@ @Override public void processBatch(Collection<T> tuples) { + System.out.println("in process Batch"); BatchStatement batchCommand = store.getBatchCommand(); for(T tuple: tuples) {
diff --git a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java index b9a3f69..05162a1 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java +++ b/contrib/src/main/java/com/datatorrent/contrib/cassandra/CassandraOutputOperator.java
@@ -46,7 +46,7 @@ private ArrayList<DataType> columnDataTypes; private ArrayList<String> expressions; private transient ArrayList<Object> getters; - + private boolean isfirstTuple; /* * An ArrayList of Java expressions that will yield the field value from the POJO. * Each expression corresponds to one column in the Cassandra table. @@ -91,6 +91,7 @@ public CassandraOutputOperator() { super(); + isfirstTuple = true; columnDataTypes = new ArrayList<DataType>(); getters = new ArrayList<Object>(); } @@ -99,9 +100,10 @@ @Override public void processTuple(Object tuple) { - if (getters.isEmpty()) { + if (isfirstTuple) { processFirstTuple(tuple); } + isfirstTuple = false; super.processTuple(tuple); } @@ -109,8 +111,10 @@ { Class<?> fqcn = tuple.getClass(); int size = columnDataTypes.size(); + LOG.debug("size is {}",size); for (int i = 0; i < size; i++) { DataType type = columnDataTypes.get(i); + LOG.debug("type is {}",type.getClass()); String getterExpression = expressions.get(i); if (type.equals(DataType.Name.ASCII) || type.equals(DataType.Name.TEXT) || type.equals(DataType.Name.VARCHAR)) { GetterString getVarchar = PojoUtils.createGetterString(fqcn, getterExpression); @@ -144,6 +148,10 @@ GetterObject getObject = PojoUtils.createGetterObject(fqcn, getterExpression); getters.add(getObject); } + else + { + throw new UnsupportedOperationException("this operation is not supported"+type); + } }
diff --git a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java index e75a0ab..aec3e71 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/cassandra/CassandraOperatorTest.java
@@ -191,13 +191,13 @@ outputOperator.setup(context); - List<TestEvent> events = Lists.newArrayList(); + List<InnerObj> events = Lists.newArrayList(); for (int i = 0; i < 10; i++) { - events.add(new TestEvent(i)); + events.add(new InnerObj(i)); } outputOperator.beginWindow(0); - for (TestEvent event : events) { + for (InnerObj event : events) { outputOperator.input.process(event); } outputOperator.endWindow(); @@ -205,7 +205,7 @@ Assert.assertEquals("rows in db", 10, outputOperator.getNumOfEventsInStore()); } - @Test + //@Test public void TestCassandraInputOperator() { CassandraStore store = new CassandraStore(); @@ -257,6 +257,11 @@ private int ID=11; + private InnerObj(int i) + { + ID = i; + } + /** * @return the int ID */
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java index ce05bbe..923a719 100644 --- a/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/AbstractBatchTransactionableStoreOutputOperator.java
@@ -41,6 +41,7 @@ @Override public void processTuple(T tuple) { + System.out.println("in process tuple"); tuples.add(tuple); } @@ -54,6 +55,7 @@ /** * Processes the whole batch at the end window and writes to the store. * + * @param tuples */ public abstract void processBatch(Collection<T> tuples);
diff --git a/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java b/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java index c3cf5b7..48ff88e 100644 --- a/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java +++ b/library/src/main/java/com/datatorrent/lib/db/AbstractTransactionableStoreOutputOperator.java
@@ -56,6 +56,9 @@ @Override public void process(T t) { + System.out.println("in process"); + System.out.println("committedWindowId is " + committedWindowId); + System.out.println("currentWindowId is " + currentWindowId); if (committedWindowId < currentWindowId) { processTuple(t); } @@ -91,6 +94,7 @@ appId = context.getValue(DAG.APPLICATION_ID); operatorId = context.getId(); committedWindowId = store.getCommittedWindowId(appId, operatorId); + System.out.println("committedWindowId is "+ committedWindowId); } catch (IOException ex) { throw new RuntimeException(ex);