Expand README and improve formatting.
diff --git a/examples/exactly-once/README.md b/examples/exactly-once/README.md
index 5254b4c..4c3f10d 100644
--- a/examples/exactly-once/README.md
+++ b/examples/exactly-once/README.md
@@ -1,8 +1,27 @@
# Examples for end-to-end exactly-once
+The examples are a variation of word count to illustrate end-to-end exactly-once processing
+by incorporating the external system integration aspect, which needs to be taken into account when
+developing real-world pipelines:
+
+* Read from Kafka source
+* Windowed count aggregation that emits incremental aggregates
+* Sink that maintains totals accumulating the incremental aggregates (shown for JDBC and file output)
+
+The examples combine the 3 properties that are required for end-to-end exactly-once results:
+
+1. At-least-once processing that guarantees no loss of data
+2. Idempotency in the DAG (Kafka input operator and repeatable/deterministic streaming windows)
+3. Consistent state between DAG and external system, enabled by the output operators.
+
+The test cases show how the applications can be configured to run in embedded mode (including Kafka).
+
## Read from Kafka, write to JDBC
-This application shows exactly-once output to JDBC through transactions:
+Shows exactly-once output to JDBC through transactions. The JDBC output operator
+keeps track of the streaming window along with the count to avoid duplicate writes on replay
+during recovery. This is an example for continuously updating results in the database,
+enabled by the transactions.
[Application](src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java)
@@ -10,7 +29,9 @@
## Read from Kafka, write to Files
-This application shows exactly-once output to HDFS through atomic file operation:
+This application shows exactly-once output to files through atomic file operation. In contrast to the
+JDBC example, output can only occur once the final count is computed. This implies batching at the sink,
+leading to high latency.
[Application](src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java)
diff --git a/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
index 33ae9dc..6982833 100644
--- a/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
+++ b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
@@ -48,7 +48,8 @@
@Override
public void populateDAG(DAG dag, Configuration conf)
{
- KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());
+ KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput",
+ new KafkaSinglePortStringInputOperator());
kafkaInput.setWindowDataManager(new FSWindowDataManager());
UniqueCounterFlat count = dag.addOperator("count", new UniqueCounterFlat());
CountStoreOperator store = dag.addOperator("store", new CountStoreOperator());
@@ -58,7 +59,8 @@
dag.addStream("counts", count.counts, store.input, cons.input);
}
- public static class CountStoreOperator extends AbstractJdbcTransactionableOutputOperator<KeyValPair<String, Integer>>
+ public static class CountStoreOperator
+ extends AbstractJdbcTransactionableOutputOperator<KeyValPair<String, Integer>>
{
public static final String SQL =
"MERGE INTO words USING (VALUES ?, ?) I (word, wcount)"
@@ -73,13 +75,18 @@
}
@Override
- protected void setStatementParameters(PreparedStatement statement, KeyValPair<String, Integer> tuple) throws SQLException
+ protected void setStatementParameters(PreparedStatement statement,
+ KeyValPair<String, Integer> tuple) throws SQLException
{
statement.setString(1, tuple.getKey());
statement.setInt(2, tuple.getValue());
}
}
+ /**
+ * Extension of {@link UniqueCounter} that emits individual key/value pairs instead
+ * of map with all modified values.
+ */
public static class UniqueCounterFlat extends UniqueCounter<String>
{
public final transient DefaultOutputPort<KeyValPair<String, Integer>> counts = new DefaultOutputPort<>();
diff --git a/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
index 62bfb74..5457ec5 100644
--- a/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
+++ b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
@@ -76,19 +76,20 @@
Connection con = DriverManager.getConnection(DB_URL);
Statement stmt = con.createStatement();
- String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+ String createMetaTable = "CREATE TABLE IF NOT EXISTS "
+ + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+ JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+ JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+ JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+ "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
- + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+ + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", "
+ + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+ ")";
stmt.executeUpdate(createMetaTable);
String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+ "(word VARCHAR(255) not NULL, wcount INTEGER, PRIMARY KEY ( word ))";
stmt.executeUpdate(createTable);
-
}
@Test