Fixed fail-fast-induced issues in tests due to S4-85 merge
- avoid fail fast zookeeper clients in tests
- also : don't rely on files for results checks
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
index 8ab093f..05a38be 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/FTWordCountTest.java
@@ -1,6 +1,5 @@
package org.apache.s4.core.ft;
-import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -10,9 +9,7 @@
import org.apache.s4.base.Event;
import org.apache.s4.base.EventMessage;
import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultCommModule;
import org.apache.s4.comm.tcp.TCPEmitter;
-import org.apache.s4.core.DefaultCoreModule;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
import org.apache.s4.fixtures.ZkBasedTest;
@@ -23,8 +20,6 @@
import org.junit.After;
import org.junit.Test;
-import com.google.common.io.Resources;
-import com.google.inject.Guice;
import com.google.inject.Injector;
public class FTWordCountTest extends ZkBasedTest {
@@ -39,9 +34,7 @@
@Test
public void testCheckpointAndRecovery() throws Exception {
- Injector injector = Guice.createInjector(
- new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
- new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+ Injector injector = CoreTestUtils.createInjectorWithNonFailFastZKClients();
TCPEmitter emitter = injector.getInstance(TCPEmitter.class);
@@ -50,7 +43,7 @@
restartNode();
CountDownLatch signalTextProcessed = new CountDownLatch(1);
- CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
+ CommTestUtils.watchAndSignalCreation("/results", signalTextProcessed, zk);
// add authorizations for processing
for (int i = 1; i <= WordCountTest.SENTENCE_1_TOTAL_WORDS; i++) {
@@ -97,15 +90,9 @@
zk.create("/continue_" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
}
injectSentence(injector, emitter, WordCountTest.SENTENCE_3);
- signalTextProcessed.await(10, TimeUnit.SECONDS);
- File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
- if (!results.exists()) {
- // in case the results file isn't ready yet
- Thread.sleep(1000);
- results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
- }
- String s = CoreTestUtils.readFile(results);
- Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
+ Assert.assertTrue(signalTextProcessed.await(10, TimeUnit.SECONDS));
+ String results = new String(zk.getData("/results", false, null));
+ Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", results);
}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
index 326e1a1..0566b3b 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/core/ft/RecoveryTest.java
@@ -11,10 +11,8 @@
import org.apache.s4.base.Event;
import org.apache.s4.base.EventMessage;
import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultCommModule;
import org.apache.s4.comm.tcp.TCPEmitter;
import org.apache.s4.comm.topology.ZkClient;
-import org.apache.s4.core.DefaultCoreModule;
import org.apache.s4.fixtures.CoreTestUtils;
import org.apache.s4.fixtures.ZkBasedTest;
import org.apache.zookeeper.KeeperException;
@@ -22,8 +20,6 @@
import org.junit.After;
import org.junit.Test;
-import com.google.common.io.Resources;
-import com.google.inject.Guice;
import com.google.inject.Injector;
public class RecoveryTest extends ZkBasedTest {
@@ -96,9 +92,7 @@
final CountDownLatch signalCheckpointed = new CountDownLatch(1);
CoreTestUtils.watchAndSignalCreation("/checkpointed", signalCheckpointed, zk);
- Injector injector = Guice.createInjector(
- new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
- new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+ Injector injector = CoreTestUtils.createInjectorWithNonFailFastZKClients();
TCPEmitter emitter = injector.getInstance(TCPEmitter.class);
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
index 620494f..f129dae 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/deploy/TestAutomaticDeployment.java
@@ -18,11 +18,9 @@
import org.apache.s4.base.Event;
import org.apache.s4.base.EventMessage;
import org.apache.s4.base.SerializerDeserializer;
-import org.apache.s4.comm.DefaultCommModule;
import org.apache.s4.comm.tcp.TCPEmitter;
import org.apache.s4.comm.topology.ZNRecord;
import org.apache.s4.comm.topology.ZNRecordSerializer;
-import org.apache.s4.core.DefaultCoreModule;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.s4.fixtures.CoreTestUtils;
import org.apache.s4.fixtures.ZkBasedTest;
@@ -36,8 +34,6 @@
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
-import com.google.common.io.Resources;
-import com.google.inject.Guice;
import com.google.inject.Injector;
import com.sun.net.httpserver.Headers;
import com.sun.net.httpserver.HttpExchange;
@@ -112,9 +108,7 @@
CommTestUtils
.watchAndSignalCreation("/onEvent@" + time1, signalEvent1Processed, CommTestUtils.createZkClient());
- Injector injector = Guice.createInjector(
- new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
- new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));
+ Injector injector = CoreTestUtils.createInjectorWithNonFailFastZKClients();
TCPEmitter emitter = injector.getInstance(TCPEmitter.class);
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java
new file mode 100644
index 0000000..4744e03
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java
@@ -0,0 +1,24 @@
+package org.apache.s4.fixtures;
+
+import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class AssignmentFromZKNoFailFast extends AssignmentFromZK {
+
+ @Inject
+ public AssignmentFromZKNoFailFast(@Named("cluster.name") String clusterName,
+ @Named("cluster.zk_address") String zookeeperAddress,
+ @Named("cluster.zk_session_timeout") int sessionTimeout,
+ @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+ super(clusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+ }
+
+ @Override
+ public void handleStateChanged(KeeperState state) throws Exception {
+ // no fail fast
+ }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java
new file mode 100644
index 0000000..f600c2f
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java
@@ -0,0 +1,24 @@
+package org.apache.s4.fixtures;
+
+import org.apache.s4.comm.topology.ClusterFromZK;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class ClusterFromZKNoFailFast extends ClusterFromZK {
+
+ @Inject
+ public ClusterFromZKNoFailFast(@Named("cluster.name") String clusterName,
+ @Named("cluster.zk_address") String zookeeperAddress,
+ @Named("cluster.zk_session_timeout") int sessionTimeout,
+ @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+ super(clusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+ }
+
+ @Override
+ public void handleStateChanged(KeeperState state) throws Exception {
+ // no fail fast
+ }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java
new file mode 100644
index 0000000..3a72050
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java
@@ -0,0 +1,24 @@
+package org.apache.s4.fixtures;
+
+import org.apache.s4.comm.topology.ClustersFromZK;
+import org.apache.zookeeper.Watcher.Event.KeeperState;
+
+import com.google.inject.Inject;
+import com.google.inject.name.Named;
+
+public class ClustersFromZKNoFailFast extends ClustersFromZK {
+
+ @Inject
+ public ClustersFromZKNoFailFast(@Named("cluster.name") String clusterName,
+ @Named("cluster.zk_address") String zookeeperAddress,
+ @Named("cluster.zk_session_timeout") int sessionTimeout,
+ @Named("cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+ super(clusterName, zookeeperAddress, sessionTimeout, connectionTimeout);
+ }
+
+ @Override
+ public void handleStateChanged(KeeperState state) throws Exception {
+ // no fail fast
+ }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
index 00a7611..91cd799 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -7,6 +7,8 @@
import junit.framework.Assert;
+import org.apache.s4.comm.DefaultCommModule;
+import org.apache.s4.core.DefaultCoreModule;
import org.apache.s4.core.Main;
import org.gradle.tooling.BuildLauncher;
import org.gradle.tooling.GradleConnector;
@@ -15,6 +17,10 @@
import sun.net.ProgressListener;
import com.google.common.io.PatternFilenameFilter;
+import com.google.common.io.Resources;
+import com.google.inject.Guice;
+import com.google.inject.Injector;
+import com.google.inject.util.Modules;
/**
* Contains static methods that can be used in tests for things such as: - files utilities: strings <-> files
@@ -88,4 +94,11 @@
}
}
+
+ public static Injector createInjectorWithNonFailFastZKClients() throws IOException {
+ return Guice.createInjector(Modules.override(
+ new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream(), "cluster1"),
+ new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream())).with(
+ new NonFailFastZookeeperClientsModule()));
+ }
}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/NonFailFastZookeeperClientsModule.java b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/NonFailFastZookeeperClientsModule.java
new file mode 100644
index 0000000..790d591
--- /dev/null
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/fixtures/NonFailFastZookeeperClientsModule.java
@@ -0,0 +1,39 @@
+package org.apache.s4.fixtures;
+
+import org.apache.s4.comm.topology.Assignment;
+import org.apache.s4.comm.topology.Cluster;
+import org.apache.s4.comm.topology.Clusters;
+
+import com.google.inject.AbstractModule;
+
+/**
+ *
+ * Used for injecting non-fail-fast zookeeper client classes.
+ *
+ * Here is why:
+ *
+ * <ul>
+ * <li>tests contained in a single junit class are not forked: forking is on a class basis</li>
+ * <li>zookeeper client classes are injected during the tests</li>
+ * <li>zookeeper server is restarted between test methods.</li>
+ * <li>zookeeper client classes from previous tests methods get a "expired" exception upon reconnection to the new
+ * zookeeper instance. With a fail-fast implementation, this would kill the current test.</li>
+ * </ul>
+ *
+ *
+ */
+public class NonFailFastZookeeperClientsModule extends AbstractModule {
+
+ public NonFailFastZookeeperClientsModule() {
+ }
+
+ @Override
+ protected void configure() {
+ bind(Assignment.class).to(AssignmentFromZKNoFailFast.class);
+ bind(Cluster.class).to(ClusterFromZKNoFailFast.class);
+
+ bind(Clusters.class).to(ClustersFromZKNoFailFast.class);
+
+ }
+
+}
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
index dffe6cb..107f271 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordClassifierPE.java
@@ -12,6 +12,7 @@
import org.apache.s4.core.ProcessingElement;
import org.apache.s4.fixtures.CommTestUtils;
import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
@@ -62,9 +63,14 @@
for (Entry<String, Integer> entry : entrySet) {
sb.append(entry.getKey() + "=" + entry.getValue() + ";");
}
- CommTestUtils.writeStringToFile(sb.toString(), results);
- zk.create("/textProcessed", new byte[0], Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+ try {
+ zk.delete("/results", -1);
+ } catch (NoNodeException ignored) {
+ }
+
+ zk.create("/results", sb.toString().getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
} else {
// NOTE: this will fail if we did not recover the latest
// counter,
diff --git a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
index 5987e4a..0484b57 100644
--- a/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
+++ b/subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -1,6 +1,5 @@
package org.apache.s4.wordcount;
-import java.io.File;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -80,7 +79,7 @@
"-extraModulesClasses=" + WordCountModule.class.getName() });
CountDownLatch signalTextProcessed = new CountDownLatch(1);
- CommTestUtils.watchAndSignalCreation("/textProcessed", signalTextProcessed, zk);
+ CommTestUtils.watchAndSignalCreation("/results", signalTextProcessed, zk);
// add authorizations for processing
for (int i = 1; i <= SENTENCE_1_TOTAL_WORDS + SENTENCE_2_TOTAL_WORDS + 1; i++) {
@@ -90,10 +89,8 @@
injectSentence(SENTENCE_2);
injectSentence(SENTENCE_3);
Assert.assertTrue(signalTextProcessed.await(10, TimeUnit.SECONDS));
- File results = new File(CommTestUtils.DEFAULT_TEST_OUTPUT_DIR + File.separator + "wordcount");
- String s = CommTestUtils.readFile(results);
- Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", s);
-
+ String results = new String(zk.getData("/results", false, null));
+ Assert.assertEquals("be=2;da=2;doobie=5;not=1;or=1;to=2;", results);
}
public void injectSentence(String sentence) throws IOException {