Fixed fail-fast-induced issues in tests due to S4-85 merge
- avoid fail fast zookeeper clients in tests
- also : don't rely on files for results checks
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
index 8ab093f..05a38be 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
@@ -1,6 +1,5 @@
 package org.apache.s4.core.ft;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -10,9 +9,7 @@
 import org.apache.s4.base.Event;
 import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultCommModule;
 import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.core.DefaultCoreModule;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.CoreTestUtils;
 import org.apache.s4.fixtures.ZkBasedTest;
@@ -23,8 +20,6 @@
 import org.junit.After;
 import org.junit.Test;
 
-import com.google.common.io.Resources;
-import com.google.inject.Guice;
 import com.google.inject.Injector;
 
 public class FTWordCountTest extends ZkBasedTest {
@@ -39,9 +34,7 @@
     @Test
     public void testCheckpointAndRecovery() throws Exception {
 
-        Injector injector = Guice.createInjector(
-                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
-                new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+        Injector injector = CoreTestUtils.createInjectorWithNonFailFastZKClients();
 
         TCPEmitter emitter = injector.getInstance(TCPEmitter.class);
 
@@ -50,7 +43,7 @@
         restartNode();
 
         CountDownLatch signalTextProcessed = new CountDownLatch(1);
-        CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
+        CommTestUtils.watchAndSignalCreation("/results", signalTextProcessed, zk);
 
         // add authorizations for processing
         for (int i = 1; i <= WordCountTest.SENTENCE_1_TOTAL_WORDS; i++) {
@@ -97,15 +90,9 @@
             zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
         }
         injectSentence(injector, emitter, WordCountTest.SENTENCE_3);
-        signalTextProcessed.await(10, TimeUnit.SECONDS);
-        File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
-        if (!results.exists()) {
-            // in case the results file isn't ready yet
-            Thread.sleep(1000);
-            results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
-        }
-        String s = CoreTestUtils.readFile(results);
-        Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
+        Assert.assertTrue(signalTextProcessed.await(10, TimeUnit.SECONDS));
+        String results = new String(zk.getData("/results", false, null));
+        Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", results);
 
     }
 
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
index 326e1a1..0566b3b 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
@@ -11,10 +11,8 @@
 import org.apache.s4.base.Event;
 import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultCommModule;
 import org.apache.s4.comm.tcp.TCPEmitter;
 import org.apache.s4.comm.topology.ZkClient;
-import org.apache.s4.core.DefaultCoreModule;
 import org.apache.s4.fixtures.CoreTestUtils;
 import org.apache.s4.fixtures.ZkBasedTest;
 import org.apache.zookeeper.KeeperException;
@@ -22,8 +20,6 @@
 import org.junit.After;
 import org.junit.Test;
 
-import com.google.common.io.Resources;
-import com.google.inject.Guice;
 import com.google.inject.Injector;
 
 public class RecoveryTest extends ZkBasedTest {
@@ -96,9 +92,7 @@
         final CountDownLatch signalCheckpointed = new CountDownLatch(1);
         CoreTestUtils.watchAndSignalCreation("/checkpointed", signalCheckpointed, zk);
 
-        Injector injector = Guice.createInjector(
-                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
-                new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+        Injector injector = CoreTestUtils.createInjectorWithNonFailFastZKClients();
 
         TCPEmitter emitter = injector.getInstance(TCPEmitter.class);
 
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 620494f..f129dae 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -18,11 +18,9 @@
 import org.apache.s4.base.Event;
 import org.apache.s4.base.EventMessage;
 import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultCommModule;
 import org.apache.s4.comm.tcp.TCPEmitter;
 import org.apache.s4.comm.topology.ZNRecord;
 import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.core.DefaultCoreModule;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.s4.fixtures.CoreTestUtils;
 import org.apache.s4.fixtures.ZkBasedTest;
@@ -36,8 +34,6 @@
 
 import com.google.common.io.ByteStreams;
 import com.google.common.io.Files;
-import com.google.common.io.Resources;
-import com.google.inject.Guice;
 import com.google.inject.Injector;
 import com.sun.net.httpserver.Headers;
 import com.sun.net.httpserver.HttpExchange;
@@ -112,9 +108,7 @@
         CommTestUtils
                 .watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, CommTestUtils.createZkClient());
 
-        Injector injector = Guice.createInjector(
-                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
-                new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+        Injector injector = CoreTestUtils.createInjectorWithNonFailFastZKClients();
 
         TCPEmitter emitter = injector.getInstance(TCPEmitter.class);
 
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java
new file mode 100644
index 0000000..4744e03
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java
@@ -0,0 +1,24 @@
+package org.apache.s4.fixtures;
+
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class AssignmentFromZKNoFailFast extends AssignmentFromZK {
+
+    @Inject
+    public AssignmentFromZKNoFailFast(@Named("cluster.name") String clusterName,
+            @Named("cluster.zk_address") String zookeeperAddress,
+            @Named("cluster.zk_session_timeout") int sessionTimeout,
+            @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+        super(clusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+    }
+
+    @Override
+    public void handleStateChanged(KeeperState state) throws Exception {
+        // no fail fast
+    }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java
new file mode 100644
index 0000000..f600c2f
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java
@@ -0,0 +1,24 @@
+package org.apache.s4.fixtures;
+
+import org.apache.s4.comm.topology.ClusterFromZK;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class ClusterFromZKNoFailFast extends ClusterFromZK {
+
+    @Inject
+    public ClusterFromZKNoFailFast(@Named("cluster.name") String clusterName,
+            @Named("cluster.zk_address") String zookeeperAddress,
+            @Named("cluster.zk_session_timeout") int sessionTimeout,
+            @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+        super(clusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+    }
+
+    @Override
+    public void handleStateChanged(KeeperState state) throws Exception {
+        // no fail fast
+    }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java
new file mode 100644
index 0000000..3a72050
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java
@@ -0,0 +1,24 @@
+package org.apache.s4.fixtures;
+
+import org.apache.s4.comm.topology.ClustersFromZK;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class ClustersFromZKNoFailFast extends ClustersFromZK {
+
+    @Inject
+    public ClustersFromZKNoFailFast(@Named("cluster.name") String clusterName,
+            @Named("cluster.zk_address") String zookeeperAddress,
+            @Named("cluster.zk_session_timeout") int sessionTimeout,
+            @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+        super(clusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+    }
+
+    @Override
+    public void handleStateChanged(KeeperState state) throws Exception {
+        // no fail fast
+    }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 00a7611..91cd799 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -7,6 +7,8 @@
 
 import junit.framework.Assert;
 
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.core.DefaultCoreModule;
 import org.apache.s4.core.Main;
 import org.gradle.tooling.BuildLauncher;
 import org.gradle.tooling.GradleConnector;
@@ -15,6 +17,10 @@
 import sun.net.ProgressListener;
 
 import com.google.common.io.PatternFilenameFilter;
+import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.util.Modules;
 
 /**
  * Contains static methods that can be used in tests for things such as: - files utilities: strings <-> files
@@ -88,4 +94,11 @@
         }
 
     }
+
+    public static Injector createInjectorWithNonFailFastZKClients() throws IOException {
+        return Guice.createInjector(Modules.override(
+                new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
+                new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream())).with(
+                new NonFailFastZookeeperClientsModule()));
+    }
 }
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/NonFailFastZookeeperClientsModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/NonFailFastZookeeperClientsModule.java
new file mode 100644
index 0000000..790d591
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/NonFailFastZookeeperClientsModule.java
@@ -0,0 +1,39 @@
+package org.apache.s4.fixtures;
+
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Clusters;
+
+import com.google.inject.AbstractModule;
+
+/**
+ * 
+ * Used for injecting non-fail-fast zookeeper client classes.
+ * 
+ * Here is why:
+ * 
+ * <ul>
+ * <li>tests contained in a single junit class are not forked: forking is on a class basis</li>
+ * <li>zookeeper client classes are injected during the tests</li>
+ * <li>zookeeper server is restarted between test methods.</li>
+ * <li>zookeeper client classes from previous tests methods get a "expired" exception upon reconnection to the new
+ * zookeeper instance. With a fail-fast implementation, this would kill the current test.</li>
+ * </ul>
+ * 
+ * 
+ */
+public class NonFailFastZookeeperClientsModule extends AbstractModule {
+
+    public NonFailFastZookeeperClientsModule() {
+    }
+
+    @Override
+    protected void configure() {
+        bind(Assignment.class).to(AssignmentFromZKNoFailFast.class);
+        bind(Cluster.class).to(ClusterFromZKNoFailFast.class);
+
+        bind(Clusters.class).to(ClustersFromZKNoFailFast.class);
+
+    }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
index dffe6cb..107f271 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
@@ -12,6 +12,7 @@
 import org.apache.s4.core.ProcessingElement;
 import org.apache.s4.fixtures.CommTestUtils;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -62,9 +63,14 @@
                 for (Entry<String, Integer> entry : entrySet) {
                     sb.append(entry.getKey() + "=" + entry.getValue() + ";");
                 }
-                CommTestUtils.writeStringToFile(sb.toString(), results);
 
-                zk.create("/textProcessed", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                try {
+                    zk.delete("/results", -1);
+                } catch (NoNodeException ignored) {
+                }
+
+                zk.create("/results", sb.toString().getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
             } else {
                 // NOTE: this will fail if we did not recover the latest
                 // counter,
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 5987e4a..0484b57 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -1,6 +1,5 @@
 package org.apache.s4.wordcount;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -80,7 +79,7 @@
                 "-extraModulesClasses=" + WordCountModule.class.getName() });
 
         CountDownLatch signalTextProcessed = new CountDownLatch(1);
-        CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
+        CommTestUtils.watchAndSignalCreation("/results", signalTextProcessed, zk);
 
         // add authorizations for processing
         for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
@@ -90,10 +89,8 @@
         injectSentence(SENTENCE_2);
         injectSentence(SENTENCE_3);
         Assert.assertTrue(signalTextProcessed.await(10, TimeUnit.SECONDS));
-        File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
-        String s = CommTestUtils.readFile(results);
-        Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
-
+        String results = new String(zk.getData("/results", false, null));
+        Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", results);
     }
 
     public void injectSentence(String sentence) throws IOException {