blob: deab9fe4ca0b65feb95def549792d064c3680d18 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.uniffle.common.netty;
import java.util.LinkedList;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.EmptyByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.uniffle.common.netty.protocol.Message;
/**
* A customized frame decoder that allows intercepting raw data.
*
* <p>This behaves like Netty's frame decoder (with hard coded parameters that match this library's
* needs), except it allows an interceptor to be installed to read data directly before it's framed.
*
* <p>Unlike Netty's frame decoder, each frame is dispatched to child handlers as soon as it's
* decoded, instead of building as many frames as the current buffer allows and dispatching all of
* them. This allows a child handler to install an interceptor if needed.
*
* <p>If an interceptor is installed, framing stops, and data is instead fed directly to the
* interceptor. When the interceptor indicates that it doesn't need to read any more data, framing
* resumes. Interceptors should not hold references to the data buffers provided to their handle()
* method.
*/
public class TransportFrameDecoder extends ChannelInboundHandlerAdapter implements FrameDecoder {
private int msgSize = -1;
private int bodySize = -1;
private Message.Type curType = Message.Type.UNKNOWN_TYPE;
private ByteBuf headerBuf = Unpooled.buffer(HEADER_SIZE, HEADER_SIZE);
private static final int MAX_FRAME_SIZE = Integer.MAX_VALUE;
private static final int UNKNOWN_FRAME_SIZE = -1;
private final LinkedList<ByteBuf> buffers = new LinkedList<>();
private long totalSize = 0;
private long nextFrameSize = UNKNOWN_FRAME_SIZE;
@Override
public void channelRead(ChannelHandlerContext ctx, Object data) {
ByteBuf in = (ByteBuf) data;
buffers.add(in);
totalSize += in.readableBytes();
while (!buffers.isEmpty()) {
ByteBuf frame = decodeNext();
if (frame == null) {
break;
}
Message msg = null;
try {
msg = Message.decode(curType, frame);
} finally {
if (shouldRelease(msg)) {
frame.release();
}
}
ctx.fireChannelRead(msg);
clear();
}
}
static boolean shouldRelease(Message msg) {
if (msg == null || msg.body() == null || msg.body().byteBuf() == null) {
return true;
}
return msg.body().byteBuf() instanceof EmptyByteBuf;
}
private void clear() {
curType = Message.Type.UNKNOWN_TYPE;
msgSize = -1;
bodySize = -1;
headerBuf.clear();
}
private long decodeFrameSize() {
if (nextFrameSize != UNKNOWN_FRAME_SIZE || totalSize < HEADER_SIZE) {
return nextFrameSize;
}
// We know there's enough data. If the first buffer contains all the data, great. Otherwise,
// hold the bytes for the frame length in a composite buffer until we have enough data to read
// the frame size. Normally, it should be rare to need more than one buffer to read the frame
// size.
ByteBuf first = buffers.getFirst();
if (first.readableBytes() >= HEADER_SIZE) {
msgSize = first.readInt();
curType = Message.Type.decode(first);
bodySize = first.readInt();
nextFrameSize = msgSize + bodySize;
totalSize -= HEADER_SIZE;
if (!first.isReadable()) {
buffers.removeFirst().release();
}
return nextFrameSize;
}
while (headerBuf.readableBytes() < HEADER_SIZE) {
ByteBuf next = buffers.getFirst();
int toRead = Math.min(next.readableBytes(), HEADER_SIZE - headerBuf.readableBytes());
headerBuf.writeBytes(next, toRead);
if (!next.isReadable()) {
buffers.removeFirst().release();
}
}
msgSize = headerBuf.readInt();
curType = Message.Type.decode(headerBuf);
bodySize = headerBuf.readInt();
nextFrameSize = msgSize + bodySize;
totalSize -= HEADER_SIZE;
return nextFrameSize;
}
private ByteBuf decodeNext() {
long frameSize = decodeFrameSize();
if (frameSize == UNKNOWN_FRAME_SIZE || totalSize < frameSize) {
return null;
}
// Reset size for next frame.
nextFrameSize = UNKNOWN_FRAME_SIZE;
Preconditions.checkArgument(frameSize < MAX_FRAME_SIZE, "Too large frame: %s", frameSize);
Preconditions.checkArgument(frameSize > 0, "Frame length should be positive: %s", frameSize);
// If the first buffer holds the entire frame, return it.
int remaining = (int) frameSize;
if (buffers.getFirst().readableBytes() >= remaining) {
return nextBufferForFrame(remaining);
}
// Otherwise, create a composite buffer.
CompositeByteBuf frame = buffers.getFirst().alloc().compositeBuffer(Integer.MAX_VALUE);
while (remaining > 0) {
ByteBuf next = nextBufferForFrame(remaining);
remaining -= next.readableBytes();
frame.addComponent(next).writerIndex(frame.writerIndex() + next.readableBytes());
}
assert remaining == 0;
return frame;
}
/**
* Takes the first buffer in the internal list, and either adjust it to fit in the frame (by
* taking a slice out of it) or remove it from the internal list.
*/
private ByteBuf nextBufferForFrame(int bytesToRead) {
ByteBuf buf = buffers.getFirst();
ByteBuf frame;
if (buf.readableBytes() > bytesToRead) {
frame = buf.retain().readSlice(bytesToRead);
totalSize -= bytesToRead;
} else {
frame = buf;
buffers.removeFirst();
totalSize -= frame.readableBytes();
}
return frame;
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// Release all buffers that are still in our ownership.
// Doing this in handlerRemoved(...) guarantees that this will happen in all cases:
// - When the Channel becomes inactive
// - When the decoder is removed from the ChannelPipeline
for (ByteBuf b : buffers) {
b.release();
}
buffers.clear();
headerBuf.release();
super.handlerRemoved(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}