ZOOKEEPER-4736: Fix nio socket fd leak if network service is down
Reviewers: kezhuw, anmolnar
Author: lchqlchq
Closes #2047 from lchqlchq/fd
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
index ed03359..7663e27 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
@@ -1289,6 +1289,17 @@ public void run() {
"SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
}
+ private void abortConnection() {
+ try {
+ clientCnxnSocket.testableCloseSocket();
+ } catch (IOException e) {
+ LOG.debug("Fail to close ongoing socket", e);
+ }
+ }
+
+ /**
+ * This is not thread-safe and should only be called inside {@link SendThread}.
+ */
private void cleanAndNotifyState() {
cleanup();
if (state.isAlive()) {
@@ -1531,7 +1542,7 @@ public ReplyHeader submitRequest(
}
}
if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
- sendThread.cleanAndNotifyState();
+ sendThread.abortConnection();
}
return r;
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java
index ea58b85..e39bee1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java
@@ -209,6 +209,12 @@ void cleanup() {
} catch (IOException e) {
LOG.debug("Ignoring exception during channel close", e);
}
+ try {
+ selector.wakeup();
+ selector.selectNow();
+ } catch (IOException e) {
+ LOG.debug("Ignoring exception during selecting of cancelled socket", e);
+ }
}
try {
Thread.sleep(100);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
index 54426f0..2b70a59 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
@@ -18,20 +18,32 @@
package org.apache.zookeeper;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.zookeeper.ClientCnxn.Packet;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.client.HostProvider;
import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.BusyServer;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
import org.apache.zookeeper.test.ClientBase;
@@ -76,6 +88,40 @@ private void closeZookeeper(ZooKeeper zk) {
}
@Test
+ public void testSocketClosedAfterFailure() throws Exception {
+ Duration sessionTimeout = Duration.ofMillis(1000);
+ final AtomicReference<Selector> nioSelector = new AtomicReference<>();
+ try (
+ // given: busy server
+ BusyServer server = new BusyServer();
+ ZooKeeper zk = new ZooKeeper(server.getHostPort(), (int) sessionTimeout.toMillis(), null) {
+ @Override
+ ClientCnxn createConnection(HostProvider hostProvider, int sessionTimeout, ZKClientConfig clientConfig, Watcher defaultWatcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) throws IOException {
+ ClientCnxnSocketNIO socket = spy((ClientCnxnSocketNIO) clientCnxnSocket);
+
+ doAnswer(mock -> {
+ SocketChannel spy = spy((SocketChannel) mock.callRealMethod());
+ // when: connect get exception
+ //
+ // this could happen if system's network service is unavailable,
+ // for examples, "ifdown eth0" or "service network stop" and so on.
+ doThrow(new SocketException("Network is unreachable")).when(spy).connect(any());
+ return spy;
+ }).when(socket).createSock();
+
+ nioSelector.set(socket.getSelector());
+ return super.createConnection(hostProvider, sessionTimeout, clientConfig, defaultWatcher, socket, sessionId, sessionPasswd, canBeReadOnly);
+ }
+ }) {
+
+ Thread.sleep(sessionTimeout.toMillis() * 5);
+
+ // then: sockets of failed connections are closed, so at most one registered socket
+ assertThat(nioSelector.get().keys().size(), lessThanOrEqualTo(1));
+ }
+ }
+
+ @Test
public void testClientCnxnSocketFragility() throws Exception {
System.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
FragileClientCnxnSocketNIO.class.getName());
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/common/BusyServer.java b/zookeeper-server/src/test/java/org/apache/zookeeper/common/BusyServer.java
new file mode 100644
index 0000000..c2eece3
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/common/BusyServer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.common;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+public class BusyServer implements AutoCloseable {
+ private final ServerSocket server;
+ private final Socket client;
+
+ public BusyServer() throws IOException {
+ this.server = new ServerSocket(0, 1, InetAddress.getByName("127.0.0.1"));
+ this.client = new Socket("127.0.0.1", server.getLocalPort());
+ }
+
+ public int getLocalPort() {
+ return server.getLocalPort();
+ }
+
+ public String getHostPort() {
+ return String.format("127.0.0.1:%d", getLocalPort());
+ }
+
+ @Override
+ public void close() throws Exception {
+ client.close();
+ server.close();
+ }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java
index 9f5943f..86659ba 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java
@@ -27,8 +27,6 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -42,6 +40,7 @@
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.BusyServer;
import org.apache.zookeeper.common.Time;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -75,30 +74,6 @@ public synchronized void process(WatchedEvent event) {
}
}
- private static class BusyServer implements AutoCloseable {
- private final ServerSocket server;
- private final Socket client;
-
- public BusyServer() throws IOException {
- this.server = new ServerSocket(0, 1);
- this.client = new Socket("127.0.0.1", server.getLocalPort());
- }
-
- public int getLocalPort() {
- return server.getLocalPort();
- }
-
- public String getHostPort() {
- return String.format("127.0.0.1:%d", getLocalPort());
- }
-
- @Override
- public void close() throws Exception {
- client.close();
- server.close();
- }
- }
-
@Test
public void testSessionExpiration() throws InterruptedException, KeeperException {
final CountDownLatch expirationLatch = new CountDownLatch(1);