utility scripts + refactored twitter app
- scripts for defining cluster configurations, starting nodes and
uploading apps
- removed custom event class for twitter app
diff --git a/build.gradle b/build.gradle
index 7db43d7..fd75057 100644
--- a/build.gradle
+++ b/build.gradle
@@ -72,7 +72,8 @@
     junit:              'junit:junit:4.10',
     zkclient:           'com.github.sgroschupf:zkclient:0.1',
     diezel:             'net.ericaro:diezel-maven-plugin:1.0.0-beta-4',
-    jcommander:         'com.beust:jcommander:1.23'
+    jcommander:         'com.beust:jcommander:1.23',
+    commons_io:         'commons-io:commons-io:2.1'
 ]
 
 subprojects {
diff --git a/s4 b/s4
new file mode 100755
index 0000000..a958cc2
--- /dev/null
+++ b/s4
@@ -0,0 +1,44 @@
+#!/bin/bash
+
+# NOTE: "./gradlew s4-tools:installApp" will prepare/update the tools subproject and related startup scripts
+
+echo $0
+echo $1
+
+GRADLE=`pwd`/gradlew
+
+case "$1" in
+"deploy")
+# examples:
+# ./s4 deploy -appName=twitter-counter -buildFile=<s4-dir>/test-apps/twitter-counter/build.gradle -cluster=s4-test-cluster
+# ./s4 deploy -appName=twitter-adapter -buildFile=<s4-dir>/test-apps/twitter-adapter/build.gradle -cluster=s4-adapter-cluster
+	shift
+    subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.Deploy -gradle=$GRADLE $@
+;;
+"zkServer")
+	shift
+    subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.ZKServer $@
+;;
+"newCluster")
+# examples:
+#./s4 newCluster -name=s4-test-cluster -firstListeningPort=11000 -nbTasks=2 ; ./s4 newCluster -name=s4-adapter-cluster -firstListeningPort=13000 -nbTasks=1
+	shift
+    subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.tools.DefineCluster $@
+;;
+"appNode")
+# example:
+# ./s4 appNode <s4-dir>/subprojects/s4-core/src/test/resources/default.s4.properties
+	shift
+  	subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.core.Main $@
+;;
+"adapterNode")
+# example:
+# ./s4 adapterNode -s4Properties=<s4-dir>/test-apps/twitter-adapter/src/main/resources/s4.properties
+	shift
+  	subprojects/s4-tools/build/install/s4-tools/bin/s4-tools org.apache.s4.core.adapter.AdapterMain $@
+;;
+
+
+
+esac
+
diff --git a/settings.gradle b/settings.gradle
index a5655c4..33827ba 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -17,8 +17,13 @@
 include 's4-core'

 include 's4-comm'

 include 's4-example'

+include 's4-tools'

+//include 's4-example'

+//include ':test-apps:simple-adapter-1'

 include ':test-apps:simple-deployable-app-1'

 include ':test-apps:simple-deployable-app-2'

+include ':test-apps:s4-showtime'

+include ':test-apps:s4-counter'

 

 rootProject.name = 's4'

 rootProject.children.each {project ->

diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
index e0a9456..7d7913d 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/CommTestUtils.java
@@ -38,7 +38,7 @@
 
     private static final Logger logger = LoggerFactory.getLogger(CommTestUtils.class);
 
-    public static final int ZK_PORT = 21810;
+    public static final int ZK_PORT = 2181;
     public static final int INITIAL_BOOKIE_PORT = 5000;
     public static File DEFAULT_TEST_OUTPUT_DIR = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp");
     public static File DEFAULT_STORAGE_DIR = new File(DEFAULT_TEST_OUTPUT_DIR.getAbsolutePath() + File.separator
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
index 2575013..dd46439 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/App.java
@@ -18,6 +18,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.s4.base.Event;
@@ -25,6 +26,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
 import com.google.inject.AbstractModule;
 import com.google.inject.Guice;
 import com.google.inject.Inject;
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
index 71756a4..c18603c 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/Main.java
@@ -29,16 +29,19 @@
      * @param args
      */
     public static void main(String[] args) {
-
-        if (args.length == 0) {
-            logger.info("Starting S4 node with default configuration");
-            startDefaultS4Node();
-        } else if (args.length == 1) {
-            logger.info("Starting S4 node with custom configuration from file {}", args[0]);
-            startCustomS4Node(args[0]);
-        } else {
-            logger.info("Starting S4 node in development mode");
-            startDevelopmentMode(args);
+        try {
+            if (args.length == 0) {
+                logger.info("Starting S4 node with default configuration");
+                startDefaultS4Node();
+            } else if (args.length == 1) {
+                logger.info("Starting S4 node with custom configuration from file {}", args[0]);
+                startCustomS4Node(args[0]);
+            } else {
+                logger.info("Starting S4 node in development mode");
+                startDevelopmentMode(args);
+            }
+        } catch (Exception e) {
+            logger.error("Cannot start S4 node", e);
         }
     }
 
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
index a1c496c..f027f06 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/core/adapter/AdapterMain.java
@@ -46,13 +46,7 @@
     @Parameters(separators = "=")
     static class AdapterArgs {
 
-        @Parameter(names = "-moduleClass", description = "module class name")
-        String moduleClass;
-
-        @Parameter(names = "-adapterClass", description = "adapter class name")
-        String adapterClass;
-
-        @Parameter(names = "-s4Properties", description = "s4 properties file path")
+        @Parameter(names = "-s4Properties", description = "s4 properties file path", required = true)
         String s4PropertiesFilePath;
     }
 
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
index 9f66e0d..670c375 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
+++ b/subprojects/s4-core/src/main/java/org/apache/s4/fluent/FluentApp.java
@@ -35,15 +35,4 @@
         appMaker.close();
     }
 
-    public void start() {
-        super.start();
-    }
-
-    public void init() {
-        super.init();
-    }
-
-    public void close() {
-        super.close();
-    }
 }
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
index 820dbe5..dffe6cb 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
@@ -1,6 +1,5 @@
 package org.apache.s4.wordcount;
 
-
 import java.io.File;
 import java.io.IOException;
 import java.util.Map.Entry;
@@ -18,25 +17,25 @@
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper;
 
-
 public class WordClassifierPE extends ProcessingElement implements Watcher {
 
     TreeMap<String, Integer> counts = new TreeMap<String, Integer>();
     private int counter;
     transient private ZooKeeper zk;
 
-    private WordClassifierPE () {}
+    private WordClassifierPE() {
+    }
 
     public WordClassifierPE(App app) {
         super(app);
     }
-    
+
     public void onEvent(WordCountEvent event) {
         try {
             WordCountEvent wcEvent = event;
             if (zk == null) {
                 try {
-                    zk = new ZooKeeper("localhost:21810", 4000, this);
+                    zk = new ZooKeeper("localhost:2181", 4000, this);
                 } catch (IOException e) {
                     throw new RuntimeException(e);
                 }
@@ -73,8 +72,8 @@
                 // zookeeper
                 zk.create("/classifierIteration_" + counter, new byte[counter], Ids.OPEN_ACL_UNSAFE,
                         CreateMode.PERSISTENT);
-                Logger.getLogger("s4-ft").debug("wrote classifier iteration ["+counter+"]");
-                System.out.println("wrote classifier iteration ["+counter+"]");
+                Logger.getLogger("s4-ft").debug("wrote classifier iteration [" + counter + "]");
+                System.out.println("wrote classifier iteration [" + counter + "]");
                 // check if we are allowed to continue
                 if (null == zk.exists("/continue_" + counter, null)) {
                     CountDownLatch latch = new CountDownLatch(1);
@@ -96,19 +95,19 @@
     @Override
     public void process(WatchedEvent event) {
         // TODO Auto-generated method stub
- 
+
     }
 
     @Override
     protected void onCreate() {
         // TODO Auto-generated method stub
-        
+
     }
 
     @Override
     protected void onRemove() {
         // TODO Auto-generated method stub
-        
+
     }
 
 }
diff --git a/subprojects/s4-core/src/test/resources/default.s4.properties b/subprojects/s4-core/src/test/resources/default.s4.properties
index 62fc7d5..2235032 100644
--- a/subprojects/s4-core/src/test/resources/default.s4.properties
+++ b/subprojects/s4-core/src/test/resources/default.s4.properties
@@ -4,6 +4,13 @@
 cluster.ports = 5077
 cluster.lock_dir = {user.dir}/tmp
 cluster.name = s4-test-cluster
-cluster.zk_address = localhost:21810
+cluster.zk_address = localhost:2181
 cluster.zk_session_timeout = 10000
 cluster.zk_connection_timeout = 10000
+s4.logger_level = DEBUG
+comm.module = org.apache.s4.core.CustomModule
+appsDir=/tmp/deploy-test
+tcp.partition.queue_size=1000
+comm.timeout=100
+comm.retry_delay=100
+comm.retries=10
diff --git a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
index 9a5d11a..3a566f2 100644
--- a/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
+++ b/subprojects/s4-core/src/test/resources/org.apache.s4.deploy.s4.properties
@@ -3,9 +3,9 @@
 cluster.hosts = localhost
 cluster.ports = 5077
 cluster.name = s4-test-cluster
-cluster.zk_address = localhost:21810
+cluster.zk_address = localhost:2181
 cluster.zk_session_timeout = 10000
 cluster.zk_connection_timeout = 10000
 comm.module = org.apache.s4.core.adapter.AdapterModule
-s4.logger_level = DEBUG
+s4.logger_level = TRACE
 appsDir=/tmp/deploy-test
diff --git a/subprojects/s4-tools/s4-tools.gradle b/subprojects/s4-tools/s4-tools.gradle
new file mode 100644
index 0000000..68c728e
--- /dev/null
+++ b/subprojects/s4-tools/s4-tools.gradle
@@ -0,0 +1,48 @@
+/*

+ * Copyright 2010 the original author or authors.

+ *

+ * Licensed under the Apache License, Version 2.0 (the "License");

+ * you may not use this file except in compliance with the License.

+ * You may obtain a copy of the License at

+ *

+ *      http://www.apache.org/licenses/LICENSE-2.0

+ *

+ * Unless required by applicable law or agreed to in writing, software

+ * distributed under the License is distributed on an "AS IS" BASIS,

+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

+ * See the License for the specific language governing permissions and

+ * limitations under the License.

+ */

+

+description = 'The S4 core platform.'

+

+apply plugin: 'java'

+

+task "create-dirs" << {

+   sourceSets.all*.java.srcDirs*.each { it.mkdirs() }

+   sourceSets.all*.resources.srcDirs*.each { it.mkdirs() }

+}

+

+dependencies {

+    compile project(":s4-base")

+    compile project(":s4-comm")

+    compile project(":s4-core")

+    compile libraries.jcommander

+    compile libraries.zkclient

+    compile libraries.commons_io

+}

+

+apply plugin:'application'

+mainClassName = "org.apache.s4.tools.Tools"

+

+run {

+    // run doesn't yet directly accept command line parameters...

+    if ( project.hasProperty('args') ) {

+        args project.args.split('\\s+')

+        print args

+    }

+ }

+

+test {

+    forkEvery=1

+}

diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
similarity index 61%
rename from subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
rename to subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
index b79f1a0..4fc09f0 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ZKServer.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/DefineCluster.java
@@ -1,7 +1,9 @@
-package org.apache.s4.fixtures;
+package org.apache.s4.tools;
 
 import java.util.Arrays;
 
+import org.apache.log4j.BasicConfigurator;
+import org.apache.log4j.Level;
 import org.apache.s4.comm.tools.TaskSetup;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -10,14 +12,16 @@
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 
-public class ZKServer {
+public class DefineCluster {
 
-    private static Logger logger = LoggerFactory.getLogger(ZKServer.class);
+    static Logger logger = LoggerFactory.getLogger(DefineCluster.class);
 
-    /**
-     * @param args
-     */
     public static void main(String[] args) {
+        // configure log4j for Zookeeper
+        BasicConfigurator.configure();
+        org.apache.log4j.Logger.getLogger("org.apache.zookeeper").setLevel(Level.ERROR);
+        org.apache.log4j.Logger.getLogger("org.I0Itec").setLevel(Level.ERROR);
+
         ZKServerArgs clusterArgs = new ZKServerArgs();
         JCommander jc = new JCommander(clusterArgs);
         try {
@@ -30,35 +34,29 @@
         }
         try {
 
-            logger.info("Starting zookeeper server for cluster [{}] with [{}] node(s)", clusterArgs.clusterName,
-                    clusterArgs.nbTasks);
-            if (clusterArgs.startZK) {
-                CommTestUtils.startZookeeperServer();
-            }
+            logger.info("preparing new cluster [{}] with [{}] node(s)", clusterArgs.clusterName, clusterArgs.nbTasks);
+
             TaskSetup taskSetup = new TaskSetup(clusterArgs.zkConnectionString);
             taskSetup.clean(clusterArgs.clusterName);
             taskSetup.setup(clusterArgs.clusterName, clusterArgs.nbTasks, clusterArgs.firstListeningPort);
-            logger.info("Zookeeper started");
+            logger.info("New cluster configuration uploaded into zookeeper");
         } catch (Exception e) {
             logger.error("Cannot initialize zookeeper with specified configuration", e);
         }
 
     }
 
-    @Parameters(separators = "=", commandDescription = "Start Zookeeper server and initialize S4 cluster configuration in Zookeeper (and clean previous one with same cluster name)")
+    @Parameters(separators = "=", commandDescription = "Setup new S4 logical cluster")
     static class ZKServerArgs {
 
-        @Parameter(names = "-cluster", description = "S4 cluster name", required = true)
+        @Parameter(names = "-name", description = "S4 cluster name", required = true)
         String clusterName = "s4-test-cluster";
 
         @Parameter(names = "-nbTasks", description = "number of tasks for the cluster", required = true)
         int nbTasks = 1;
 
         @Parameter(names = "-zk", description = "Zookeeper connection string")
-        String zkConnectionString = "localhost:21810";
-
-        @Parameter(names = "-startZK", description = "Start local zookeeper server (connection string ignored in that case)", required = false)
-        boolean startZK = false;
+        String zkConnectionString = "localhost:2181";
 
         @Parameter(names = "-firstListeningPort", description = "Initial listening port for nodes in this cluster. First node listens on the specified port, other nodes listen on port initial + nodeIndex", required = true)
         int firstListeningPort = -1;
diff --git a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
similarity index 82%
rename from subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
rename to subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
index d8b77a4..1b2eb7d 100644
--- a/subprojects/s4-core/src/main/java/org/apache/s4/deploy/util/DeployApp.java
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Deploy.java
@@ -1,4 +1,4 @@
-package org.apache.s4.deploy.util;
+package org.apache.s4.tools;
 
 import java.io.BufferedReader;
 import java.io.File;
@@ -26,9 +26,10 @@
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
 
-public class DeployApp {
+public class Deploy {
 
     private static File tmpAppsDir;
+    static org.slf4j.Logger logger = LoggerFactory.getLogger(Deploy.class);
 
     /**
      * @param args
@@ -45,34 +46,36 @@
         try {
             jc.parse(args);
         } catch (Exception e) {
+            e.printStackTrace();
             jc.usage();
             System.exit(-1);
         }
         try {
-            ZkClient zkClient = new ZkClient(appArgs.zkConnectionString);
+            ZkClient zkClient = new ZkClient(appArgs.zkConnectionString, appArgs.timeout);
             zkClient.setZkSerializer(new ZNRecordSerializer());
 
             tmpAppsDir = Files.createTempDir();
 
-            // File gradlewFile = CoreTestUtils.findGradlewInRootDir();
-
-            // CoreTestUtils.callGradleTask(gradlewFile, new File(appArgs.gradleBuildFilePath), "installS4R",
-            // new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
-            ExecGradle.exec(appArgs.gradleExecPath, appArgs.gradleBuildFilePath, "installS4R",
-                    new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
-
             File s4rToDeploy = File.createTempFile("testapp" + System.currentTimeMillis(), "s4r");
 
-            Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(tmpAppsDir.getAbsolutePath() + "/"
-                    + appArgs.appName + ".s4r")), Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
+            String generatedS4RPath = null;
+
+            ExecGradle.exec(appArgs.gradleExecPath, appArgs.gradleBuildFilePath, "installS4R",
+                    new String[] { "appsDir=" + tmpAppsDir.getAbsolutePath() });
+            generatedS4RPath = tmpAppsDir.getAbsolutePath() + "/" + appArgs.appName + ".s4r";
+
+            Assert.assertTrue(ByteStreams.copy(Files.newInputStreamSupplier(new File(generatedS4RPath)),
+                    Files.newOutputStreamSupplier(s4rToDeploy)) > 0);
 
             final String uri = s4rToDeploy.toURI().toString();
             ZNRecord record = new ZNRecord(String.valueOf(System.currentTimeMillis()));
             record.putSimpleField(DistributedDeploymentManager.S4R_URI, uri);
             zkClient.create("/" + appArgs.clusterName + "/apps/" + appArgs.appName, record, CreateMode.PERSISTENT);
+            logger.info("uploaded application [{}] to cluster [{}], using zookeeper znode [{}]", new String[] {
+                    appArgs.appName, appArgs.clusterName, "/" + appArgs.clusterName + "/apps/" + appArgs.appName });
 
         } catch (Exception e) {
-            LoggerFactory.getLogger(DeployApp.class).error("Cannot deploy app", e);
+            LoggerFactory.getLogger(Deploy.class).error("Cannot deploy app", e);
         }
 
     }
@@ -95,25 +98,32 @@
         @Parameter(names = "-zk", description = "zookeeper connection string")
         String zkConnectionString = "localhost:2181";
 
+        @Parameter(names = "-timeout", description = "connection timeout to Zookeeper, in ms")
+        int timeout = 10000;
+
     }
 
     static class ExecGradle {
 
         public static void exec(String gradlewExecPath, String buildFilePath, String taskName, String[] params)
                 throws Exception {
+            // Thread.sleep(10000);
             List<String> cmdList = new ArrayList<String>();
+            // cmdList.add("sleep");
+            // cmdList.add("2");
+            // cmdList.add(";");
             cmdList.add(gradlewExecPath);
             // cmdList.add("-c");
             // cmdList.add(gradlewFile.getParentFile().getAbsolutePath() + "/settings.gradle");
             cmdList.add("-b");
             cmdList.add(buildFilePath);
             cmdList.add(taskName);
+            cmdList.add("-stacktrace");
             if (params.length > 0) {
                 for (int i = 0; i < params.length; i++) {
                     cmdList.add("-P" + params[i]);
                 }
             }
-
             System.out.println(Arrays.toString(cmdList.toArray(new String[] {})).replace(",", ""));
             ProcessBuilder pb = new ProcessBuilder(cmdList);
 
@@ -137,6 +147,7 @@
                     }
                 }
             }).start();
+
             process.waitFor();
 
             // try {
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
new file mode 100644
index 0000000..ae53cb6
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/Tools.java
@@ -0,0 +1,22 @@
+package org.apache.s4.tools;
+
+import java.lang.reflect.Method;
+import java.util.Arrays;
+
+public class Tools {
+
+    public static void main(String[] args) {
+        try {
+            Class<?> toolClass = Class.forName(args[0]);
+            Method main = toolClass.getMethod("main", String[].class);
+            if (args.length > 1) {
+                main.invoke(null, new Object[] { Arrays.copyOfRange(args, 1, args.length) });
+            } else {
+                main.invoke(null, new Object[] { new String[0] });
+            }
+        } catch (Exception e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+    }
+}
diff --git a/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
new file mode 100644
index 0000000..cb4c85c
--- /dev/null
+++ b/subprojects/s4-tools/src/main/java/org/apache/s4/tools/ZKServer.java
@@ -0,0 +1,75 @@
+package org.apache.s4.tools;
+
+import java.io.File;
+import java.util.Arrays;
+
+import org.I0Itec.zkclient.IDefaultNameSpace;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkServer;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+
+public class ZKServer {
+
+    static Logger logger = LoggerFactory.getLogger(ZKServer.class);
+
+    public static void main(String[] args) {
+        ZKServerArgs zkArgs = new ZKServerArgs();
+        JCommander jc = new JCommander(zkArgs);
+        try {
+            jc.parse(args);
+        } catch (Exception e) {
+            System.out.println(Arrays.toString(args));
+            e.printStackTrace();
+            jc.usage();
+            System.exit(-1);
+        }
+        try {
+
+            logger.info("Starting zookeeper server on port [{}]", zkArgs.zkPort);
+
+            if (zkArgs.clean) {
+                logger.info("cleaning existing data in [{}] and [{}]", zkArgs.dataDir, zkArgs.logDir);
+                FileUtils.deleteDirectory(new File(zkArgs.dataDir));
+                FileUtils.deleteDirectory(new File(zkArgs.logDir));
+            }
+            IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+
+                @Override
+                public void createDefaultNameSpace(ZkClient zkClient) {
+
+                }
+            };
+
+            ZkServer zkServer = new ZkServer(zkArgs.dataDir, zkArgs.logDir, defaultNameSpace);
+            zkServer.start();
+        } catch (Exception e) {
+            logger.error("Cannot initialize zookeeper with specified configuration", e);
+        }
+    }
+
+    @Parameters(separators = "=", commandDescription = "Start Zookeeper server")
+    static class ZKServerArgs {
+
+        @Parameter(names = "-port", description = "Zookeeper port")
+        String zkPort = "2181";
+
+        @Parameter(names = "-dataDir", description = "data directory", required = false)
+        String dataDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
+                + "zookeeper" + File.separator + "data").getAbsolutePath();
+
+        @Parameter(names = "-clean", description = "clean zookeeper data (arity=0) (make sure you specify correct directories...)")
+        boolean clean = true;
+
+        @Parameter(names = "-logDir", description = "log directory")
+        String logDir = new File(System.getProperty("java.io.tmpdir") + File.separator + "tmp" + File.separator
+                + "zookeeper" + File.separator + "log").getAbsolutePath();
+
+    }
+
+}
diff --git a/test-apps/s4-counter/src/main/java/s4app/ClockApp.java b/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
index 316b876..cab4eab 100644
--- a/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
+++ b/test-apps/s4-counter/src/main/java/s4app/ClockApp.java
@@ -12,44 +12,27 @@
     private ClockPE clockPE;
 
     @Override
-    protected void start() {
+    protected void onStart() {
         System.out.println("Starting CounterApp...");
         clockPE.getInstanceForKey("single");
     }
 
     // generic array due to varargs generates a warning.
     @Override
-    protected void init() {
+    protected void onInit() {
         System.out.println("Initing CounterApp...");
 
         clockPE = new ClockPE(this);
         clockPE.setTimerInterval(1, TimeUnit.SECONDS);
 
-        eventSource = new EventSource(this, "I can give you the time!");
+        eventSource = new EventSource(this, "clockStream");
         clockPE.setStreams((Streamable) eventSource);
     }
 
     @Override
-    protected void close() {
+    protected void onClose() {
         System.out.println("Closing CounterApp...");
         eventSource.close();
     }
 
-    @Override
-    protected void onStart() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void onInit() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void onClose() {
-        // TODO Auto-generated method stub
-
-    }
 }
diff --git a/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java b/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
index 1c92c57..6652f59 100644
--- a/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
+++ b/test-apps/s4-showtime/src/main/java/s4app/ShowTimeApp.java
@@ -7,41 +7,23 @@
     private ShowPE showPE;
 
     @Override
-    protected void start() {
+    protected void onStart() {
         System.out.println("Starting ShowTimeApp...");
         showPE.getInstanceForKey("single");
     }
 
     @Override
-    protected void init() {
+    protected void onInit() {
         System.out.println("Initing ShowTimeApp...");
 
         showPE = new ShowPE(this);
 
         /* This stream will receive events from another app. */
-        createStream("I need the time.", showPE);
-    }
-
-    @Override
-    protected void close() {
-        System.out.println("Closing ShowTimeApp...");
-    }
-
-    @Override
-    protected void onStart() {
-        // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    protected void onInit() {
-        // TODO Auto-generated method stub
-
+        createStream("clockStream", showPE);
     }
 
     @Override
     protected void onClose() {
-        // TODO Auto-generated method stub
-
+        System.out.println("Closing ShowTimeApp...");
     }
 }
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
index 809cebd..8cb5843 100644
--- a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
+++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/SimplePE.java
@@ -41,7 +41,7 @@
     protected void onCreate() {
         if (zk == null) {
             try {
-                zk = new ZooKeeper("localhost:" + 21810, 4000, this);
+                zk = new ZooKeeper("localhost:" + 2181, 4000, this);
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
diff --git a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
index 25b98a6..bbfaf17 100644
--- a/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
+++ b/test-apps/simple-deployable-app-1/src/main/java/org/apache/s4/deploy/TestApp.java
@@ -39,7 +39,7 @@
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
-            zkClient = new ZkClient("localhost:" + 21810);
+            zkClient = new ZkClient("localhost:" + 2181);
             if (!zkClient.exists("/s4-test")) {
                 zkClient.create("/s4-test", null, CreateMode.PERSISTENT);
             }
diff --git a/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
index 0657ee9..8eb345b 100644
--- a/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
+++ b/test-apps/simple-deployable-app-2/src/main/java/org/apache/s4/deploy/TestApp.java
@@ -17,7 +17,7 @@
     @Override
     protected void onInit() {
         try {
-            zkClient = new ZkClient("localhost:" + 21810);
+            zkClient = new ZkClient("localhost:" + 2181);
             if (!zkClient.exists("/s4-test")) {
                 zkClient.create("/s4-test", null, CreateMode.PERSISTENT);
             }
diff --git a/test-apps/twitter-adapter/README.txt b/test-apps/twitter-adapter/README.txt
new file mode 100644
index 0000000..f9bda45
--- /dev/null
+++ b/test-apps/twitter-adapter/README.txt
@@ -0,0 +1 @@
+Please refer to README.txt in twitter-counter application
\ No newline at end of file
diff --git a/test-apps/twitter-adapter/build.gradle b/test-apps/twitter-adapter/build.gradle
index 669d62b..100837d 100644
--- a/test-apps/twitter-adapter/build.gradle
+++ b/test-apps/twitter-adapter/build.gradle
@@ -49,6 +49,9 @@
 
 apply plugin: 'java'
 apply plugin: 'eclipse'
+apply plugin:'application'
+
+mainClassName = "org.apache.s4.core.Main"
 
 /* The app classname is set automatically from the source files. */
 def appClassname = ''
diff --git a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
index ee905d9..102ca10 100644
--- a/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
+++ b/test-apps/twitter-adapter/src/main/java/org/apache/s4/example/twitter/TwitterInputAdapter.java
@@ -1,6 +1,9 @@
 package org.apache.s4.example.twitter;
 
+import java.io.File;
+import java.io.FileInputStream;
 import java.net.ServerSocket;
+import java.util.Properties;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.I0Itec.zkclient.ZkClient;
@@ -15,6 +18,7 @@
 import twitter4j.StatusListener;
 import twitter4j.TwitterStream;
 import twitter4j.TwitterStreamFactory;
+import twitter4j.conf.ConfigurationBuilder;
 
 public class TwitterInputAdapter extends Adapter {
 
@@ -49,7 +53,20 @@
 
     public void connectAndRead() throws Exception {
 
-        TwitterStream twitterStream = TwitterStreamFactory.getSingleton();
+        ConfigurationBuilder cb = new ConfigurationBuilder();
+        Properties twitterProperties = new Properties();
+        File twitter4jPropsFile = new File(System.getProperty("user.home") + "/twitter4j.properties");
+        if (!twitter4jPropsFile.exists()) {
+            logger.error(
+                    "Cannot find twitter4j.properties file in this location :[{}]. Make sure it is available at this place and includes user/password credentials",
+                    twitter4jPropsFile.getAbsolutePath());
+            return;
+        }
+        twitterProperties.load(new FileInputStream(twitter4jPropsFile));
+
+        cb.setDebugEnabled(Boolean.valueOf(twitterProperties.getProperty("debug")))
+                .setUser(twitterProperties.getProperty("user")).setPassword(twitterProperties.getProperty("password"));
+        TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
         StatusListener statusListener = new StatusListener() {
 
             @Override
diff --git a/test-apps/twitter-adapter/src/main/resources/default.s4.properties b/test-apps/twitter-adapter/src/main/resources/s4.properties
similarity index 79%
rename from test-apps/twitter-adapter/src/main/resources/default.s4.properties
rename to test-apps/twitter-adapter/src/main/resources/s4.properties
index cd36aaa..fdd1bf6 100644
--- a/test-apps/twitter-adapter/src/main/resources/default.s4.properties
+++ b/test-apps/twitter-adapter/src/main/resources/s4.properties
@@ -3,11 +3,11 @@
 cluster.hosts = localhost
 cluster.ports = 5077
 cluster.name = s4-adapter-cluster
-cluster.zk_address = localhost:21810
+cluster.zk_address = localhost:2181
 cluster.zk_session_timeout = 10000
 cluster.zk_connection_timeout = 10000
-comm.module = org.apache.s4.deploy.TestModule
-s4.logger_level = TRACE
+comm.module = org.apache.s4.core.adapter.AdapterModule
+s4.logger_level = DEBUG
 appsDir=/tmp/deploy-test
 tcp.partition.queue_size=1000
 comm.timeout=100
diff --git a/test-apps/twitter-adapter/src/main/resources/twitter4j.properties b/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
deleted file mode 100644
index 7d58c7d..0000000
--- a/test-apps/twitter-adapter/src/main/resources/twitter4j.properties
+++ /dev/null
@@ -1,5 +0,0 @@
-debug=true
-# you need to set those parameters with valid twitter account credentials
-twitter4j.user=????
-twitter4j.password=????
-
diff --git a/test-apps/twitter-counter/README.txt b/test-apps/twitter-counter/README.txt
new file mode 100644
index 0000000..d46b491
--- /dev/null
+++ b/test-apps/twitter-counter/README.txt
@@ -0,0 +1,33 @@
+An application that displays the current top 10 topics, as gathered from the twitter sample stream.
+It was ported and adapted from S4 0.3
+
+Architecture:
+- twitter-adapter app in adapter node connects to the twitter stream, extracts the twitted text and passes that to the application cluster
+- twitter-counter app in the application cluster receives the text of the tweets, extracts the topics, counts topic occurences and periodically displays the top 10 topics on the console
+
+How to configure:
+- you need a twitter4j.properties file in your home dir, with the following properties filled:
+debug=true|false
+user=<a twitter user name>
+password=<the matching password>
+
+How to run:
+0/ make sure tools are compiled by running ./gradlew s4-tools:installApp
+
+1/ start zookeeper server
+./s4 zkServer
+
+2/ create adapter cluster configuration
+./s4 newCluster -name=s4-test-cluster -firstListeningPort=10000 -nbTasks=1
+
+3/ create application cluster configuration
+./s4 newCluster -name=s4-adapter-cluster -firstListeningPort=11000 -nbTasks=<number of nodes>
+NOTE: - the name of the downstream cluster is currently hardcoded in <s4-dir>/test-apps/twitter-adapter/src/main/resources/s4.properties Make sure you use the same name
+
+4/ start application nodes (as many as defined in the cluster configuration, more for failover capabilities)
+./s4 appNode <s4-dir>/subprojects/s4-core/src/test/resources/default.s4.properties
+
+5/ start adapter node
+./s4 adapterNode -s4Properties=<s4-dir>/test-apps/twitter-adapter/src/main/resources/s4.properties
+
+6/ observe the topic count output (on 1 of the application nodes)
\ No newline at end of file
diff --git a/test-apps/twitter-counter/build.gradle b/test-apps/twitter-counter/build.gradle
index e737b40..e10eae0 100644
--- a/test-apps/twitter-counter/build.gradle
+++ b/test-apps/twitter-counter/build.gradle
@@ -49,6 +49,7 @@
 
 apply plugin: 'java'
 apply plugin: 'eclipse'
+apply plugin:'application'
 
 /* The app classname is set automatically from the source files. */
 def appClassname = ''
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
index 3d9a9fb..6fd68b5 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopNTopicPE.java
@@ -4,6 +4,7 @@
 import java.util.Map;
 import java.util.TreeSet;
 
+import org.apache.s4.base.Event;
 import org.apache.s4.core.App;
 import org.apache.s4.core.ProcessingElement;
 import org.slf4j.Logger;
@@ -14,16 +15,16 @@
 
 public class TopNTopicPE extends ProcessingElement {
 
+    static Logger logger = LoggerFactory.getLogger(TopNTopicPE.class);
+    Map<String, Integer> countedTopics = Maps.newHashMap();
+
     public TopNTopicPE(App app) {
         super(app);
-        // TODO Auto-generated constructor stub
+        logger.info("key: [{}]", getId());
     }
 
-    Map<String, Integer> countedTopics = Maps.newHashMap();
-    static Logger logger = LoggerFactory.getLogger(TopNTopicPE.class);
-
-    public void onEvent(TopicSeenEvent event) {
-        countedTopics.put(event.topic, event.count);
+    public void onEvent(Event event) {
+        countedTopics.put(event.get("topic"), event.get("count", Integer.class));
     }
 
     public void onTime() {
@@ -32,6 +33,8 @@
             sortedTopics.add(new TopNEntry(topicCount.getKey(), topicCount.getValue()));
         }
 
+        logger.info("\n------------------");
+
         int i = 0;
         Iterator<TopNEntry> iterator = sortedTopics.iterator();
         long time = System.currentTimeMillis();
@@ -39,6 +42,7 @@
             TopNEntry entry = iterator.next();
             logger.info("{} : topic [{}] count [{}]",
                     new String[] { String.valueOf(time), entry.topic, String.valueOf(entry.count) });
+            i++;
         }
     }
 
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
index a6d1478..5a41231 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicCountAndReportPE.java
@@ -1,27 +1,36 @@
 package org.apache.s4.example.twitter;
 
+import org.apache.s4.base.Event;
 import org.apache.s4.core.App;
 import org.apache.s4.core.ProcessingElement;
 import org.apache.s4.core.Stream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 // keyed by topic name
 public class TopicCountAndReportPE extends ProcessingElement {
 
-    Stream<TopicSeenEvent> downStream;
+    Stream<Event> downStream;
     int threshold = 10;
     int count;
+    boolean firstEvent = true;
+
+    static Logger logger = LoggerFactory.getLogger(TopicCountAndReportPE.class);
 
     public TopicCountAndReportPE(App app) {
         super(app);
-        // TODO Auto-generated constructor stub
     }
 
-    public void setDownstream(Stream<TopicSeenEvent> stream) {
+    public void setDownstream(Stream<Event> stream) {
         this.downStream = stream;
     }
 
-    public void onEvent(TopicSeenEvent event) {
-        count += event.count;
+    public void onEvent(Event event) {
+        if (firstEvent) {
+            logger.info("Handling new topic [{}]", getId());
+            firstEvent = false;
+        }
+        count += event.get("count", Integer.class);
     }
 
     @Override
@@ -34,14 +43,15 @@
         if (count < threshold) {
             return;
         }
-        TopicSeenEvent topicSeenEvent = new TopicSeenEvent(getId(), count);
+        Event topicSeenEvent = new Event();
+        topicSeenEvent.put("topic", String.class, getId());
+        topicSeenEvent.put("count", Integer.class, count);
+        topicSeenEvent.put("aggregationKey", String.class, "aggregationValue");
         downStream.put(topicSeenEvent);
     }
 
     @Override
     protected void onRemove() {
-        // TODO Auto-generated method stub
-
     }
 
 }
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
index 501d6fd..9936ed0 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicExtractorPE.java
@@ -6,13 +6,16 @@
 import org.apache.s4.core.App;
 import org.apache.s4.core.ProcessingElement;
 import org.apache.s4.core.Streamable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Splitter;
 
 public class TopicExtractorPE extends ProcessingElement {
 
     static private ServerSocket serverSocket;
-    Streamable<TopicSeenEvent> downStream;
+    Streamable<Event> downStream;
+    static Logger logger = LoggerFactory.getLogger(TopicExtractorPE.class);
 
     public TopicExtractorPE(App app) {
         super(app);
@@ -24,18 +27,22 @@
 
     }
 
-    public void setDownStream(Streamable<TopicSeenEvent> stream) {
+    public void setDownStream(Streamable<Event> stream) {
         this.downStream = stream;
     }
 
     public void onEvent(Event event) {
         String text = event.get("statusText", String.class);
+        logger.trace("event text [{}]", text);
         if (text.contains("#")) {
             Iterable<String> split = Splitter.on("#").omitEmptyStrings().trimResults()
                     .split(text.substring(text.indexOf("#") + 1, text.length()));
             for (String topic : split) {
                 String topicOnly = topic.split(" ")[0];
-                downStream.put(new TopicSeenEvent(topicOnly, 1));
+                Event event2 = new Event();
+                event2.put("topic", String.class, topicOnly);
+                event2.put("count", Integer.class, 1);
+                downStream.put(event2);
             }
         }
     }
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
deleted file mode 100644
index b28d61e..0000000
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TopicSeenEvent.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package org.apache.s4.example.twitter;
-
-import org.apache.s4.base.Event;
-
-public class TopicSeenEvent extends Event {
-
-    public String topic;
-    public int count;
-    public String reportKey = "x";
-
-    public TopicSeenEvent(String topic, int count) {
-        super();
-        this.topic = topic;
-        this.count = count;
-    }
-
-}
diff --git a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
index cf0fb40..f6edd8c 100644
--- a/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
+++ b/test-apps/twitter-counter/src/main/java/org/apache/s4/example/twitter/TwitterCounterApp.java
@@ -29,13 +29,13 @@
             TopNTopicPE topNTopicPE = createPE(TopNTopicPE.class);
             topNTopicPE.setTimerInterval(10, TimeUnit.SECONDS);
             @SuppressWarnings("unchecked")
-            Stream<TopicSeenEvent> aggregatedTopicStream = createStream("AggregatedTopicSeen", new KeyFinder() {
+            Stream<Event> aggregatedTopicStream = createStream("AggregatedTopicSeen", new KeyFinder() {
 
                 @Override
                 public List<String> get(Event arg0) {
                     return new ArrayList<String>() {
                         {
-                            add("x");
+                            add("aggregationKey");
                         }
                     };
                 }
@@ -44,13 +44,13 @@
             TopicCountAndReportPE topicCountAndReportPE = createPE(TopicCountAndReportPE.class);
             topicCountAndReportPE.setDownstream(aggregatedTopicStream);
             topicCountAndReportPE.setTimerInterval(10, TimeUnit.SECONDS);
-            Stream<TopicSeenEvent> topicSeenStream = createStream("TopicSeen", new KeyFinder<TopicSeenEvent>() {
+            Stream<Event> topicSeenStream = createStream("TopicSeen", new KeyFinder<Event>() {
 
                 @Override
-                public List<String> get(final TopicSeenEvent arg0) {
+                public List<String> get(final Event arg0) {
                     return new ArrayList<String>() {
                         {
-                            add(arg0.topic);
+                            add(arg0.get("topic"));
                         }
                     };
                 }
diff --git a/test-apps/twitter-adapter/src/main/resources/default.s4.properties b/test-apps/twitter-counter/src/main/resources/default.s4.properties
similarity index 65%
copy from test-apps/twitter-adapter/src/main/resources/default.s4.properties
copy to test-apps/twitter-counter/src/main/resources/default.s4.properties
index cd36aaa..d5da3f3 100644
--- a/test-apps/twitter-adapter/src/main/resources/default.s4.properties
+++ b/test-apps/twitter-counter/src/main/resources/default.s4.properties
@@ -6,13 +6,10 @@
 cluster.zk_address = localhost:21810
 cluster.zk_session_timeout = 10000
 cluster.zk_connection_timeout = 10000
-comm.module = org.apache.s4.deploy.TestModule
-s4.logger_level = TRACE
+comm.module = org.apache.s4.core.CustomModule
+s4.logger_level = DEBUG
 appsDir=/tmp/deploy-test
 tcp.partition.queue_size=1000
 comm.timeout=100
 comm.retry_delay=100
 comm.retries=10
-
-# specify the name of the remote cluster (there is currently only 1 remote cluster max)
-cluster.remote.name=s4-test-cluster