Copy ByteBufPair buffers when using with SSL (#2401)
The Netty SSL handler uses a coalescing buffer queue, which modifies
the buffers used to queue the writes so that SSL_write can be given
larger chunks, thereby increasing the 'goodput'.
If we pass in a retained duplicate, as we have been doing until now,
the duplicate shares its underlying memory with the cached entry buffer,
so later clients will be passed junk once the SSL handler has modified
that memory.
This patch introduces a copying ByteBufPair encoder, which is only
used with SSL connections.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
index da10a2d..88a7357 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarChannelInitializer.java
@@ -52,9 +52,11 @@
if (enableTLS) {
SslContext sslCtx = SecurityUtility.createNettySslContextForServer(serviceConfig.isTlsAllowInsecureConnection(), serviceConfig.getTlsTrustCertsFilePath(), serviceConfig.getTlsCertificateFilePath(), serviceConfig.getTlsKeyFilePath());
ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
+ ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
+ } else {
+ ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
}
- ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
ch.pipeline().addLast("handler", new ServerCnx(brokerService));
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index c36fcd9..a1c9075 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -95,9 +95,10 @@
conf.getTlsTrustCertsFilePath());
}
ch.pipeline().addLast(TLS_HANDLER, sslCtx.newHandler(ch.alloc()));
+ ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.COPYING_ENCODER);
+ } else {
+ ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
}
-
- ch.pipeline().addLast("ByteBufPairEncoder", ByteBufPair.ENCODER);
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(MaxMessageSize, 0, 4, 0, 4));
ch.pipeline().addLast("handler", new ClientCnx(conf, eventLoopGroup));
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java
index 94e1fb7..b99270b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java
@@ -109,6 +109,7 @@
}
public static final Encoder ENCODER = new Encoder();
+ public static final CopyingEncoder COPYING_ENCODER = new CopyingEncoder();
@Sharable
public static class Encoder extends ChannelOutboundHandlerAdapter {
@@ -132,4 +133,26 @@
}
}
-}
\ No newline at end of file
+ @Sharable
+ public static class CopyingEncoder extends ChannelOutboundHandlerAdapter {
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ if (msg instanceof ByteBufPair) {
+ ByteBufPair b = (ByteBufPair) msg;
+
+ // Some handlers in the pipeline modify the ByteBufs passed to them (e.g. SslHandler), and the
+ // source buffers may be cached and reused for multiple requests. Write a copy of each half so
+ // the sources stay untouched; the pair itself is always released in the finally block below.
+ try {
+ ctx.write(b.getFirst().copy(), ctx.voidPromise());
+ ctx.write(b.getSecond().copy(), promise);
+ } finally {
+ ReferenceCountUtil.safeRelease(b);
+ }
+ } else {
+ ctx.write(msg, promise);
+ }
+ }
+ }
+
+}