HADOOP-19261. Support force close a DomainSocket for server service (#7057)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
index 73fff03..3edd349 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocket.java
@@ -339,10 +339,13 @@ private static native void closeFileDescriptor0(FileDescriptor fd)
private static native void shutdown0(int fd) throws IOException;
/**
- * Close the Socket.
+ * Close the Server Socket without check refCount.
+ * When Server Socket is blocked on accept(), its refCount is 1.
+ * close() call on Server Socket will be stuck in the while loop count check.
+ * @param force if true, will not check refCount before close socket.
+ * @throws IOException raised on errors performing I/O.
*/
- @Override
- public void close() throws IOException {
+ public void close(boolean force) throws IOException {
// Set the closed bit on this DomainSocket
int count;
try {
@@ -351,41 +354,61 @@ public void close() throws IOException {
// Someone else already closed the DomainSocket.
return;
}
- // Wait for all references to go away
- boolean didShutdown = false;
+
boolean interrupted = false;
- while (count > 0) {
- if (!didShutdown) {
- try {
- // Calling shutdown on the socket will interrupt blocking system
- // calls like accept, write, and read that are going on in a
- // different thread.
- shutdown0(fd);
- } catch (IOException e) {
- LOG.error("shutdown error: ", e);
- }
- didShutdown = true;
- }
+ if (force) {
try {
- Thread.sleep(10);
- } catch (InterruptedException e) {
- interrupted = true;
+ // Calling shutdown on the socket will interrupt blocking system
+ // calls like accept, write, and read that are going on in a
+ // different thread.
+ shutdown0(fd);
+ } catch (IOException e) {
+ LOG.error("shutdown error: ", e);
}
- count = refCount.getReferenceCount();
+ } else {
+ // Wait for all references to go away
+ boolean didShutdown = false;
+ while (count > 0) {
+ if (!didShutdown) {
+ try {
+ // Calling shutdown on the socket will interrupt blocking system
+ // calls like accept, write, and read that are going on in a
+ // different thread.
+ shutdown0(fd);
+ } catch (IOException e) {
+ LOG.error("shutdown error: ", e);
+ }
+ didShutdown = true;
+ }
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ count = refCount.getReferenceCount();
+ }
}
- // At this point, nobody has a reference to the file descriptor,
+ // At this point, nobody has a reference to the file descriptor,
// and nobody will be able to get one in the future either.
// We now call close(2) on the file descriptor.
- // After this point, the file descriptor number will be reused by
- // something else. Although this DomainSocket object continues to hold
- // the old file descriptor number (it's a final field), we never use it
+ // After this point, the file descriptor number will be reused by
+ // something else. Although this DomainSocket object continues to hold
+ // the old file descriptor number (it's a final field), we never use it
// again because this DomainSocket is closed.
close0(fd);
if (interrupted) {
Thread.currentThread().interrupt();
}
}
+
+ /**
+ * Close the Socket.
+ */
+ @Override
+ public void close() throws IOException {
+ close(false);
+ }
/**
* Call shutdown(SHUT_RDWR) on the UNIX domain socket.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TemporarySocketDirectory.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TemporarySocketDirectory.java
index c00b4b2..40399f0 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TemporarySocketDirectory.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TemporarySocketDirectory.java
@@ -20,7 +20,6 @@
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
-import java.util.Random;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileUtil;
@@ -35,8 +34,7 @@ public class TemporarySocketDirectory implements Closeable {
public TemporarySocketDirectory() {
String tmp = System.getProperty("java.io.tmpdir", "/tmp");
- dir = new File(tmp, "socks." + (System.currentTimeMillis() +
- "." + (new Random().nextInt())));
+ dir = new File(tmp, "socks." + System.nanoTime());
dir.mkdirs();
FileUtil.setWritable(dir, true);
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java
index 61cbd85..952f2b3 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/net/unix/TestDomainSocket.java
@@ -130,7 +130,7 @@ public Void call(){
DomainSocket conn = DomainSocket.connect(serv.getPath());
Thread.sleep(50);
conn.close();
- serv.close();
+ serv.close(true);
future.get(2, TimeUnit.MINUTES);
}
@@ -161,7 +161,7 @@ public Void call(){
};
Future<Void> future = exeServ.submit(callable);
Thread.sleep(500);
- serv.close();
+ serv.close(true);
future.get(2, TimeUnit.MINUTES);
}
@@ -240,7 +240,7 @@ public Void call(){
Future<Void> clientFuture = exeServ.submit(clientCallable);
Thread.sleep(500);
clientConn.close();
- serv.close();
+ serv.close(true);
clientFuture.get(2, TimeUnit.MINUTES);
serverFuture.get(2, TimeUnit.MINUTES);
}
@@ -281,28 +281,39 @@ public void testServerOptions() throws Exception {
final String TEST_PATH = new File(sockDir.getDir(),
"test_sock_server_options").getAbsolutePath();
DomainSocket serv = DomainSocket.bindAndListen(TEST_PATH);
- try {
- // Let's set a new receive buffer size
- int bufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
- int newBufSize = bufSize / 2;
- serv.setAttribute(DomainSocket.RECEIVE_BUFFER_SIZE, newBufSize);
- int nextBufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
- Assert.assertEquals(newBufSize, nextBufSize);
- // Let's set a server timeout
- int newTimeout = 1000;
- serv.setAttribute(DomainSocket.RECEIVE_TIMEOUT, newTimeout);
- int nextTimeout = serv.getAttribute(DomainSocket.RECEIVE_TIMEOUT);
- Assert.assertEquals(newTimeout, nextTimeout);
- try {
- serv.accept();
- Assert.fail("expected the accept() to time out and fail");
- } catch (SocketTimeoutException e) {
- GenericTestUtils.assertExceptionContains("accept(2) error: ", e);
+ // Let's set a new receive buffer size
+ int bufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
+ int newBufSize = bufSize / 2;
+ serv.setAttribute(DomainSocket.RECEIVE_BUFFER_SIZE, newBufSize);
+ int nextBufSize = serv.getAttribute(DomainSocket.RECEIVE_BUFFER_SIZE);
+ Assert.assertEquals(newBufSize, nextBufSize);
+ // Let's set a server timeout
+ int newTimeout = 1000;
+ serv.setAttribute(DomainSocket.RECEIVE_TIMEOUT, newTimeout);
+ int nextTimeout = serv.getAttribute(DomainSocket.RECEIVE_TIMEOUT);
+ Assert.assertEquals(newTimeout, nextTimeout);
+
+ ExecutorService exeServ = Executors.newSingleThreadExecutor();
+ Callable<Void> callable = new Callable<Void>() {
+ public Void call() {
+ try {
+ serv.accept();
+ Assert.fail("expected the accept() to time out and fail");
+ } catch (SocketTimeoutException e) {
+ GenericTestUtils.assertExceptionContains("accept(2) error: ", e);
+ } catch (AsynchronousCloseException e) {
+ return null;
+ } catch (IOException e) {
+ throw new RuntimeException("unexpected IOException", e);
+ }
+ return null;
}
- } finally {
- serv.close();
- Assert.assertFalse(serv.isOpen());
- }
+ };
+ Future<Void> future = exeServ.submit(callable);
+ Thread.sleep(500);
+ serv.close(true);
+ future.get();
+ Assert.assertFalse(serv.isOpen());
}
/**
@@ -656,7 +667,7 @@ public void run(){
}
serverThread.join(120000);
clientThread.join(120000);
- serv.close();
+ serv.close(true);
for (PassedFile pf : passedFiles) {
pf.cleanup();
}