Use algo/UniqueCounter to reduce code
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java b/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
index 8050d67..7700d68 100644
--- a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
+++ b/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
@@ -16,6 +16,7 @@
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.algo.UniqueCounter;
 import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
 import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
 import com.datatorrent.lib.io.ConsoleOutputOperator;
@@ -32,12 +33,12 @@
   {
     KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());
     kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
-    UniqueCounter<String> count = dag.addOperator("count", new UniqueCounter<String>());
+    UniqueCounterFlat count = dag.addOperator("count", new UniqueCounterFlat());
     CountStoreOperator store = dag.addOperator("store", new CountStoreOperator());
     store.setStore(new JdbcTransactionalStore());
     ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator());
     dag.addStream("words", kafkaInput.outputPort, count.data);
-    dag.addStream("counts", count.count, store.input, cons.input);
+    dag.addStream("counts", count.counts, store.input, cons.input);
   }
 
   public static class CountStoreOperator extends AbstractJdbcTransactionableOutputOperator<KeyValPair<String, Integer>>
@@ -62,42 +63,18 @@
     }
   }
 
-  public static class UniqueCounter<K> extends BaseUniqueKeyCounter<K>
+  public static class UniqueCounterFlat extends UniqueCounter<String>
   {
-    /**
-     * The input port which receives incoming tuples.
-     */
-    public final transient DefaultInputPort<K> data = new DefaultInputPort<K>()
-    {
-      /**
-       * Reference counts tuples
-       */
-      @Override
-      public void process(K tuple)
-      {
-        processTuple(tuple);
-      }
-
-    };
-
-    public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new DefaultOutputPort<KeyValPair<K, Integer>>()
-    {
-      @Override
-      public Unifier<KeyValPair<K, Integer>> getUnifier()
-      {
-        throw new UnsupportedOperationException("not partitionable");
-      }
-    };
+    public final transient DefaultOutputPort<KeyValPair<String, Integer>> counts = new DefaultOutputPort<>();
 
     @Override
     public void endWindow()
     {
-      for (Map.Entry<K, MutableInt> e: map.entrySet()) {
-        count.emit(new KeyValPair<>(e.getKey(), e.getValue().toInteger()));
+      for (Map.Entry<String, MutableInt> e: map.entrySet()) {
+        counts.emit(new KeyValPair<>(e.getKey(), e.getValue().toInteger()));
       }
       map.clear();
     }
-
   }
 
 }