QPID-7368: Wait for cancellation of accept selection key before closing the socket
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
index b0e40bb..9828d46 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
@@ -22,16 +22,20 @@
 
 import java.io.IOException;
 import java.nio.channels.ServerSocketChannel;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.qpid.configuration.CommonProperties;
 import org.apache.qpid.transport.TransportException;
 
 public class NetworkConnectionScheduler
@@ -204,7 +208,31 @@
 
     public void cancelAcceptingSocket(final ServerSocketChannel serverSocket)
     {
-        _selectorThread.cancelAcceptingSocket(serverSocket);
+        Future<Void> result = cancelAcceptingSocketAsync(serverSocket);
+        try
+        {
+            result.get(Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
+                                          CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT),
+                       TimeUnit.MILLISECONDS);
+        }
+        catch (InterruptedException e)
+        {
+            LOGGER.warn("Cancellation of accepting socket was interrupted");
+            Thread.currentThread().interrupt();
+        }
+        catch (ExecutionException e)
+        {
+            LOGGER.warn("Cancellation of accepting socket failed", e.getCause());
+        }
+        catch (TimeoutException e)
+        {
+            LOGGER.warn("Cancellation of accepting socket timed out");
+        }
+    }
+
+    private Future<Void> cancelAcceptingSocketAsync(final ServerSocketChannel serverSocket)
+    {
+        return _selectorThread.cancelAcceptingSocket(serverSocket);
     }
 
     public void addConnection(final NonBlockingConnection connection)
diff --git a/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java b/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
index f8f5f0a..04ee506 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
@@ -36,10 +36,12 @@
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.util.concurrent.SettableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -431,8 +433,9 @@
         _selectionTasks[0].wakeup();
     }
 
-    public void cancelAcceptingSocket(final ServerSocketChannel socketChannel)
+    public Future<Void> cancelAcceptingSocket(final ServerSocketChannel socketChannel)
     {
+        final SettableFuture<Void> cancellationResult = SettableFuture.create();
         _tasks.add(new Runnable()
         {
             @Override
@@ -443,14 +446,33 @@
                     LOGGER.debug("Cancelling selector on accepting port {} ",
                                  socketChannel.socket().getLocalSocketAddress());
                 }
-                SelectionKey selectionKey = socketChannel.keyFor(_selectionTasks[0].getSelector());
-                if (selectionKey != null)
+
+                try
                 {
-                    selectionKey.cancel();
+                    SelectionKey selectionKey = null;
+                    try
+                    {
+                        selectionKey = socketChannel.register(_selectionTasks[0].getSelector(), 0);
+                    }
+                    catch (ClosedChannelException e)
+                    {
+                        LOGGER.error("Failed to deregister selector on accepting port {}",
+                                     socketChannel.socket().getLocalSocketAddress(), e);
+                    }
+
+                    if (selectionKey != null)
+                    {
+                        selectionKey.cancel();
+                    }
+                }
+                finally
+                {
+                    cancellationResult.set(null);
                 }
             }
         });
         _selectionTasks[0].wakeup();
+        return cancellationResult;
     }
 
     @Override