blob: b99270b1c1bb6e779869af8b4b03793854b72535 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.common.api;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
/**
* ByteBuf holder that contains 2 buffers.
*/
public final class ByteBufPair extends AbstractReferenceCounted {
private ByteBuf b1;
private ByteBuf b2;
private final Handle<ByteBufPair> recyclerHandle;
private static final Recycler<ByteBufPair> RECYCLER = new Recycler<ByteBufPair>() {
@Override
protected ByteBufPair newObject(Recycler.Handle<ByteBufPair> handle) {
return new ByteBufPair(handle);
}
};
private ByteBufPair(Handle<ByteBufPair> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
/**
* Get a new {@link ByteBufPair} from the pool and assign 2 buffers to it.
* <p>
* The buffers b1 and b2 lifecycles are now managed by the ByteBufPair: when the {@link ByteBufPair} is deallocated,
* b1 and b2 will be released as well.
*
* @param b1
* @param b2
* @return
*/
public static ByteBufPair get(ByteBuf b1, ByteBuf b2) {
ByteBufPair buf = RECYCLER.get();
buf.setRefCnt(1);
buf.b1 = b1;
buf.b2 = b2;
return buf;
}
public ByteBuf getFirst() {
return b1;
}
public ByteBuf getSecond() {
return b2;
}
public int readableBytes() {
return b1.readableBytes() + b2.readableBytes();
}
/**
* @return a single buffer with the content of both individual buffers
*/
@VisibleForTesting
public static ByteBuf coalesce(ByteBufPair pair) {
ByteBuf b = Unpooled.buffer(pair.readableBytes());
b.writeBytes(pair.b1, pair.b1.readerIndex(), pair.b1.readableBytes());
b.writeBytes(pair.b2, pair.b2.readerIndex(), pair.b2.readableBytes());
return b;
}
@Override
protected void deallocate() {
b1.release();
b2.release();
b1 = b2 = null;
recyclerHandle.recycle(this);
}
@Override
public ReferenceCounted touch(Object hint) {
b1.touch(hint);
b2.touch(hint);
return this;
}
public static final Encoder ENCODER = new Encoder();
public static final CopyingEncoder COPYING_ENCODER = new CopyingEncoder();
@Sharable
public static class Encoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBufPair) {
ByteBufPair b = (ByteBufPair) msg;
// Write each buffer individually on the socket. The retain() here is needed to preserve the fact that
// ByteBuf are automatically released after a write. If the ByteBufPair ref count is increased and it
// gets written multiple times, the individual buffers refcount should be reflected as well.
try {
ctx.write(b.getFirst().retainedDuplicate(), ctx.voidPromise());
ctx.write(b.getSecond().retainedDuplicate(), promise);
} finally {
ReferenceCountUtil.safeRelease(b);
}
} else {
ctx.write(msg, promise);
}
}
}
@Sharable
public static class CopyingEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBufPair) {
ByteBufPair b = (ByteBufPair) msg;
// Some handlers in the pipeline will modify the bytebufs passed in to them (i.e. SslHandler).
// For these handlers, we need to pass a copy of the buffers as the source buffers may be cached
// for multiple requests.
try {
ctx.write(b.getFirst().copy(), ctx.voidPromise());
ctx.write(b.getSecond().copy(), promise);
} finally {
ReferenceCountUtil.safeRelease(b);
}
} else {
ctx.write(msg, promise);
}
}
}
}