QPID-7368: Wait for cancellation of accept selection key before closing the socket
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
index b0e40bb..9828d46 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
@@ -22,16 +22,20 @@
import java.io.IOException;
import java.nio.channels.ServerSocketChannel;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.transport.TransportException;
public class NetworkConnectionScheduler
@@ -204,7 +208,31 @@
public void cancelAcceptingSocket(final ServerSocketChannel serverSocket)
{
- _selectorThread.cancelAcceptingSocket(serverSocket);
+ Future<Void> result = cancelAcceptingSocketAsync(serverSocket);
+ try
+ {
+ result.get(Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
+ CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT),
+ TimeUnit.MILLISECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ LOGGER.warn("Cancellation of accepting socket was interrupted");
+ Thread.currentThread().interrupt();
+ }
+ catch (ExecutionException e)
+ {
+ LOGGER.warn("Cancellation of accepting socket failed", e.getCause());
+ }
+ catch (TimeoutException e)
+ {
+ LOGGER.warn("Cancellation of accepting socket timed out");
+ }
+ }
+
+ private Future<Void> cancelAcceptingSocketAsync(final ServerSocketChannel serverSocket)
+ {
+ return _selectorThread.cancelAcceptingSocket(serverSocket);
}
public void addConnection(final NonBlockingConnection connection)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java b/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
index f8f5f0a..04ee506 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
@@ -36,10 +36,12 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.util.concurrent.SettableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -431,8 +433,9 @@
_selectionTasks[0].wakeup();
}
- public void cancelAcceptingSocket(final ServerSocketChannel socketChannel)
+ public Future<Void> cancelAcceptingSocket(final ServerSocketChannel socketChannel)
{
+ final SettableFuture<Void> cancellationResult = SettableFuture.create();
_tasks.add(new Runnable()
{
@Override
@@ -443,14 +446,33 @@
LOGGER.debug("Cancelling selector on accepting port {} ",
socketChannel.socket().getLocalSocketAddress());
}
- SelectionKey selectionKey = socketChannel.keyFor(_selectionTasks[0].getSelector());
- if (selectionKey != null)
+
+ try
{
- selectionKey.cancel();
+ SelectionKey selectionKey = null;
+ try
+ {
+ selectionKey = socketChannel.register(_selectionTasks[0].getSelector(), 0);
+ }
+ catch (ClosedChannelException e)
+ {
+ LOGGER.error("Failed to deregister selector on accepting port {}",
+ socketChannel.socket().getLocalSocketAddress(), e);
+ }
+
+ if (selectionKey != null)
+ {
+ selectionKey.cancel();
+ }
+ }
+ finally
+ {
+ cancellationResult.set(null);
}
}
});
_selectionTasks[0].wakeup();
+ return cancellationResult;
}
@Override