Use algo/UniqueCounter to reduce code
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java b/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
index 8050d67..7700d68 100644
--- a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
+++ b/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
@@ -16,6 +16,7 @@
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.algo.UniqueCounter;
import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
import com.datatorrent.lib.io.ConsoleOutputOperator;
@@ -32,12 +33,12 @@
{
KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());
kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
- UniqueCounter<String> count = dag.addOperator("count", new UniqueCounter<String>());
+ UniqueCounterFlat count = dag.addOperator("count", new UniqueCounterFlat());
CountStoreOperator store = dag.addOperator("store", new CountStoreOperator());
store.setStore(new JdbcTransactionalStore());
ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator());
dag.addStream("words", kafkaInput.outputPort, count.data);
- dag.addStream("counts", count.count, store.input, cons.input);
+ dag.addStream("counts", count.counts, store.input, cons.input);
}
public static class CountStoreOperator extends AbstractJdbcTransactionableOutputOperator<KeyValPair<String, Integer>>
@@ -62,42 +63,18 @@
}
}
- public static class UniqueCounter<K> extends BaseUniqueKeyCounter<K>
+ public static class UniqueCounterFlat extends UniqueCounter<String>
{
- /**
- * The input port which receives incoming tuples.
- */
- public final transient DefaultInputPort<K> data = new DefaultInputPort<K>()
- {
- /**
- * Reference counts tuples
- */
- @Override
- public void process(K tuple)
- {
- processTuple(tuple);
- }
-
- };
-
- public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new DefaultOutputPort<KeyValPair<K, Integer>>()
- {
- @Override
- public Unifier<KeyValPair<K, Integer>> getUnifier()
- {
- throw new UnsupportedOperationException("not partitionable");
- }
- };
+ public final transient DefaultOutputPort<KeyValPair<String, Integer>> counts = new DefaultOutputPort<>();
@Override
public void endWindow()
{
- for (Map.Entry<K, MutableInt> e: map.entrySet()) {
- count.emit(new KeyValPair<>(e.getKey(), e.getValue().toInteger()));
+ for (Map.Entry<String, MutableInt> e: map.entrySet()) {
+ counts.emit(new KeyValPair<>(e.getKey(), e.getValue().toInteger()));
}
map.clear();
}
-
}
}