Bug fix: #disconnected method of protocol handlers is to be called only once the underlying I/O session has been fully closed
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
index 9200ba3..b419799 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
@@ -41,6 +41,7 @@
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.hc.core5.function.Callback;
 import org.apache.hc.core5.io.CloseMode;
 import org.apache.hc.core5.io.Closer;
 import org.apache.hc.core5.util.Args;
@@ -57,6 +58,7 @@ class IOSessionImpl implements IOSession {
     private final Lock lock;
     private final String id;
     private final AtomicReference<IOEventHandler> handlerRef;
+    private final Callback<IOSession> sessionClosedCallback;
     private final AtomicReference<IOSession.Status> status;
 
     private volatile Timeout socketTimeout;
@@ -64,10 +66,12 @@ class IOSessionImpl implements IOSession {
     private volatile long lastWriteTime;
     private volatile long lastEventTime;
 
-    public IOSessionImpl(final String type, final SelectionKey key, final SocketChannel socketChannel) {
+    public IOSessionImpl(final String type, final SelectionKey key, final SocketChannel socketChannel,
+                         final Callback<IOSession> sessionClosedCallback) {
         super();
         this.key = Args.notNull(key, "Selection key");
         this.channel = Args.notNull(socketChannel, "Socket channel");
+        this.sessionClosedCallback = sessionClosedCallback;
         this.commandQueue = new ConcurrentLinkedDeque<>();
         this.lock = new ReentrantLock();
         this.socketTimeout = Timeout.INFINITE;
@@ -264,28 +268,34 @@ public void close() {
     @Override
     public void close(final CloseMode closeMode) {
         if (this.status.compareAndSet(Status.ACTIVE, Status.CLOSED)) {
-            if (closeMode == CloseMode.IMMEDIATE) {
-                try {
-                    this.channel.setOption(StandardSocketOptions.SO_LINGER, 0);
-                } catch (final UnsupportedOperationException | IOException e) {
-                    // Quietly ignore
-                }
-            } else {
-                try {
-                    this.channel.shutdownOutput();
-                } catch (final IOException ignore) {
-                }
-            }
-            lock.lock();
             try {
-                this.key.cancel();
-                this.key.attach(null);
+                if (closeMode == CloseMode.IMMEDIATE) {
+                    try {
+                        this.channel.setOption(StandardSocketOptions.SO_LINGER, 0);
+                    } catch (final UnsupportedOperationException | IOException e) {
+                        // Quietly ignore
+                    }
+                } else {
+                    try {
+                        this.channel.shutdownOutput();
+                    } catch (final IOException ignore) {
+                    }
+                }
+                lock.lock();
+                try {
+                    this.key.cancel();
+                    this.key.attach(null);
+                } finally {
+                    lock.unlock();
+                }
+                Closer.closeQuietly(this.key.channel());
+                if (this.key.selector().isOpen()) {
+                    this.key.selector().wakeup();
+                }
             } finally {
-                lock.unlock();
-            }
-            Closer.closeQuietly(this.key.channel());
-            if (this.key.selector().isOpen()) {
-                this.key.selector().wakeup();
+                if (sessionClosedCallback != null) {
+                    sessionClosedCallback.execute(this);
+                }
             }
         }
     }
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
index d290939..d0bd866 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
@@ -33,7 +33,6 @@
 import java.nio.channels.ByteChannel;
 import java.nio.channels.SelectionKey;
 import java.util.Objects;
-import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -66,7 +65,6 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
     private final NamedEndpoint initialEndpoint;
     private final Decorator<IOSession> ioSessionDecorator;
     private final IOSessionListener sessionListener;
-    private final Queue<InternalDataChannel> closedSessions;
     private final AtomicReference<SSLIOSession> tlsSessionRef;
     private final AtomicReference<IOSession> currentSessionRef;
     private final AtomicReference<IOEventHandler> eventHandlerRef;
@@ -77,11 +75,9 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
             final IOSession ioSession,
             final NamedEndpoint initialEndpoint,
             final Decorator<IOSession> ioSessionDecorator,
-            final IOSessionListener sessionListener,
-            final Queue<InternalDataChannel> closedSessions) {
+            final IOSessionListener sessionListener) {
         this.ioSession = ioSession;
         this.initialEndpoint = initialEndpoint;
-        this.closedSessions = closedSessions;
         this.ioSessionDecorator = ioSessionDecorator;
         this.sessionListener = sessionListener;
         this.tlsSessionRef = new AtomicReference<>();
@@ -189,23 +185,6 @@ void onTLSSessionStart(final SSLIOSession sslSession) {
         }
     }
 
-    void onTLSSessionEnd(final SSLIOSession sslSession) {
-        if (closed.compareAndSet(false, true)) {
-            closedSessions.add(this);
-        }
-    }
-
-    void disconnected() {
-        final IOSession currentSession = currentSessionRef.get();
-        if (sessionListener != null) {
-            sessionListener.disconnected(currentSession);
-        }
-        final IOEventHandler handler = currentSession.getHandler();
-        if (handler != null) {
-            handler.disconnected(currentSession);
-        }
-    }
-
     @Override
     public void startTls(
             final SSLContext sslContext,
@@ -236,7 +215,7 @@ public void startTls(
                 verifier,
                 handshakeTimeout,
                 this::onTLSSessionStart,
-                this::onTLSSessionEnd,
+                null,
                 new CallbackContribution<SSLSession>(callback) {
 
                     @Override
@@ -281,18 +260,11 @@ public void close() {
 
     @Override
     public void close(final CloseMode closeMode) {
-        final IOSession currentSession = currentSessionRef.get();
         if (closeMode == CloseMode.IMMEDIATE) {
-            closed.set(true);
-            currentSession.close(closeMode);
+            ioSession.close(closeMode);
         } else {
-            if (closed.compareAndSet(false, true)) {
-                try {
-                    currentSession.close(closeMode);
-                } finally {
-                    closedSessions.add(this);
-                }
-            }
+            final IOSession currentSession = currentSessionRef.get();
+            currentSession.close(closeMode);
         }
     }
 
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
index 7510c26..b5b156e 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
@@ -67,7 +67,7 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements Connect
     private final Decorator<IOSession> ioSessionDecorator;
     private final IOSessionListener sessionListener;
     private final Callback<IOSession> sessionShutdownCallback;
-    private final Queue<InternalDataChannel> closedSessions;
+    private final Queue<IOSession> closedSessions;
     private final Queue<ChannelEntry> channelQueue;
     private final Queue<IOSessionRequest> requestQueue;
     private final AtomicBoolean shutdownInitiated;
@@ -221,13 +221,12 @@ private void processPendingChannels() throws IOException {
             } catch (final ClosedChannelException ex) {
                 return;
             }
-            final IOSession ioSession = new IOSessionImpl("a", key, socketChannel);
+            final IOSessionImpl ioSession = new IOSessionImpl("a", key, socketChannel, closedSessions::add);
             final InternalDataChannel dataChannel = new InternalDataChannel(
                     ioSession,
                     null,
                     ioSessionDecorator,
-                    sessionListener,
-                    closedSessions);
+                    sessionListener);
             dataChannel.setSocketTimeout(this.reactorConfig.getSoTimeout());
             dataChannel.upgrade(this.eventHandlerFactory.createHandler(dataChannel, attachment));
             key.attach(dataChannel);
@@ -237,12 +236,18 @@ private void processPendingChannels() throws IOException {
 
     private void processClosedSessions() {
         for (;;) {
-            final InternalDataChannel dataChannel = this.closedSessions.poll();
-            if (dataChannel == null) {
+            final IOSession ioSession = this.closedSessions.poll();
+            if (ioSession == null) {
                 break;
             }
             try {
-                dataChannel.disconnected();
+                if (sessionListener != null) {
+                    sessionListener.disconnected(ioSession);
+                }
+                final IOEventHandler handler = ioSession.getHandler();
+                if (handler != null) {
+                    handler.disconnected(ioSession);
+                }
             } catch (final CancelledKeyException ex) {
                 // ignore and move on
             }
@@ -385,13 +390,12 @@ private void processConnectionRequest(final SocketChannel socketChannel, final I
         validateAddress(remoteAddress);
         final boolean connected = socketChannel.connect(remoteAddress);
         final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
-        final IOSession ioSession = new IOSessionImpl("c", key, socketChannel);
+        final IOSessionImpl ioSession = new IOSessionImpl("c", key, socketChannel, closedSessions::add);
         final InternalDataChannel dataChannel = new InternalDataChannel(
                 ioSession,
                 sessionRequest.remoteEndpoint,
                 ioSessionDecorator,
-                sessionListener,
-                closedSessions);
+                sessionListener);
         dataChannel.setSocketTimeout(reactorConfig.getSoTimeout());
         final InternalChannel connectChannel = new InternalConnectChannel(
                 key,