Fail fast when Zookeeper sessions expire
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
index 3a198bb..66c2eca 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
@@ -118,11 +118,11 @@
@Override
public void handleStateChanged(KeeperState state) throws Exception {
this.state = state;
- if (!state.equals(KeeperState.SyncConnected)) {
- logger.warn("Session not connected for cluster [{}]: [{}]. Trying to reconnect", clusterName, state.name());
- zkClient.close();
- zkClient.connect(connectionTimeout, null);
- handleNewSession();
+ if (state.equals(KeeperState.Expired)) {
+ logger.error(
+ "Zookeeper session expired, possibly due to a network partition for cluster [{}]. This node is considered as dead by Zookeeper. Proceeding to stop this node.",
+ clusterName);
+ System.exit(1);
}
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
index 82e7a5e..1ec3cb5 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
@@ -186,7 +186,12 @@
@Override
public void handleStateChanged(KeeperState state) throws Exception {
- // TODO we should reconnect only if we hold the zookeeper connection (i.e. this is the local cluster)
+ if (state.equals(KeeperState.Expired)) {
+ logger.error(
+ "Zookeeper session expired, possibly due to a network partition for cluster [{}]. This node is considered as dead by Zookeeper. Proceeding to stop this node.",
+ clusterRef.get().toString());
+ System.exit(1);
+ }
}
@Override
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
index c5445d3..f177599 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
@@ -78,10 +78,11 @@
@Override
public void handleStateChanged(KeeperState state) throws Exception {
this.state = state;
- if (!state.equals(KeeperState.SyncConnected)) {
- logger.warn("Session not connected for cluster [{}]: [{}]. Trying to reconnect", clusterName, state.name());
- zkClient.connect(connectionTimeout, null);
- handleNewSession();
+ if (state.equals(KeeperState.Expired)) {
+ logger.error(
+ "Zookeeper session expired, possibly due to a network partition for cluster [{}]. This node is considered as dead by Zookeeper. Proceeding to stop this node.",
+ clusterName);
+ System.exit(1);
}
}
diff --git a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
index dd93682..587f823 100644
--- a/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
+++ b/subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
@@ -114,6 +114,11 @@
@Override
public void handleStateChanged(KeeperState state) throws Exception {
this.state = state;
+ if (state.equals(KeeperState.Expired)) {
+ logger.error("Zookeeper session expired, possibly due to a network partition. This node is considered as dead by Zookeeper. Proceeding to stop this node.");
+ System.exit(1);
+ }
+
}
@Override
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
similarity index 80%
rename from subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
rename to subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
index 673c7a3..8e4c70a 100644
--- a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest.java
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
@@ -13,7 +13,12 @@
import com.google.common.base.Splitter;
import com.google.common.collect.Sets;
-public class AssignmentsFromZKTest extends ZKBaseTest {
+/**
+ * Kept separate from AssignmentsFromZKTest2 so that a VM exit triggered by a Zookeeper session expiration does not
+ * affect the test in that other class.
+ *
+ */
+public class AssignmentsFromZKTest1 extends ZKBaseTest {
@Test
public void testAssignmentFor1Cluster() throws Exception {
@@ -22,15 +27,7 @@
testAssignment(taskSetup, topologyNames);
}
- @Test
- public void testAssignmentFor2Clusters() throws Exception {
- Thread.sleep(2000);
- TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
- final String topologyNames = "cluster2, cluster3";
- testAssignment(taskSetup, topologyNames);
- }
-
- private void testAssignment(TaskSetup taskSetup, final String topologyNames) throws InterruptedException {
+ public static void testAssignment(TaskSetup taskSetup, final String topologyNames) throws InterruptedException {
final Set<String> names = Sets.newHashSet(Splitter.onPattern("\\s*,\\s*").split(topologyNames));
taskSetup.clean("s4");
for (String topologyName : names) {
diff --git a/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest2.java b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest2.java
new file mode 100644
index 0000000..8c51c39
--- /dev/null
+++ b/subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest2.java
@@ -0,0 +1,23 @@
+package org.apache.s4.comm.topology;
+
+import org.apache.s4.comm.tools.TaskSetup;
+import org.apache.s4.fixtures.CommTestUtils;
+import org.apache.s4.fixtures.ZkBasedTest;
+import org.junit.Test;
+
+/**
+ * Kept separate from AssignmentsFromZKTest1 so that a VM exit triggered by a Zookeeper session expiration does not
+ * affect the test in that other class.
+ *
+ */
+public class AssignmentsFromZKTest2 extends ZkBasedTest {
+
+ @Test
+ public void testAssignmentFor2Clusters() throws Exception {
+ Thread.sleep(2000);
+ TaskSetup taskSetup = new TaskSetup(CommTestUtils.ZK_STRING);
+ final String topologyNames = "cluster2, cluster3";
+ AssignmentsFromZKTest1.testAssignment(taskSetup, topologyNames);
+ }
+
+}