PROTON-2571 Flush netty only on read completion

During a read operation that might cause more than one channel
write we can save the overhead of multople forced flush operations
by hooking the channel read complete event point to initiate a
final flush operation for all write inside the read event.
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
index a1546eb..09f5955 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientConnection.java
@@ -615,8 +615,10 @@
         });
     }
 
-    void autoFlushOff() {
+    boolean autoFlushOff() {
+        boolean oldState = autoFlush;
         autoFlush = false;
+        return oldState;
     }
 
     void autoFlushOn() {
@@ -782,6 +784,10 @@
         } catch (Exception ignore) {}
 
         try {
+            transport.flush();
+        } catch (Exception ignore) {}
+
+        try {
             transport.close();
         } catch (Exception ignore) {}
 
@@ -976,7 +982,7 @@
             reconnectAttempts++;
             transport = ioContext.newTransport();
             LOG.trace("Connection {} Attempting connection to remote {}:{}", getId(), location.getHost(), location.getPort());
-            transport.connect(location.getHost(), location.getPort(), new ClientTransportListener(engine));
+            transport.connect(location.getHost(), location.getPort(), new ClientTransportListener(this, engine));
         } catch (Throwable error) {
             engine.engineFailed(ClientExceptionSupport.createOrPassthroughFatal(error));
         }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
index 7ff4849..3d73c99 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientSender.java
@@ -337,14 +337,11 @@
             if (delivery == null) {
                 delivery = sender.protonLink().next();
                 delivery.setLinkedResource(sender.createTracker(delivery));
-            }
-
-            if (delivery.getTransferCount() == 0) {
                 delivery.setMessageFormat(messageFormat);
                 delivery.disposition(state, settled);
             }
 
-            sender.connection().autoFlushOff();
+            boolean wasAutoFlushOn = sender.connection().autoFlushOff();
             try {
                 delivery.streamBytes(payload, true);
                 if (payload != null && payload.isReadable()) {
@@ -352,9 +349,11 @@
                 } else {
                     succeeded();
                 }
-                sender.connection().flush();
             } finally {
-                sender.connection().autoFlushOn();
+                if (wasAutoFlushOn) {
+                    sender.connection().flush();
+                    sender.connection().autoFlushOn();
+                }
             }
         }
 
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
index 511f902..2e0ef82 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientStreamSender.java
@@ -518,7 +518,7 @@
                 delivery.abort();
                 succeeded();
             } else {
-                sender.connection().autoFlushOff();
+                boolean wasAutoFlushOn = sender.connection().autoFlushOff();
                 try {
                     delivery.streamBytes(payload, complete);
                     if (payload != null && payload.isReadable()) {
@@ -526,9 +526,11 @@
                     } else {
                         succeeded();
                     }
-                    sender.connection().flush();
                 } finally {
-                    sender.connection().autoFlushOn();
+                    if (wasAutoFlushOn) {
+                        sender.connection().flush();
+                        sender.connection().autoFlushOn();
+                    }
                 }
             }
         }
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransportListener.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransportListener.java
index 84049df..0345e5a 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransportListener.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/impl/ClientTransportListener.java
@@ -34,9 +34,11 @@
     private static final Logger LOG = LoggerFactory.getLogger(ClientTransportListener.class);
 
     private final Engine engine;
+    private final ClientConnection connection;
 
-    ClientTransportListener(Engine engine) {
+    ClientTransportListener(ClientConnection connection, Engine engine) {
         this.engine = engine;
+        this.connection = connection;
     }
 
     @Override
@@ -52,6 +54,7 @@
     @Override
     public void transportRead(ProtonBuffer incoming) {
         try {
+            connection.autoFlushOff();
             do {
                 engine.ingest(incoming);
             } while (incoming.isReadable() && engine.isWritable());
@@ -59,6 +62,8 @@
         } catch (EngineStateException e) {
             LOG.warn("Caught problem during incoming data processing: {}", e.getMessage(), e);
             engine.engineFailed(ClientExceptionSupport.createOrPassthroughFatal(e));
+        } finally {
+            connection.autoFlushOn();
         }
     }
 
diff --git a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/TcpTransport.java b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/TcpTransport.java
index a179075..119467c 100644
--- a/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/TcpTransport.java
+++ b/protonj2-client/src/main/java/org/apache/qpid/protonj2/client/transport/TcpTransport.java
@@ -451,6 +451,11 @@
         }
 
         @Override
+        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
+            ctx.flush();
+        }
+
+        @Override
         public void channelInactive(ChannelHandlerContext context) throws Exception {
             handleTransportFailure(context.channel(), new IOException("Remote closed connection unexpectedly"));
         }