Expand README and improve formatting.
diff --git a/examples/exactly-once/README.md b/examples/exactly-once/README.md
index 5254b4c..4c3f10d 100644
--- a/examples/exactly-once/README.md
+++ b/examples/exactly-once/README.md
@@ -1,8 +1,27 @@
 # Examples for end-to-end exactly-once
 
+The examples are a variation of word count to illustrate end-to-end exactly-once processing
+by incorporating the external system integration aspect, which needs to be taken into account when
+developing real-world pipelines:
+
+* Read from Kafka source
+* Windowed count aggregation that emits incremental aggregates
+* Sink that maintains totals accumulating the incremental aggregates (shown for JDBC and file output)
+
+The examples combine the 3 properties that are required for end-to-end exactly-once results:
+
+1. At-least-once processing that guarantees no loss of data
+2. Idempotency in the DAG (Kafka input operator and repeatable/deterministic streaming windows)
+3. Consistent state between DAG and external system, enabled by the output operators.
+
+The test cases show how the applications can be configured to run in embedded mode (including Kafka).
+
 ## Read from Kafka, write to JDBC
 
-This application shows exactly-once output to JDBC through transactions:
+Shows exactly-once output to JDBC through transactions. The JDBC output operator
+keeps track of the streaming window along with the count to avoid duplicate writes on replay
+during recovery. This is an example for continuously updating results in the database,
+enabled by the transactions.
 
 [Application](src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java)
 
@@ -10,7 +29,9 @@
 
 ## Read from Kafka, write to Files
 
-This application shows exactly-once output to HDFS through atomic file operation:
+This application shows exactly-once output to files through atomic file operation. In contrast to the
+JDBC example, output can only occur once the final count is computed. This implies batching at the sink,
+leading to high latency.
 
 [Application](src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java)
 
diff --git a/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
index 33ae9dc..6982833 100644
--- a/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
+++ b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
@@ -48,7 +48,8 @@
   @Override
   public void populateDAG(DAG dag, Configuration conf)
   {
-    KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());
+    KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput",
+        new KafkaSinglePortStringInputOperator());
     kafkaInput.setWindowDataManager(new FSWindowDataManager());
     UniqueCounterFlat count = dag.addOperator("count", new UniqueCounterFlat());
     CountStoreOperator store = dag.addOperator("store", new CountStoreOperator());
@@ -58,7 +59,8 @@
     dag.addStream("counts", count.counts, store.input, cons.input);
   }
 
-  public static class CountStoreOperator extends AbstractJdbcTransactionableOutputOperator<KeyValPair<String, Integer>>
+  public static class CountStoreOperator
+      extends AbstractJdbcTransactionableOutputOperator<KeyValPair<String, Integer>>
   {
     public static final String SQL =
         "MERGE INTO words USING (VALUES ?, ?) I (word, wcount)"
@@ -73,13 +75,18 @@
     }
 
     @Override
-    protected void setStatementParameters(PreparedStatement statement, KeyValPair<String, Integer> tuple) throws SQLException
+    protected void setStatementParameters(PreparedStatement statement,
+        KeyValPair<String, Integer> tuple) throws SQLException
     {
       statement.setString(1, tuple.getKey());
       statement.setInt(2, tuple.getValue());
     }
   }
 
+  /**
+   * Extension of {@link UniqueCounter} that emits individual key/value pairs instead
+   * of map with all modified values.
+   */
   public static class UniqueCounterFlat extends UniqueCounter<String>
   {
     public final transient DefaultOutputPort<KeyValPair<String, Integer>> counts = new DefaultOutputPort<>();
diff --git a/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
index 62bfb74..5457ec5 100644
--- a/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
+++ b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
@@ -76,19 +76,20 @@
     Connection con = DriverManager.getConnection(DB_URL);
     Statement stmt = con.createStatement();
 
-    String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+    String createMetaTable = "CREATE TABLE IF NOT EXISTS "
+        + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
         + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
         + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
         + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
         + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
-        + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+        + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", "
+        + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
         + ")";
     stmt.executeUpdate(createMetaTable);
 
     String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
         + "(word VARCHAR(255) not NULL, wcount INTEGER, PRIMARY KEY ( word ))";
     stmt.executeUpdate(createTable);
-
   }
 
   @Test