Copy ByteBufPair buffers when used with SSL (#2401)

The netty SSL handler uses a coalescing buffer queue, which modifies
the buffers used to queue the writes so that SSL_write can be given
larger chunks, thereby increasing the 'goodput'.

If we pass in a retained duplicate, as we have been doing until now,
then later clients will be sent corrupted data, because the SSL handler
will have modified the cached entry buffers that the duplicate shares.

This patch introduces a copying ByteBufPair encoder, which is only
used with SSL connections.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index da10a2d..88a7357 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -52,9 +52,11 @@
         if (enableTLS) {
             SslContext sslCtx = SecurityUtility.createNettySslContextForServer(serviceConfig.isTlsAllowInsecureConnection(), serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(), serviceConfig.getTlsKeyFilePath());
             ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
+            ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
+        } else {
+            ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
         }
 
-        ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
         ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
         ch.pipeline().addLast("handler", new ServerCnx(brokerService));
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index c36fcd9..a1c9075 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -95,9 +95,10 @@
                                 conf.getTlsTrustCertsFilePath());
                     }
                     ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
+                    ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
+                } else {
+                    ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
                 }
-
-                ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
                 ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4));
                 ch.pipeline().addLast("handler", new ClientCnx(conf, eventLoopGroup));
             }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java
index 94e1fb7..b99270b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java
@@ -109,6 +109,7 @@
     }
 
     public static final Encoder ENCODER = new Encoder();
+    public static final CopyingEncoder COPYING_ENCODER = new CopyingEncoder();
 
     @Sharable
     public static class Encoder extends ChannelOutboundHandlerAdapter {
@@ -132,4 +133,26 @@
         }
     }
 
-}
\ No newline at end of file
+    @Sharable
+    public static class CopyingEncoder extends ChannelOutboundHandlerAdapter {
+        @Override
+        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+            if (msg instanceof ByteBufPair) {
+                ByteBufPair b = (ByteBufPair) msg;
+
+                // Some handlers in the pipeline will modify the ByteBufs passed in to them (e.g. SslHandler).
+                // For these handlers, we need to pass a copy of the buffers, as the source buffers may be cached
+                // and reused for multiple requests.
+                try {
+                    ctx.write(b.getFirst().copy(), ctx.voidPromise());
+                    ctx.write(b.getSecond().copy(), promise);
+                } finally {
+                    ReferenceCountUtil.safeRelease(b);
+                }
+            } else {
+                ctx.write(msg, promise);
+            }
+        }
+    }
+
+}