Fixed logging for TLS secured non-blocking connections
diff --git a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java
index c7eebcc..fbb53a4 100644
--- a/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java
+++ b/httpcore5-testing/src/main/java/org/apache/hc/core5/testing/nio/LoggingIOSession.java
@@ -35,12 +35,13 @@
import java.util.concurrent.locks.Lock;
import org.apache.hc.core5.annotation.Internal;
+import org.apache.hc.core5.http.Chars;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.reactor.Command;
import org.apache.hc.core5.reactor.IOEventHandler;
import org.apache.hc.core5.reactor.IOSession;
import org.apache.hc.core5.reactor.ProtocolIOSession;
-import org.apache.hc.core5.testing.classic.Wire;
+import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.Timeout;
import org.slf4j.Logger;
@@ -48,14 +49,14 @@
public class LoggingIOSession implements IOSession {
private final Logger log;
- private final Wire wireLog;
+ private final Logger wireLog;
private final IOSession session;
public LoggingIOSession(final IOSession session, final Logger log, final Logger wireLog) {
super();
this.session = session;
this.log = log;
- this.wireLog = wireLog != null ? new Wire(wireLog, session.getId()) : null;
+ this.wireLog = wireLog;
}
public LoggingIOSession(final ProtocolIOSession session, final Logger log) {
@@ -198,12 +199,12 @@
if (log.isDebugEnabled()) {
log.debug("{} {} bytes read", session, bytesRead);
}
- if (bytesRead > 0 && wireLog.isEnabled()) {
+ if (bytesRead > 0 && wireLog.isDebugEnabled()) {
final ByteBuffer b = dst.duplicate();
final int p = b.position();
b.limit(p);
b.position(p - bytesRead);
- wireLog.input(b);
+ logData(b, "<< ");
}
return bytesRead;
}
@@ -214,16 +215,52 @@
if (log.isDebugEnabled()) {
log.debug("{} {} bytes written", session, byteWritten);
}
- if (byteWritten > 0 && wireLog.isEnabled()) {
+ if (byteWritten > 0 && wireLog.isDebugEnabled()) {
final ByteBuffer b = src.duplicate();
final int p = b.position();
b.limit(p);
b.position(p - byteWritten);
- wireLog.output(b);
+ logData(b, ">> ");
}
return byteWritten;
}
+ private void logData(final ByteBuffer data, final String prefix) throws IOException {
+ final byte[] line = new byte[16];
+ final StringBuilder buf = new StringBuilder();
+ while (data.hasRemaining()) {
+ buf.setLength(0);
+ buf.append(prefix);
+ final int chunk = Math.min(data.remaining(), line.length);
+ data.get(line, 0, chunk);
+
+ for (int i = 0; i < chunk; i++) {
+ final char ch = (char) line[i];
+ if (ch > Chars.SP && ch <= Chars.DEL) {
+ buf.append(ch);
+ } else if (Character.isWhitespace(ch)) {
+ buf.append(' ');
+ } else {
+ buf.append('.');
+ }
+ }
+ for (int i = chunk; i < 17; i++) {
+ buf.append(' ');
+ }
+
+ for (int i = 0; i < chunk; i++) {
+ buf.append(' ');
+ final int b = line[i] & 0xff;
+ final String s = Integer.toHexString(b);
+ if (s.length() == 1) {
+ buf.append("0");
+ }
+ buf.append(s);
+ }
+ wireLog.debug(buf.toString());
+ }
+ }
+
@Override
public void updateReadTime() {
this.session.updateReadTime();
@@ -256,10 +293,48 @@
@Override
public void upgrade(final IOEventHandler handler) {
+ Args.notNull(handler, "Protocol handler");
if (this.log.isDebugEnabled()) {
- this.log.debug("{} protocol upgrade: {}", this.session, handler != null ? handler.getClass() : null);
+ this.log.debug("{} protocol upgrade: {}", this.session, handler.getClass());
}
- this.session.upgrade(handler);
+ this.session.upgrade(new IOEventHandler() {
+
+ @Override
+ public void connected(final IOSession protocolSession) throws IOException {
+ handler.connected(protocolSession);
+ }
+
+ @Override
+ public void inputReady(final IOSession protocolSession, final ByteBuffer src) throws IOException {
+ if (src != null && wireLog.isDebugEnabled()) {
+ final ByteBuffer b = src.duplicate();
+ logData(b, "<< ");
+ }
+ handler.inputReady(protocolSession, src);
+ }
+
+ @Override
+ public void outputReady(final IOSession protocolSession) throws IOException {
+ handler.outputReady(protocolSession);
+ }
+
+ @Override
+ public void timeout(final IOSession protocolSession, final Timeout timeout) throws IOException {
+ handler.timeout(protocolSession, timeout);
+ }
+
+ @Override
+ public void exception(final IOSession protocolSession, final Exception cause) {
+ handler.exception(protocolSession, cause);
+ }
+
+ @Override
+ public void disconnected(final IOSession protocolSession) {
+ handler.disconnected(protocolSession);
+ }
+
+ });
+
}
@Override
diff --git a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/H2IntegrationTest.java b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/H2IntegrationTest.java
index 68438d1..35b1a66 100644
--- a/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/H2IntegrationTest.java
+++ b/httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/H2IntegrationTest.java
@@ -68,7 +68,6 @@
import org.apache.hc.core5.http.EntityDetails;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HeaderElements;
-import org.apache.hc.core5.http.HttpConnection;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpHost;
@@ -109,6 +108,7 @@
import org.apache.hc.core5.http.nio.support.classic.AbstractClassicServerExchangeHandler;
import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
import org.apache.hc.core5.http.protocol.HttpContext;
+import org.apache.hc.core5.http.protocol.HttpCoreContext;
import org.apache.hc.core5.http2.H2Error;
import org.apache.hc.core5.http2.H2StreamResetException;
import org.apache.hc.core5.http2.config.H2Config;
@@ -120,7 +120,6 @@
import org.apache.hc.core5.reactor.Command;
import org.apache.hc.core5.reactor.IOReactorConfig;
import org.apache.hc.core5.reactor.IOSession;
-import org.apache.hc.core5.reactor.ProtocolIOSession;
import org.apache.hc.core5.testing.SSLTestContexts;
import org.apache.hc.core5.util.TextUtils;
import org.apache.hc.core5.util.TimeValue;
@@ -1040,9 +1039,11 @@
final HttpRequest request = new BasicHttpRequest(Method.GET, createRequestURI(serverEndpoint, "/hello"));
request.addHeader(HttpHeaders.CONNECTION, HeaderElements.CLOSE);
+ final HttpCoreContext coreContext = HttpCoreContext.create();
final Future<Message<HttpResponse, String>> future = streamEndpoint.execute(
new BasicRequestProducer(request, null),
- new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null);
+ new BasicResponseConsumer<>(new StringAsyncEntityConsumer()),
+ coreContext, null);
try {
future.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
Assert.fail("ExecutionException is expected");
@@ -1050,8 +1051,7 @@
Assert.assertThat(ex.getCause(), CoreMatchers.instanceOf(ProtocolException.class));
}
- final HttpConnection eventHandler = (HttpConnection) ((ProtocolIOSession) session).getHandler();
- final EndpointDetails endpointDetails = eventHandler.getEndpointDetails();
+ final EndpointDetails endpointDetails = coreContext.getEndpointDetails();
Assert.assertThat(endpointDetails.getRequestCount(), CoreMatchers.equalTo(0L));
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
index 94571a8..8b85962 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/InternalDataChannel.java
@@ -40,6 +40,7 @@
import javax.net.ssl.SSLContext;
import org.apache.hc.core5.function.Callback;
+import org.apache.hc.core5.function.Decorator;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.net.NamedEndpoint;
import org.apache.hc.core5.reactor.ssl.SSLBufferMode;
@@ -55,21 +56,27 @@
private final IOSession ioSession;
private final NamedEndpoint initialEndpoint;
+ private final Decorator<IOSession> ioSessionDecorator;
private final IOSessionListener sessionListener;
- private final AtomicReference<SSLIOSession> tlsSessionRef;
private final Queue<InternalDataChannel> closedSessions;
+ private final AtomicReference<SSLIOSession> tlsSessionRef;
+ private final AtomicReference<IOSession> currentSessionRef;
private final AtomicBoolean closed;
InternalDataChannel(
final IOSession ioSession,
final NamedEndpoint initialEndpoint,
+ final Decorator<IOSession> ioSessionDecorator,
final IOSessionListener sessionListener,
final Queue<InternalDataChannel> closedSessions) {
this.ioSession = ioSession;
this.initialEndpoint = initialEndpoint;
this.closedSessions = closedSessions;
+ this.ioSessionDecorator = ioSessionDecorator;
this.sessionListener = sessionListener;
this.tlsSessionRef = new AtomicReference<>(null);
+ this.currentSessionRef = new AtomicReference<>(
+ ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession);
this.closed = new AtomicBoolean(false);
}
@@ -85,17 +92,14 @@
@Override
public IOEventHandler getHandler() {
- return ioSession.getHandler();
+ final IOSession currentSession = currentSessionRef.get();
+ return currentSession.getHandler();
}
@Override
public void upgrade(final IOEventHandler handler) {
- ioSession.upgrade(handler);
- }
-
- private IOSession getSessionImpl() {
- final SSLIOSession tlsSession = tlsSessionRef.get();
- return tlsSession != null ? tlsSession : ioSession;
+ final IOSession currentSession = currentSessionRef.get();
+ currentSession.upgrade(handler);
}
private IOEventHandler ensureHandler(final IOSession session) {
@@ -106,67 +110,70 @@
@Override
void onIOEvent(final int readyOps) throws IOException {
- final SSLIOSession tlsSession = tlsSessionRef.get();
- final IOSession currentSession = tlsSession != null ? tlsSession : ioSession;
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
+ final IOSession currentSession = currentSessionRef.get();
currentSession.clearEvent(SelectionKey.OP_CONNECT);
- if (tlsSession == null) {
+ if (tlsSessionRef.get() == null) {
if (sessionListener != null) {
- sessionListener.connected(this);
+ sessionListener.connected(currentSession);
}
final IOEventHandler handler = ensureHandler(currentSession);
- handler.connected(this);
+ handler.connected(currentSession);
}
}
if ((readyOps & SelectionKey.OP_READ) != 0) {
+ final IOSession currentSession = currentSessionRef.get();
currentSession.updateReadTime();
if (sessionListener != null) {
- sessionListener.inputReady(this);
+ sessionListener.inputReady(currentSession);
}
final IOEventHandler handler = ensureHandler(currentSession);
- handler.inputReady(this, null);
+ handler.inputReady(currentSession, null);
}
if ((readyOps & SelectionKey.OP_WRITE) != 0
|| (ioSession.getEventMask() & SelectionKey.OP_WRITE) != 0) {
+ final IOSession currentSession = currentSessionRef.get();
currentSession.updateWriteTime();
if (sessionListener != null) {
- sessionListener.outputReady(this);
+ sessionListener.outputReady(currentSession);
}
final IOEventHandler handler = ensureHandler(currentSession);
- handler.outputReady(this);
+ handler.outputReady(currentSession);
}
}
@Override
Timeout getTimeout() {
- return ioSession.getSocketTimeout();
+ final IOSession currentSession = currentSessionRef.get();
+ return currentSession.getSocketTimeout();
}
@Override
void onTimeout(final Timeout timeout) throws IOException {
+ final IOSession currentSession = currentSessionRef.get();
if (sessionListener != null) {
- sessionListener.timeout(this);
+ sessionListener.timeout(currentSession);
}
- final IOSession currentSession = getSessionImpl();
final IOEventHandler handler = ensureHandler(currentSession);
- handler.timeout(this, timeout);
+ handler.timeout(currentSession, timeout);
}
@Override
void onException(final Exception cause) {
+ final IOSession currentSession = currentSessionRef.get();
if (sessionListener != null) {
- sessionListener.exception(this, cause);
+ sessionListener.exception(currentSession, cause);
}
- final IOSession currentSession = getSessionImpl();
final IOEventHandler handler = currentSession.getHandler();
if (handler != null) {
- handler.exception(this, cause);
+ handler.exception(currentSession, cause);
}
}
void onTLSSessionStart(final SSLIOSession sslSession) {
+ final IOSession currentSession = currentSessionRef.get();
if (sessionListener != null) {
- sessionListener.connected(this);
+ sessionListener.connected(currentSession);
}
}
@@ -177,14 +184,13 @@
}
void disconnected() {
+ final IOSession currentSession = currentSessionRef.get();
if (sessionListener != null) {
- sessionListener.disconnected(this);
+ sessionListener.disconnected(currentSession);
}
- final SSLIOSession tlsSession = tlsSessionRef.get();
- final IOSession currentSession = tlsSession != null ? tlsSession : ioSession;
final IOEventHandler handler = currentSession.getHandler();
if (handler != null) {
- handler.disconnected(this);
+ handler.disconnected(currentSession);
}
}
@@ -196,7 +202,7 @@
final SSLSessionInitializer initializer,
final SSLSessionVerifier verifier,
final Timeout handshakeTimeout) {
- if (tlsSessionRef.compareAndSet(null, new SSLIOSession(
+ final SSLIOSession sslioSession = new SSLIOSession(
endpoint != null ? endpoint : initialEndpoint,
ioSession,
initialEndpoint != null ? SSLMode.CLIENT : SSLMode.SERVER,
@@ -220,9 +226,11 @@
}
},
- handshakeTimeout))) {
+ handshakeTimeout);
+ if (tlsSessionRef.compareAndSet(null, sslioSession)) {
+ currentSessionRef.set(ioSessionDecorator != null ? ioSessionDecorator.decorate(sslioSession) : sslioSession);
if (sessionListener != null) {
- sessionListener.startTls(this);
+ sessionListener.startTls(sslioSession);
}
} else {
throw new IllegalStateException("TLS already activated");
@@ -248,13 +256,14 @@
@Override
public void close(final CloseMode closeMode) {
+ final IOSession currentSession = currentSessionRef.get();
if (closeMode == CloseMode.IMMEDIATE) {
closed.set(true);
- getSessionImpl().close(closeMode);
+ currentSession.close(closeMode);
} else {
if (closed.compareAndSet(false, true)) {
try {
- getSessionImpl().close(closeMode);
+ currentSession.close(closeMode);
} finally {
closedSessions.add(this);
}
@@ -264,32 +273,38 @@
@Override
public IOSession.Status getStatus() {
- return getSessionImpl().getStatus();
+ final IOSession currentSession = currentSessionRef.get();
+ return currentSession.getStatus();
}
@Override
public boolean isOpen() {
- return getSessionImpl().isOpen();
+ final IOSession currentSession = currentSessionRef.get();
+ return currentSession.isOpen();
}
@Override
public void enqueue(final Command command, final Command.Priority priority) {
- getSessionImpl().enqueue(command, priority);
+ final IOSession currentSession = currentSessionRef.get();
+ currentSession.enqueue(command, priority);
}
@Override
public boolean hasCommands() {
- return getSessionImpl().hasCommands();
+ final IOSession currentSession = currentSessionRef.get();
+ return currentSession.hasCommands();
}
@Override
public Command poll() {
- return getSessionImpl().poll();
+ final IOSession currentSession = currentSessionRef.get();
+ return currentSession.poll();
}
@Override
public ByteChannel channel() {
- return getSessionImpl().channel();
+ final IOSession currentSession = currentSessionRef.get();
+ return currentSession.channel();
}
@Override
@@ -304,22 +319,26 @@
@Override
public int getEventMask() {
- return getSessionImpl().getEventMask();
+ final IOSession currentSession = currentSessionRef.get();
+ return currentSession.getEventMask();
}
@Override
public void setEventMask(final int ops) {
- getSessionImpl().setEventMask(ops);
+ final IOSession currentSession = currentSessionRef.get();
+ currentSession.setEventMask(ops);
}
@Override
public void setEvent(final int op) {
- getSessionImpl().setEvent(op);
+ final IOSession currentSession = currentSessionRef.get();
+ currentSession.setEvent(op);
}
@Override
public void clearEvent(final int op) {
- getSessionImpl().clearEvent(op);
+ final IOSession currentSession = currentSessionRef.get();
+ currentSession.clearEvent(op);
}
@Override
@@ -334,12 +353,14 @@
@Override
public int read(final ByteBuffer dst) throws IOException {
- return getSessionImpl().read(dst);
+ final IOSession currentSession = currentSessionRef.get();
+ return currentSession.read(dst);
}
@Override
public int write(final ByteBuffer src) throws IOException {
- return getSessionImpl().write(src);
+ final IOSession currentSession = currentSessionRef.get();
+ return currentSession.write(src);
}
@Override
@@ -369,9 +390,9 @@
@Override
public String toString() {
- final SSLIOSession tlsSession = tlsSessionRef.get();
- if (tlsSession != null) {
- return tlsSession.toString();
+ final IOSession currentSession = currentSessionRef.get();
+ if (currentSession != null) {
+ return currentSession.toString();
} else {
return ioSession.toString();
}
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
index ec80a0f..c1a99b4 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/SingleCoreIOReactor.java
@@ -209,8 +209,9 @@
}
final IOSession ioSession = new IOSessionImpl("a", key, socketChannel);
final InternalDataChannel dataChannel = new InternalDataChannel(
- ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession,
+ ioSession,
null,
+ ioSessionDecorator,
sessionListener,
closedSessions);
dataChannel.upgrade(this.eventHandlerFactory.createHandler(dataChannel, attachment));
@@ -366,8 +367,9 @@
final Object attachment) {
final IOSession ioSession = new IOSessionImpl("c", key, socketChannel);
final InternalDataChannel dataChannel = new InternalDataChannel(
- ioSessionDecorator != null ? ioSessionDecorator.decorate(ioSession) : ioSession,
+ ioSession,
namedEndpoint,
+ ioSessionDecorator,
sessionListener,
closedSessions);
dataChannel.upgrade(eventHandlerFactory.createHandler(dataChannel, attachment));
diff --git a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
index b5d34a9..90d67a6 100644
--- a/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
+++ b/httpcore5/src/main/java/org/apache/hc/core5/reactor/ssl/SSLIOSession.java
@@ -158,29 +158,29 @@
@Override
public void connected(final IOSession protocolSession) throws IOException {
if (handshakeStateRef.compareAndSet(TLSHandShakeState.READY, TLSHandShakeState.INITIALIZED)) {
- initialize();
+ initialize(protocolSession);
}
}
@Override
public void inputReady(final IOSession protocolSession, final ByteBuffer src) throws IOException {
if (handshakeStateRef.compareAndSet(TLSHandShakeState.READY, TLSHandShakeState.INITIALIZED)) {
- initialize();
+ initialize(protocolSession);
}
receiveEncryptedData();
- doHandshake();
- decryptData();
+ doHandshake(protocolSession);
+ decryptData(protocolSession);
updateEventMask();
}
@Override
public void outputReady(final IOSession protocolSession) throws IOException {
if (handshakeStateRef.compareAndSet(TLSHandShakeState.READY, TLSHandShakeState.INITIALIZED)) {
- initialize();
+ initialize(protocolSession);
}
- encryptData();
+ encryptData(protocolSession);
sendEncryptedData();
- doHandshake();
+ doHandshake(protocolSession);
updateEventMask();
}
@@ -190,7 +190,7 @@
// The session failed to terminate cleanly
close(CloseMode.IMMEDIATE);
}
- ensureHandler().timeout(SSLIOSession.this, timeout);
+ ensureHandler().timeout(protocolSession, timeout);
}
@Override
@@ -201,7 +201,7 @@
close(CloseMode.IMMEDIATE);
}
if (handler != null) {
- handler.exception(SSLIOSession.this, cause);
+ handler.exception(protocolSession, cause);
}
}
@@ -209,7 +209,7 @@
public void disconnected(final IOSession protocolSession) {
final IOEventHandler handler = session.getHandler();
if (handler != null) {
- handler.disconnected(SSLIOSession.this);
+ handler.disconnected(protocolSession);
}
}
@@ -228,7 +228,7 @@
return internalEventHandler;
}
- private void initialize() throws IOException {
+ private void initialize(final IOSession protocolSession) throws IOException {
// Save the initial socketTimeout of the underlying IOSession, to be restored after the handshake is finished
this.socketTimeout = this.session.getSocketTimeout();
if (connectTimeout != null) {
@@ -256,7 +256,7 @@
this.inEncrypted.release();
this.outEncrypted.release();
- doHandshake();
+ doHandshake(protocolSession);
} finally {
this.session.getLock().unlock();
}
@@ -302,7 +302,7 @@
}
}
- private void doHandshake() throws IOException {
+ private void doHandshake(final IOSession protocolSession) throws IOException {
boolean handshaking = true;
SSLEngineResult result = null;
@@ -391,7 +391,7 @@
this.tlsDetails = new TlsDetails(sslSession, applicationProtocol);
}
- ensureHandler().connected(this);
+ ensureHandler().connected(protocolSession);
if (this.sessionStartCallback != null) {
this.sessionStartCallback.execute(this);
@@ -530,7 +530,7 @@
return bytesRead;
}
- private void decryptData() throws IOException {
+ private void decryptData(final IOSession protocolSession) throws IOException {
final HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
if ((handshakeStatus == HandshakeStatus.NOT_HANDSHAKING || handshakeStatus == HandshakeStatus.FINISHED)
&& inEncrypted.hasData()) {
@@ -550,7 +550,7 @@
if (inPlainBuf.hasRemaining()) {
inPlainBuf.flip();
try {
- ensureHandler().inputReady(this, inPlainBuf.hasRemaining() ? inPlainBuf : null);
+ ensureHandler().inputReady(protocolSession, inPlainBuf.hasRemaining() ? inPlainBuf : null);
} finally {
inPlainBuf.clear();
}
@@ -575,7 +575,7 @@
}
}
- private void encryptData() throws IOException {
+ private void encryptData(final IOSession protocolSession) throws IOException {
final boolean appReady;
this.session.getLock().lock();
try {
@@ -586,7 +586,7 @@
this.session.getLock().unlock();
}
if (appReady) {
- ensureHandler().outputReady(this);
+ ensureHandler().outputReady(protocolSession);
}
}