Bug fix: the #disconnected method of protocol handlers must be called only after the underlying I/O session has been fully closed
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
index 9200ba3..b419799 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/IOSessionImpl.java
@@ -41,6 +41,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.hc.core5.function.Callback;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.io.Closer;
import org.apache.hc.core5.util.Args;
@@ -57,6 +58,7 @@ class IOSessionImpl implements IOSession {
private final Lock lock;
private final String id;
private final AtomicReference<IOEventHandler> handlerRef;
+ private final Callback<IOSession> sessionClosedCallback;
private final AtomicReference<IOSession.Status> status;
private volatile Timeout socketTimeout;
@@ -64,10 +66,12 @@ class IOSessionImpl implements IOSession {
private volatile long lastWriteTime;
private volatile long lastEventTime;
- public IOSessionImpl(final String type, final SelectionKey key, final SocketChannel socketChannel) {
+ public IOSessionImpl(final String type, final SelectionKey key, final SocketChannel socketChannel,
+ final Callback<IOSession> sessionClosedCallback) {
super();
this.key = Args.notNull(key, "Selection key");
this.channel = Args.notNull(socketChannel, "Socket channel");
+ this.sessionClosedCallback = sessionClosedCallback;
this.commandQueue = new ConcurrentLinkedDeque<>();
this.lock = new ReentrantLock();
this.socketTimeout = Timeout.INFINITE;
@@ -264,28 +268,34 @@ public void close() {
@Override
public void close(final CloseMode closeMode) {
if (this.status.compareAndSet(Status.ACTIVE, Status.CLOSED)) {
- if (closeMode == CloseMode.IMMEDIATE) {
- try {
- this.channel.setOption(StandardSocketOptions.SO_LINGER, 0);
- } catch (final UnsupportedOperationException | IOException e) {
- // Quietly ignore
- }
- } else {
- try {
- this.channel.shutdownOutput();
- } catch (final IOException ignore) {
- }
- }
- lock.lock();
try {
- this.key.cancel();
- this.key.attach(null);
+ if (closeMode == CloseMode.IMMEDIATE) {
+ try {
+ this.channel.setOption(StandardSocketOptions.SO_LINGER, 0);
+ } catch (final UnsupportedOperationException | IOException e) {
+ // Quietly ignore
+ }
+ } else {
+ try {
+ this.channel.shutdownOutput();
+ } catch (final IOException ignore) {
+ }
+ }
+ lock.lock();
+ try {
+ this.key.cancel();
+ this.key.attach(null);
+ } finally {
+ lock.unlock();
+ }
+ Closer.closeQuietly(this.key.channel());
+ if (this.key.selector().isOpen()) {
+ this.key.selector().wakeup();
+ }
} finally {
- lock.unlock();
- }
- Closer.closeQuietly(this.key.channel());
- if (this.key.selector().isOpen()) {
- this.key.selector().wakeup();
+ if (sessionClosedCallback != null) {
+ sessionClosedCallback.execute(this);
+ }
}
}
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
index d290939..d0bd866 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
@@ -33,7 +33,6 @@
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.util.Objects;
-import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -66,7 +65,6 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
private final NamedEndpoint initialEndpoint;
private final Decorator<IOSession> ioSessionDecorator;
private final IOSessionListener sessionListener;
- private final Queue<InternalDataChannel> closedSessions;
private final AtomicReference<SSLIOSession> tlsSessionRef;
private final AtomicReference<IOSession> currentSessionRef;
private final AtomicReference<IOEventHandler> eventHandlerRef;
@@ -77,11 +75,9 @@ final class InternalDataChannel extends InternalChannel implements ProtocolIOSes
final IOSession ioSession,
final NamedEndpoint initialEndpoint,
final Decorator<IOSession> ioSessionDecorator,
- final IOSessionListener sessionListener,
- final Queue<InternalDataChannel> closedSessions) {
+ final IOSessionListener sessionListener) {
this.ioSession = ioSession;
this.initialEndpoint = initialEndpoint;
- this.closedSessions = closedSessions;
this.ioSessionDecorator = ioSessionDecorator;
this.sessionListener = sessionListener;
this.tlsSessionRef = new AtomicReference<>();
@@ -189,23 +185,6 @@ void onTLSSessionStart(final SSLIOSession sslSession) {
}
}
- void onTLSSessionEnd(final SSLIOSession sslSession) {
- if (closed.compareAndSet(false, true)) {
- closedSessions.add(this);
- }
- }
-
- void disconnected() {
- final IOSession currentSession = currentSessionRef.get();
- if (sessionListener != null) {
- sessionListener.disconnected(currentSession);
- }
- final IOEventHandler handler = currentSession.getHandler();
- if (handler != null) {
- handler.disconnected(currentSession);
- }
- }
-
@Override
public void startTls(
final SSLContext sslContext,
@@ -236,7 +215,7 @@ public void startTls(
verifier,
handshakeTimeout,
this::onTLSSessionStart,
- this::onTLSSessionEnd,
+ null,
new CallbackContribution<SSLSession>(callback) {
@Override
@@ -281,18 +260,11 @@ public void close() {
@Override
public void close(final CloseMode closeMode) {
- final IOSession currentSession = currentSessionRef.get();
if (closeMode == CloseMode.IMMEDIATE) {
- closed.set(true);
- currentSession.close(closeMode);
+ ioSession.close(closeMode);
} else {
- if (closed.compareAndSet(false, true)) {
- try {
- currentSession.close(closeMode);
- } finally {
- closedSessions.add(this);
- }
- }
+ final IOSession currentSession = currentSessionRef.get();
+ currentSession.close(closeMode);
}
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
index 7510c26..b5b156e 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
@@ -67,7 +67,7 @@ class SingleCoreIOReactor extends AbstractSingleCoreIOReactor implements Connect
private final Decorator<IOSession> ioSessionDecorator;
private final IOSessionListener sessionListener;
private final Callback<IOSession> sessionShutdownCallback;
- private final Queue<InternalDataChannel> closedSessions;
+ private final Queue<IOSession> closedSessions;
private final Queue<ChannelEntry> channelQueue;
private final Queue<IOSessionRequest> requestQueue;
private final AtomicBoolean shutdownInitiated;
@@ -221,13 +221,12 @@ private void processPendingChannels() throws IOException {
} catch (final ClosedChannelException ex) {
return;
}
- final IOSession ioSession = new IOSessionImpl("a", key, socketChannel);
+ final IOSessionImpl ioSession = new IOSessionImpl("a", key, socketChannel, closedSessions::add);
final InternalDataChannel dataChannel = new InternalDataChannel(
ioSession,
null,
ioSessionDecorator,
- sessionListener,
- closedSessions);
+ sessionListener);
dataChannel.setSocketTimeout(this.reactorConfig.getSoTimeout());
dataChannel.upgrade(this.eventHandlerFactory.createHandler(dataChannel, attachment));
key.attach(dataChannel);
@@ -237,12 +236,18 @@ private void processPendingChannels() throws IOException {
private void processClosedSessions() {
for (;;) {
- final InternalDataChannel dataChannel = this.closedSessions.poll();
- if (dataChannel == null) {
+ final IOSession ioSession = this.closedSessions.poll();
+ if (ioSession == null) {
break;
}
try {
- dataChannel.disconnected();
+ if (sessionListener != null) {
+ sessionListener.disconnected(ioSession);
+ }
+ final IOEventHandler handler = ioSession.getHandler();
+ if (handler != null) {
+ handler.disconnected(ioSession);
+ }
} catch (final CancelledKeyException ex) {
// ignore and move on
}
@@ -385,13 +390,12 @@ private void processConnectionRequest(final SocketChannel socketChannel, final I
validateAddress(remoteAddress);
final boolean connected = socketChannel.connect(remoteAddress);
final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
- final IOSession ioSession = new IOSessionImpl("c", key, socketChannel);
+ final IOSessionImpl ioSession = new IOSessionImpl("c", key, socketChannel, closedSessions::add);
final InternalDataChannel dataChannel = new InternalDataChannel(
ioSession,
sessionRequest.remoteEndpoint,
ioSessionDecorator,
- sessionListener,
- closedSessions);
+ sessionListener);
dataChannel.setSocketTimeout(reactorConfig.getSoTimeout());
final InternalChannel connectChannel = new InternalConnectChannel(
key,