PROTON-2571 Flush netty only on read completion
During a read operation that might cause more than one channel
write we can save the overhead of multople forced flush operations
by hooking the channel read complete event point to initiate a
final flush operation for all write inside the read event.
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
index a1546eb..09f5955 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
@@ -615,8 +615,10 @@
});
}
- void autoFlushOff() {
+ boolean autoFlushOff() {
+ boolean oldState = autoFlush;
autoFlush = false;
+ return oldState;
}
void autoFlushOn() {
@@ -782,6 +784,10 @@
} catch (Exception ignore) {}
try {
+ transport.flush();
+ } catch (Exception ignore) {}
+
+ try {
transport.close();
} catch (Exception ignore) {}
@@ -976,7 +982,7 @@
reconnectAttempts++;
transport = ioContext.newTransport();
LOG.trace("Connection {} Attempting connection to remote {}:{}", getId(), location.getHost(), location.getPort());
- transport.connect(location.getHost(), location.getPort(), new ClientTransportListener(engine));
+ transport.connect(location.getHost(), location.getPort(), new ClientTransportListener(this, engine));
} catch (Throwable error) {
engine.engineFailed(ClientExceptionSupport.createOrPassthroughFatal(error));
}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
index 7ff4849..3d73c99 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
@@ -337,14 +337,11 @@
if (delivery == null) {
delivery = sender.protonLink().next();
delivery.setLinkedResource(sender.createTracker(delivery));
- }
-
- if (delivery.getTransferCount() == 0) {
delivery.setMessageFormat(messageFormat);
delivery.disposition(state, settled);
}
- sender.connection().autoFlushOff();
+ boolean wasAutoFlushOn = sender.connection().autoFlushOff();
try {
delivery.streamBytes(payload, true);
if (payload != null && payload.isReadable()) {
@@ -352,9 +349,11 @@
} else {
succeeded();
}
- sender.connection().flush();
} finally {
- sender.connection().autoFlushOn();
+ if (wasAutoFlushOn) {
+ sender.connection().flush();
+ sender.connection().autoFlushOn();
+ }
}
}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
index 511f902..2e0ef82 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
@@ -518,7 +518,7 @@
delivery.abort();
succeeded();
} else {
- sender.connection().autoFlushOff();
+ boolean wasAutoFlushOn = sender.connection().autoFlushOff();
try {
delivery.streamBytes(payload, complete);
if (payload != null && payload.isReadable()) {
@@ -526,9 +526,11 @@
} else {
succeeded();
}
- sender.connection().flush();
} finally {
- sender.connection().autoFlushOn();
+ if (wasAutoFlushOn) {
+ sender.connection().flush();
+ sender.connection().autoFlushOn();
+ }
}
}
}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransportListener.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransportListener.java
index 84049df..0345e5a 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransportListener.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransportListener.java
@@ -34,9 +34,11 @@
private static final Logger LOG = LoggerFactory.getLogger(ClientTransportListener.class);
private final Engine engine;
+ private final ClientConnection connection;
- ClientTransportListener(Engine engine) {
+ ClientTransportListener(ClientConnection connection, Engine engine) {
this.engine = engine;
+ this.connection = connection;
}
@Override
@@ -52,6 +54,7 @@
@Override
public void transportRead(ProtonBuffer incoming) {
try {
+ connection.autoFlushOff();
do {
engine.ingest(incoming);
} while (incoming.isReadable() && engine.isWritable());
@@ -59,6 +62,8 @@
} catch (EngineStateException e) {
LOG.warn("Caught problem during incoming data processing: {}", e.getMessage(), e);
engine.engineFailed(ClientExceptionSupport.createOrPassthroughFatal(e));
+ } finally {
+ connection.autoFlushOn();
}
}
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/TcpTransport.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/TcpTransport.java
index a179075..119467c 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/TcpTransport.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/TcpTransport.java
@@ -451,6 +451,11 @@
}
@Override
+ public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+ ctx.flush();
+ }
+
+ @Override
public void channelInactive(ChannelHandlerContext context) throws Exception {
handleTransportFailure(context.channel(), new IOException("Remote closed connection unexpectedly"));
}