APEXMALHAR-2233 Migrate exactly-once examples.
diff --git a/examples/exactly-once/README.md b/examples/exactly-once/README.md
new file mode 100644
index 0000000..5254b4c
--- /dev/null
+++ b/examples/exactly-once/README.md
@@ -0,0 +1,17 @@
+# Examples for end-to-end exactly-once
+
+## Read from Kafka, write to JDBC
+
+This application shows exactly-once output to JDBC through transactions:
+
+[Application](src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java)
+
+[Test](src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java)
+
+## Read from Kafka, write to Files
+
+This application shows exactly-once output to HDFS through an atomic file operation:
+
+[Application](src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java)
+
+[Test](src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputAppTest.java)
diff --git a/examples/exactly-once/pom.xml b/examples/exactly-once/pom.xml
new file mode 100644
index 0000000..c198f48
--- /dev/null
+++ b/examples/exactly-once/pom.xml
@@ -0,0 +1,62 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-examples</artifactId>
+ <version>3.8.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>malhar-examples-exactly-once</artifactId>
+ <packaging>jar</packaging>
+
+ <name>Apache Apex Malhar Exactly-Once Examples</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-library</artifactId>
+ <version>${project.version}</version>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.apex</groupId>
+ <artifactId>malhar-kafka</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>info.batey.kafka</groupId>
+ <artifactId>kafka-unit</artifactId>
+ <version>0.4</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hsqldb</groupId>
+ <artifactId>hsqldb</artifactId>
+ <version>2.3.4</version>
+ </dependency>
+ </dependencies>
+
+</project>
diff --git a/examples/exactly-once/src/assemble/appPackage.xml b/examples/exactly-once/src/assemble/appPackage.xml
index 7ad071c..a870807 100644
--- a/examples/exactly-once/src/assemble/appPackage.xml
+++ b/examples/exactly-once/src/assemble/appPackage.xml
@@ -1,3 +1,23 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java b/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
deleted file mode 100644
index 7700d68..0000000
--- a/examples/exactly-once/src/main/java/com/example/myapexapp/Application.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/**
- * Put your copyright and license info here.
- */
-package com.example.myapexapp;
-
-import java.sql.PreparedStatement;
-import java.sql.SQLException;
-import java.util.Map;
-
-import org.apache.commons.lang.mutable.MutableInt;
-import org.apache.hadoop.conf.Configuration;
-
-import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.lib.algo.UniqueCounter;
-import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
-import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
-import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.io.IdempotentStorageManager;
-import com.datatorrent.lib.util.BaseUniqueKeyCounter;
-import com.datatorrent.lib.util.KeyValPair;
-
-@ApplicationAnnotation(name="ExactlyOnceExampleApplication")
-public class Application implements StreamingApplication
-{
-
- @Override
- public void populateDAG(DAG dag, Configuration conf)
- {
- KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());
- kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
- UniqueCounterFlat count = dag.addOperator("count", new UniqueCounterFlat());
- CountStoreOperator store = dag.addOperator("store", new CountStoreOperator());
- store.setStore(new JdbcTransactionalStore());
- ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator());
- dag.addStream("words", kafkaInput.outputPort, count.data);
- dag.addStream("counts", count.counts, store.input, cons.input);
- }
-
- public static class CountStoreOperator extends AbstractJdbcTransactionableOutputOperator<KeyValPair<String, Integer>>
- {
- public static final String SQL =
- "MERGE INTO words USING (VALUES ?, ?) I (word, wcount)"
- + " ON (words.word=I.word)"
- + " WHEN MATCHED THEN UPDATE SET words.wcount = words.wcount + I.wcount"
- + " WHEN NOT MATCHED THEN INSERT (word, wcount) VALUES (I.word, I.wcount)";
-
- @Override
- protected String getUpdateCommand()
- {
- return SQL;
- }
-
- @Override
- protected void setStatementParameters(PreparedStatement statement, KeyValPair<String, Integer> tuple) throws SQLException
- {
- statement.setString(1, tuple.getKey());
- statement.setInt(2, tuple.getValue());
- }
- }
-
- public static class UniqueCounterFlat extends UniqueCounter<String>
- {
- public final transient DefaultOutputPort<KeyValPair<String, Integer>> counts = new DefaultOutputPort<>();
-
- @Override
- public void endWindow()
- {
- for (Map.Entry<String, MutableInt> e: map.entrySet()) {
- counts.emit(new KeyValPair<>(e.getKey(), e.getValue().toInteger()));
- }
- map.clear();
- }
- }
-
-}
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java b/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java
deleted file mode 100644
index eed344b..0000000
--- a/examples/exactly-once/src/main/java/com/example/myapexapp/RandomNumberGenerator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Put your copyright and license info here.
- */
-package com.example.myapexapp;
-
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- * This is a simple operator that emits random number.
- */
-public class RandomNumberGenerator extends BaseOperator implements InputOperator
-{
- private int numTuples = 100;
- private transient int count = 0;
-
- public final transient DefaultOutputPort<Double> out = new DefaultOutputPort<Double>();
-
- @Override
- public void beginWindow(long windowId)
- {
- count = 0;
- }
-
- @Override
- public void emitTuples()
- {
- if (count++ < numTuples) {
- out.emit(Math.random());
- }
- }
-
- public int getNumTuples()
- {
- return numTuples;
- }
-
- /**
- * Sets the number of tuples to be emitted every window.
- * @param numTuples number of tuples
- */
- public void setNumTuples(int numTuples)
- {
- this.numTuples = numTuples;
- }
-}
diff --git a/examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java
similarity index 67%
rename from examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java
rename to examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java
index 55f8cc6..9aa2870 100644
--- a/examples/exactly-once/src/main/java/com/example/myapexapp/AtomicFileOutputApp.java
+++ b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputApp.java
@@ -1,32 +1,47 @@
/**
- * Copyright (c) 2015 DataTorrent, Inc.
- * All rights reserved.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*/
-package com.example.myapexapp;
+package org.apache.apex.examples.exactlyonce;
+import org.apache.apex.examples.exactlyonce.ExactlyOnceJdbcOutputApp.KafkaSinglePortStringInputOperator;
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
-import com.datatorrent.contrib.kafka.KafkaSinglePortStringInputOperator;
import com.datatorrent.lib.io.ConsoleOutputOperator;
-import com.datatorrent.lib.io.IdempotentStorageManager;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
import com.datatorrent.lib.util.KeyValPair;
-@ApplicationAnnotation(name = "AtomicFileOutput")
-public class AtomicFileOutputApp implements StreamingApplication
+@ApplicationAnnotation(name = "ExactlyOnceFileOutput")
+public class ExactlyOnceFileOutputApp implements StreamingApplication
{
@Override
public void populateDAG(DAG dag, Configuration configuration)
{
KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput",
new KafkaSinglePortStringInputOperator());
- kafkaInput.setIdempotentStorageManager(new IdempotentStorageManager.FSIdempotentStorageManager());
+ kafkaInput.setWindowDataManager(new FSWindowDataManager());
- Application.UniqueCounterFlat count = dag.addOperator("count", new Application.UniqueCounterFlat());
+ ExactlyOnceJdbcOutputApp.UniqueCounterFlat count = dag.addOperator("count",
+ new ExactlyOnceJdbcOutputApp.UniqueCounterFlat());
FileWriter fileWriter = dag.addOperator("fileWriter", new FileWriter());
diff --git a/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
new file mode 100644
index 0000000..33ae9dc
--- /dev/null
+++ b/examples/exactly-once/src/main/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputApp.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.exactlyonce;
+
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.apex.malhar.kafka.AbstractKafkaConsumer;
+import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
+import org.apache.apex.malhar.kafka.KafkaConsumer09;
+import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.algo.UniqueCounter;
+import com.datatorrent.lib.db.jdbc.AbstractJdbcTransactionableOutputOperator;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+import com.datatorrent.lib.io.ConsoleOutputOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
+@ApplicationAnnotation(name = "ExactlyOnceJbdcOutput")
+public class ExactlyOnceJdbcOutputApp implements StreamingApplication
+{
+
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ KafkaSinglePortStringInputOperator kafkaInput = dag.addOperator("kafkaInput", new KafkaSinglePortStringInputOperator());
+ kafkaInput.setWindowDataManager(new FSWindowDataManager());
+ UniqueCounterFlat count = dag.addOperator("count", new UniqueCounterFlat());
+ CountStoreOperator store = dag.addOperator("store", new CountStoreOperator());
+ store.setStore(new JdbcTransactionalStore());
+ ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator());
+ dag.addStream("words", kafkaInput.outputPort, count.data);
+ dag.addStream("counts", count.counts, store.input, cons.input);
+ }
+
+ public static class CountStoreOperator extends AbstractJdbcTransactionableOutputOperator<KeyValPair<String, Integer>>
+ {
+ public static final String SQL =
+ "MERGE INTO words USING (VALUES ?, ?) I (word, wcount)"
+ + " ON (words.word=I.word)"
+ + " WHEN MATCHED THEN UPDATE SET words.wcount = words.wcount + I.wcount"
+ + " WHEN NOT MATCHED THEN INSERT (word, wcount) VALUES (I.word, I.wcount)";
+
+ @Override
+ protected String getUpdateCommand()
+ {
+ return SQL;
+ }
+
+ @Override
+ protected void setStatementParameters(PreparedStatement statement, KeyValPair<String, Integer> tuple) throws SQLException
+ {
+ statement.setString(1, tuple.getKey());
+ statement.setInt(2, tuple.getValue());
+ }
+ }
+
+ public static class UniqueCounterFlat extends UniqueCounter<String>
+ {
+ public final transient DefaultOutputPort<KeyValPair<String, Integer>> counts = new DefaultOutputPort<>();
+
+ @Override
+ public void endWindow()
+ {
+ for (Map.Entry<String, MutableInt> e: map.entrySet()) {
+ counts.emit(new KeyValPair<>(e.getKey(), e.getValue().toInteger()));
+ }
+ map.clear();
+ }
+ }
+
+ public static class KafkaSinglePortStringInputOperator extends AbstractKafkaInputOperator
+ {
+ public final transient DefaultOutputPort<String> outputPort = new DefaultOutputPort<>();
+
+ @Override
+ public AbstractKafkaConsumer createConsumer(Properties properties)
+ {
+ return new KafkaConsumer09(properties);
+ }
+
+ @Override
+ protected void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message)
+ {
+ outputPort.emit(new String(message.value()));
+ }
+ }
+
+}
diff --git a/examples/exactly-once/src/main/resources/META-INF/properties.xml b/examples/exactly-once/src/main/resources/META-INF/properties.xml
index 876c39a..70f8812 100644
--- a/examples/exactly-once/src/main/resources/META-INF/properties.xml
+++ b/examples/exactly-once/src/main/resources/META-INF/properties.xml
@@ -1,24 +1,52 @@
<?xml version="1.0"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
<configuration>
- <!--
+
<property>
- <name>dt.application.{appName}.operator.{opName}.prop.{propName}</name>
- <value>some-default-value (if value is not specified, it is required from the user or custom config when launching)</value>
- </property>
- -->
- <!-- memory assigned to app master
- <property>
- <name>dt.attr.MASTER_MEMORY_MB</name>
- <value>1024</value>
- </property>
- -->
- <property>
- <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
- <value>1000</value>
+ <name>apex.operator.kafkaInput.prop.initialPartitionCount</name>
+ <value>1</value>
</property>
<property>
- <name>dt.application.MyFirstApplication.operator.console.prop.stringFormat</name>
- <value>hello world: %s</value>
+ <name>apex.operator.kafkaInput.prop.topics</name>
+ <value>exactly-once-example</value>
</property>
+ <property>
+ <name>apex.operator.kafkaInput.prop.clusters</name>
+ <value>localhost:9092</value>
+ </property>
+
+ <property>
+ <name>apex.application.ExactlyOnceJbdcOutput.operator.store.prop.store.databaseDriver</name>
+ <value>org.hsqldb.jdbcDriver</value>
+ </property>
+ <property>
+ <name>apex.application.ExactlyOnceJbdcOutput.operator.store.prop.store.databaseUrl</name>
+ <value>jdbc:hsqldb:mem:test;sql.syntax_mys=true</value>
+ </property>
+
+ <property>
+ <name>apex.application.ExactlyOnceFileOutput.operator.fileWriter.prop.filePath</name>
+ <value>target/atomicFileOutput</value>
+ </property>
+
</configuration>
diff --git a/examples/exactly-once/src/site/conf/my-app-conf1.xml b/examples/exactly-once/src/site/conf/my-app-conf1.xml
deleted file mode 100644
index ccb2b66..0000000
--- a/examples/exactly-once/src/site/conf/my-app-conf1.xml
+++ /dev/null
@@ -1,11 +0,0 @@
-<?xml version="1.0" encoding="UTF-8" standalone="no"?>
-<configuration>
- <property>
- <name>dt.attr.MASTER_MEMORY_MB</name>
- <value>1024</value>
- </property>
- <property>
- <name>dt.application.MyFirstApplication.operator.randomGenerator.prop.numTuples</name>
- <value>1000</value>
- </property>
-</configuration>
diff --git a/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java b/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java
deleted file mode 100644
index c5aa69c..0000000
--- a/examples/exactly-once/src/test/java/com/example/myapexapp/ApplicationTest.java
+++ /dev/null
@@ -1,124 +0,0 @@
-/**
- * Put your copyright and license info here.
- */
-package com.example.myapexapp;
-
-import java.io.File;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.ResultSet;
-import java.sql.Statement;
-import java.util.HashSet;
-
-import javax.validation.ConstraintViolationException;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.contrib.kafka.KafkaOperatorTestBase;
-import com.datatorrent.contrib.kafka.KafkaTestProducer;
-import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
-import com.example.myapexapp.Application;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-/**
- * Test the application in local mode.
- */
-public class ApplicationTest
-{
- private final KafkaOperatorTestBase kafkaLauncher = new KafkaOperatorTestBase();
- private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
- private static final String KAFKA_TOPIC = "exactly-once-test";
- private static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
- private static final String DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
- private static final String TABLE_NAME = "WORDS";
-
- @Before
- public void beforeTest() throws Exception {
- kafkaLauncher.baseDir = "target/" + this.getClass().getName();
- FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
- kafkaLauncher.startZookeeper();
- kafkaLauncher.startKafkaServer();
- kafkaLauncher.createTopic(0, KAFKA_TOPIC);
-
- // setup hsqldb
- Class.forName(DB_DRIVER).newInstance();
-
- Connection con = DriverManager.getConnection(DB_URL);
- Statement stmt = con.createStatement();
-
- String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
- + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
- + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
- + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
- + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
- + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
- + ")";
- stmt.executeUpdate(createMetaTable);
-
- String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
- + "(word VARCHAR(255) not NULL, wcount INTEGER, PRIMARY KEY ( word ))";
- stmt.executeUpdate(createTable);
-
- }
-
- @After
- public void afterTest() {
- kafkaLauncher.stopKafkaServer();
- kafkaLauncher.stopZookeeper();
- }
-
- @Test
- public void testApplication() throws Exception {
- try {
- // produce some test data
- KafkaTestProducer p = new KafkaTestProducer(KAFKA_TOPIC);
- String[] words = "count the words from kafka and store them in the db".split("\\s+");
- p.setMessages(Lists.newArrayList(words));
- new Thread(p).start();
-
- LocalMode lma = LocalMode.newInstance();
- Configuration conf = new Configuration(false);
- conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
- conf.set("dt.operator.kafkaInput.prop.topic", KAFKA_TOPIC);
- conf.set("dt.operator.kafkaInput.prop.zookeeper", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
- conf.set("dt.operator.kafkaInput.prop.maxTuplesPerWindow", "1"); // consume one word per window
- conf.set("dt.operator.store.prop.store.databaseDriver", DB_DRIVER);
- conf.set("dt.operator.store.prop.store.databaseUrl", DB_URL);
-
- lma.prepareDAG(new Application(), conf);
- LocalMode.Controller lc = lma.getController();
- lc.runAsync(); // test will terminate after results are available
-
- HashSet<String> wordsSet = Sets.newHashSet(words);
- Connection con = DriverManager.getConnection(DB_URL);
- Statement stmt = con.createStatement();
- int rowCount = 0;
- long timeout = System.currentTimeMillis() + 30000; // 30s timeout
- while (rowCount < wordsSet.size() && timeout > System.currentTimeMillis()) {
- Thread.sleep(1000);
- String countQuery = "SELECT count(*) from " + TABLE_NAME;
- ResultSet resultSet = stmt.executeQuery(countQuery);
- resultSet.next();
- rowCount = resultSet.getInt(1);
- resultSet.close();
- LOG.info("current row count in {} is {}", TABLE_NAME, rowCount);
- }
- Assert.assertEquals("number of words", wordsSet.size(), rowCount);
-
- lc.shutdown();
-
- } catch (ConstraintViolationException e) {
- Assert.fail("constraint violations: " + e.getConstraintViolations());
- }
- }
-
-}
diff --git a/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java b/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java
deleted file mode 100644
index b539394..0000000
--- a/examples/exactly-once/src/test/java/com/example/myapexapp/AtomicFileOutputAppTest.java
+++ /dev/null
@@ -1,93 +0,0 @@
-package com.example.myapexapp;
-
-import java.io.File;
-
-import javax.validation.ConstraintViolationException;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.contrib.kafka.KafkaOperatorTestBase;
-import com.datatorrent.contrib.kafka.KafkaTestProducer;
-
-/**
- * Copyright (c) 2015 DataTorrent, Inc.
- * All rights reserved.
- */
-public class AtomicFileOutputAppTest
-{
- private final KafkaOperatorTestBase kafkaLauncher = new KafkaOperatorTestBase();
- private static final Logger LOG = LoggerFactory.getLogger(ApplicationTest.class);
- private static final String KAFKA_TOPIC = "exactly-once-test";
- private static final String TARGET_DIR = "target/atomicFileOutput";
-
- @Before
- public void beforeTest() throws Exception {
- kafkaLauncher.baseDir = "target/" + this.getClass().getName();
- FileUtils.deleteDirectory(new File(kafkaLauncher.baseDir));
- kafkaLauncher.startZookeeper();
- kafkaLauncher.startKafkaServer();
- kafkaLauncher.createTopic(0, KAFKA_TOPIC);
- }
-
- @After
- public void afterTest() {
- kafkaLauncher.stopKafkaServer();
- kafkaLauncher.stopZookeeper();
- }
-
- @Test
- public void testApplication() throws Exception {
- try {
-
- File targetDir = new File(TARGET_DIR);
- FileUtils.deleteDirectory(targetDir);
- FileUtils.forceMkdir(targetDir);
-
- // produce some test data
- KafkaTestProducer p = new KafkaTestProducer(KAFKA_TOPIC);
- String[] words = "count the words from kafka and store them in the db".split("\\s+");
- p.setMessages(Lists.newArrayList(words));
- new Thread(p).start();
-
- LocalMode lma = LocalMode.newInstance();
- Configuration conf = new Configuration(false);
- conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
- conf.set("dt.operator.kafkaInput.prop.topic", KAFKA_TOPIC);
- conf.set("dt.operator.kafkaInput.prop.zookeeper", "localhost:" + KafkaOperatorTestBase.TEST_ZOOKEEPER_PORT[0]);
- conf.set("dt.operator.kafkaInput.prop.maxTuplesPerWindow", "1"); // consume one word per window
- conf.set("dt.operator.fileWriter.prop.filePath", TARGET_DIR);
-
- lma.prepareDAG(new AtomicFileOutputApp(), conf);
- LocalMode.Controller lc = lma.getController();
- lc.runAsync(); // test will terminate after results are available
-
- long timeout = System.currentTimeMillis() + 60000; // 60s timeout
-
- File outputFile = new File(TARGET_DIR, AtomicFileOutputApp.FileWriter.FILE_NAME_PREFIX);
- while (!outputFile.exists() && timeout > System.currentTimeMillis()) {
- Thread.sleep(1000);
- LOG.debug("Waiting for {}", outputFile);
- }
-
- Assert.assertTrue("output file exists " + AtomicFileOutputApp.FileWriter.FILE_NAME_PREFIX, outputFile.exists() &&
- outputFile.isFile());
-
- lc.shutdown();
-
- } catch (ConstraintViolationException e) {
- Assert.fail("constraint violations: " + e.getConstraintViolations());
- }
- }
-
-}
diff --git a/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputAppTest.java b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputAppTest.java
new file mode 100644
index 0000000..91a3d72
--- /dev/null
+++ b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceFileOutputAppTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.exactlyonce;
+
+import java.io.File;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.apex.api.Launcher;
+import org.apache.apex.api.Launcher.AppHandle;
+import org.apache.apex.api.Launcher.LaunchMode;
+import org.apache.apex.api.Launcher.ShutdownMode;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+
+import com.datatorrent.api.Attribute;
+
+import info.batey.kafka.unit.KafkaUnit;
+import info.batey.kafka.unit.KafkaUnitRule;
+import kafka.producer.KeyedMessage;
+
+public class ExactlyOnceFileOutputAppTest
+{
+ private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceJdbcOutputTest.class);
+ private static final String TARGET_DIR = "target/atomicFileOutput";
+
+ private final int brokerPort = NetUtils.getFreeSocketPort();
+ private final int zkPort = NetUtils.getFreeSocketPort();
+
+ @Rule
+ public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
+
+ {
+    // required to avoid auto-creation of 50 partitions for new topics
+ this.kafkaUnitRule.getKafkaUnit().setKafkaBrokerConfig("num.partitions", "1");
+ this.kafkaUnitRule.getKafkaUnit().setKafkaBrokerConfig("offsets.topic.num.partitions", "1");
+ }
+
+ @Test
+ public void testApplication() throws Exception
+ {
+ File targetDir = new File(TARGET_DIR);
+ FileUtils.deleteDirectory(targetDir);
+ FileUtils.forceMkdir(targetDir);
+
+ KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
+ String topicName = "testTopic";
+    // topic creation is asynchronous; the producer may also auto-create the topic
+ ku.createTopic(topicName, 1);
+
+ // produce test data
+ String[] words = "count count the words from kafka and store them in a file".split("\\s+");
+ for (String word : words) {
+ ku.sendMessages(new KeyedMessage<String, String>(topicName, word));
+ }
+
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+ conf.set("apex.operator.kafkaInput.prop.topics", topicName);
+ conf.set("apex.operator.kafkaInput.prop.clusters", "localhost:" + brokerPort);
+    conf.set("apex.operator.kafkaInput.prop.maxTuplesPerWindow", "2"); // consume two words per window
+ conf.set("apex.operator.kafkaInput.prop.initialOffset", "EARLIEST");
+ conf.set("apex.operator.fileWriter.prop.filePath", TARGET_DIR);
+
+ EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
+ Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true); // terminate after results are available
+ AppHandle appHandle = launcher.launchApp(new ExactlyOnceFileOutputApp(), conf, launchAttributes);
+
+ long timeout = System.currentTimeMillis() + 60000; // 60s timeout
+
+ File outputFile = new File(TARGET_DIR, ExactlyOnceFileOutputApp.FileWriter.FILE_NAME_PREFIX);
+ while (!outputFile.exists() && timeout > System.currentTimeMillis()) {
+ Thread.sleep(1000);
+ LOG.debug("Waiting for {}", outputFile);
+ }
+
+ Assert.assertTrue("output file exists " + ExactlyOnceFileOutputApp.FileWriter.FILE_NAME_PREFIX, outputFile.exists() &&
+ outputFile.isFile());
+
+ String result = FileUtils.readFileToString(outputFile);
+ Assert.assertTrue(result.contains("count=2"));
+
+ appHandle.shutdown(ShutdownMode.KILL);
+ }
+
+}
diff --git a/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
new file mode 100644
index 0000000..62bfb74
--- /dev/null
+++ b/examples/exactly-once/src/test/java/org/apache/apex/examples/exactlyonce/ExactlyOnceJdbcOutputTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.examples.exactlyonce;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+import java.util.HashSet;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.api.EmbeddedAppLauncher;
+import org.apache.apex.api.Launcher;
+import org.apache.apex.api.Launcher.AppHandle;
+import org.apache.apex.api.Launcher.LaunchMode;
+import org.apache.apex.api.Launcher.ShutdownMode;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.lib.db.jdbc.JdbcTransactionalStore;
+
+import info.batey.kafka.unit.KafkaUnit;
+import info.batey.kafka.unit.KafkaUnitRule;
+import kafka.producer.KeyedMessage;
+
+public class ExactlyOnceJdbcOutputTest
+{
+ private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceJdbcOutputTest.class);
+ private static final String DB_DRIVER = "org.hsqldb.jdbcDriver";
+ private static final String DB_URL = "jdbc:hsqldb:mem:test;sql.syntax_mys=true";
+ private static final String TABLE_NAME = "WORDS";
+
+ private final int brokerPort = NetUtils.getFreeSocketPort();
+ private final int zkPort = NetUtils.getFreeSocketPort();
+
+ @Rule
+ public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);
+
+ {
+ // required to avoid 50 partitions auto creation
+ this.kafkaUnitRule.getKafkaUnit().setKafkaBrokerConfig("num.partitions", "1");
+ this.kafkaUnitRule.getKafkaUnit().setKafkaBrokerConfig("offsets.topic.num.partitions", "1");
+ }
+
+ @Before
+ public void beforeTest() throws Exception
+ {
+ // setup hsqldb
+ Class.forName(DB_DRIVER).newInstance();
+
+ Connection con = DriverManager.getConnection(DB_URL);
+ Statement stmt = con.createStatement();
+
+ String createMetaTable = "CREATE TABLE IF NOT EXISTS " + JdbcTransactionalStore.DEFAULT_META_TABLE + " ( "
+ + JdbcTransactionalStore.DEFAULT_APP_ID_COL + " VARCHAR(100) NOT NULL, "
+ + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + " INT NOT NULL, "
+ + JdbcTransactionalStore.DEFAULT_WINDOW_COL + " BIGINT NOT NULL, "
+ + "UNIQUE (" + JdbcTransactionalStore.DEFAULT_APP_ID_COL + ", "
+ + JdbcTransactionalStore.DEFAULT_OPERATOR_ID_COL + ", " + JdbcTransactionalStore.DEFAULT_WINDOW_COL + ") "
+ + ")";
+ stmt.executeUpdate(createMetaTable);
+
+ String createTable = "CREATE TABLE IF NOT EXISTS " + TABLE_NAME
+ + "(word VARCHAR(255) not NULL, wcount INTEGER, PRIMARY KEY ( word ))";
+ stmt.executeUpdate(createTable);
+
+ }
+
+ @Test
+ public void testApplication() throws Exception
+ {
+ KafkaUnit ku = kafkaUnitRule.getKafkaUnit();
+ String topicName = "testTopic";
+ // topic creation is async and the producer may also auto-create it
+ ku.createTopic(topicName, 1);
+
+ // produce test data
+ String[] words = "count the words from kafka and store them in the db".split("\\s+");
+ for (String word : words) {
+ ku.sendMessages(new KeyedMessage<String, String>(topicName, word));
+ }
+
+ Configuration conf = new Configuration(false);
+ conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
+ conf.set("apex.operator.kafkaInput.prop.topics", topicName);
+ conf.set("apex.operator.kafkaInput.prop.clusters", "localhost:" + brokerPort);
+ conf.set("apex.operator.kafkaInput.prop.maxTuplesPerWindow", "1"); // consume one word per window
+ conf.set("apex.operator.kafkaInput.prop.initialOffset", "EARLIEST");
+ conf.set("apex.operator.store.prop.store.databaseDriver", DB_DRIVER);
+ conf.set("apex.operator.store.prop.store.databaseUrl", DB_URL);
+
+ EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
+ Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true); // terminate after results are available
+ AppHandle appHandle = launcher.launchApp(new ExactlyOnceJdbcOutputApp(), conf, launchAttributes);
+ HashSet<String> wordsSet = Sets.newHashSet(words);
+ Connection con = DriverManager.getConnection(DB_URL);
+ Statement stmt = con.createStatement();
+ int rowCount = 0;
+ long timeout = System.currentTimeMillis() + 30000; // 30s timeout
+ while (rowCount < wordsSet.size() && timeout > System.currentTimeMillis()) {
+ Thread.sleep(500);
+ String countQuery = "SELECT count(*) from " + TABLE_NAME;
+ ResultSet resultSet = stmt.executeQuery(countQuery);
+ resultSet.next();
+ rowCount = resultSet.getInt(1);
+ resultSet.close();
+ LOG.info("current row count in {} is {}", TABLE_NAME, rowCount);
+ }
+ Assert.assertEquals("number of words", wordsSet.size(), rowCount);
+ appHandle.shutdown(ShutdownMode.KILL);
+ }
+
+}
diff --git a/examples/exactly-once/src/test/resources/log4j.properties b/examples/exactly-once/src/test/resources/log4j.properties
index dd5910b..b9bee5d 100644
--- a/examples/exactly-once/src/test/resources/log4j.properties
+++ b/examples/exactly-once/src/test/resources/log4j.properties
@@ -1,8 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
log4j.rootLogger=DEBUG,CONSOLE
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
+log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
+test.log.console.threshold=INFO
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
@@ -17,7 +38,6 @@
log4j.appender.SYSLOG.Facility=LOCAL1
log4j.logger.org=info
-log4j.logger.kafka.server=info
-log4j.logger.kafka.request.logger=info
#log4j.logger.org.apache.commons.beanutils=warn
log4j.logger.com.datatorrent=info
+log4j.logger.org.apache.apex=debug
diff --git a/examples/pom.xml b/examples/pom.xml
index 6c76381..cfd8431 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -201,6 +201,7 @@
<module>ftp</module>
<module>s3</module>
<module>jdbc</module>
+ <module>exactly-once</module>
</modules>
<dependencies>