Fail fast when ZooKeeper sessions expire
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index 3a198bb..66c2eca 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -118,11 +118,14 @@
     @Override
     public void handleStateChanged(KeeperState state) throws Exception {
         this.state = state;
-        if (!state.equals(KeeperState.SyncConnected)) {
-            logger.warn("Session not connected for cluster [{}]: [{}]. Trying to reconnect", clusterName, state.name());
-            zkClient.close();
-            zkClient.connect(connectionTimeout, null);
-            handleNewSession();
+        // Only an expired session is fatal. Other states (e.g. Disconnected) are not acted upon here and are
+        // presumably left to the ZooKeeper client's own reconnection logic.
+        if (state.equals(KeeperState.Expired)) {
+            // The expired session cannot be resumed and ZooKeeper already considers this node dead, so the
+            // process is stopped rather than attempting recovery with a new session.
+            logger.error("Zookeeper session expired, possibly due to a network partition for cluster [{}]. "
+                    + "This node is considered as dead by Zookeeper. Proceeding to stop this node.", clusterName);
+            System.exit(1);
         }
     }
 
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index 82e7a5e..1ec3cb5 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -186,7 +186,16 @@
 
     @Override
     public void handleStateChanged(KeeperState state) throws Exception {
-        // TODO we should reconnect only if we hold the zookeeper connection (i.e. this is the local cluster)
+        // NOTE(review): this instance may not own the ZooKeeper connection (e.g. when it describes a remote
+        // cluster); if so, an expired session here stops the local node too - confirm this is intended.
+        if (state.equals(KeeperState.Expired)) {
+            // Pass the cluster object itself instead of calling toString() on it: a null reference would throw a
+            // NullPointerException here and skip the System.exit below. SLF4J formats a null argument as "null".
+            logger.error(
+                    "Zookeeper session expired, possibly due to a network partition for cluster [{}]. This node is considered as dead by Zookeeper. Proceeding to stop this node.",
+                    clusterRef.get());
+            System.exit(1);
+        }
     }
 
     @Override
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
index c5445d3..f177599 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
@@ -78,10 +78,12 @@
     @Override
     public void handleStateChanged(KeeperState state) throws Exception {
         this.state = state;
-        if (!state.equals(KeeperState.SyncConnected)) {
-            logger.warn("Session not connected for cluster [{}]: [{}]. Trying to reconnect", clusterName, state.name());
-            zkClient.connect(connectionTimeout, null);
-            handleNewSession();
+        // Only an expired session is fatal; other states (e.g. Disconnected) are not acted upon here.
+        if (state.equals(KeeperState.Expired)) {
+            // The session cannot be resumed and ZooKeeper already considers this node dead: stop the process.
+            logger.error("Zookeeper session expired, possibly due to a network partition for cluster [{}]. "
+                    + "This node is considered as dead by Zookeeper. Proceeding to stop this node.", clusterName);
+            System.exit(1);
         }
     }
 
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
index dd93682..587f823 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
@@ -114,6 +114,13 @@
     @Override
     public void handleStateChanged(KeeperState state) throws Exception {
         this.state = state;
+        // An expired session cannot be resumed and ZooKeeper already considers this node dead: stop the process.
+        // Other states (e.g. Disconnected) are not acted upon here.
+        if (state.equals(KeeperState.Expired)) {
+            logger.error("Zookeeper session expired, possibly due to a network partition. "
+                    + "This node is considered as dead by Zookeeper. Proceeding to stop this node.");
+            System.exit(1);
+        }
     }
 
     @Override
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
similarity index 80%
rename from subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
rename to subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
index 673c7a3..8e4c70a 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
@@ -13,7 +13,12 @@
 import com.google.common.base.Splitter;
 import com.google.common.collect.Sets;
 
-public class AssignmentsFromZKTest extends ZKBaseTest {
+/**
+ * Single-cluster assignment test. Kept separate from AssignmentsFromZKTest2 because topology listeners now call
+ * System.exit when a ZooKeeper session expires; presumably (if test classes are forked into separate JVMs) this keeps
+ * such an exit from aborting the other test. Also hosts the {@link #testAssignment} helper shared with that class.
+ */
+public class AssignmentsFromZKTest1 extends ZKBaseTest {
 
     @Test
     public void testAssignmentFor1Cluster() throws Exception {
@@ -22,15 +27,7 @@
         testAssignment(taskSetup, topologyNames);
     }
 
-    @Test
-    public void testAssignmentFor2Clusters() throws Exception {
-        Thread.sleep(2000);
-        TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
-        final String topologyNames = "cluster2, cluster3";
-        testAssignment(taskSetup, topologyNames);
-    }
-
-    private void testAssignment(TaskSetup taskSetup, final String topologyNames) throws InterruptedException {
+    public static void testAssignment(TaskSetup taskSetup, final String topologyNames) throws InterruptedException {
         final Set<String> names = Sets.newHashSet(Splitter.onPattern("\\s*,\\s*").split(topologyNames));
         taskSetup.clean("s4");
         for (String topologyName : names) {
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest2.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest2.java
new file mode 100644
index 0000000..8c51c39
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest2.java
@@ -0,0 +1,24 @@
+package org.apache.s4.comm.topology;
+
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.junit.Test;
+
+/**
+ * Two-cluster assignment test, run in its own class (apart from AssignmentsFromZKTest1) because an expired ZooKeeper
+ * session now makes the topology listeners call System.exit.
+ * <p>
+ * NOTE(review): this class extends ZkBasedTest whereas AssignmentsFromZKTest1 extends ZKBaseTest; confirm both
+ * fixtures provide a ZooKeeper server reachable at CommTestUtils.ZK_STRING.
+ */
+public class AssignmentsFromZKTest2 extends ZkBasedTest {
+
+    @Test
+    public void testAssignmentFor2Clusters() throws Exception {
+        // Pause carried over from the original combined test class, before a new TaskSetup is created.
+        Thread.sleep(2000);
+        AssignmentsFromZKTest1.testAssignment(new TaskSetup(CommTestUtils.ZK_STRING), "cluster2, cluster3");
+    }
+
+}