blob: 6a1e4080b4649054641d327bb83c4ce6f2d94cb3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.rpc;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.CorruptedFrameException;
import java.util.List;
import org.apache.drill.exec.memory.BufferAllocator;
import com.google.protobuf.CodedInputStream;
import org.apache.drill.exec.exception.OutOfMemoryException;
/**
* Modified version of {@link io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder} that avoids bytebuf copy.
* See the documentation there.
*/
public class ProtobufLengthDecoder extends ByteToMessageDecoder {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ProtobufLengthDecoder.class);
private BufferAllocator allocator;
private OutOfMemoryHandler outOfMemoryHandler;
public ProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
super();
this.allocator = allocator;
this.outOfMemoryHandler = outOfMemoryHandler;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (!ctx.channel().isOpen()) {
if (in.readableBytes() > 0) {
logger.info("Channel is closed, discarding remaining {} byte(s) in buffer.", in.readableBytes());
}
in.skipBytes(in.readableBytes());
return;
}
in.markReaderIndex();
final byte[] buf = new byte[5];
for (int i = 0; i < buf.length; i++) {
if (!in.isReadable()) {
in.resetReaderIndex();
return;
}
buf[i] = in.readByte();
if (buf[i] >= 0) {
int length = CodedInputStream.newInstance(buf, 0, i + 1).readRawVarint32();
if (length < 0) {
throw new CorruptedFrameException("negative length: " + length);
}
if (length == 0) {
throw new CorruptedFrameException("Received a message of length 0.");
}
if (in.readableBytes() < length) {
in.resetReaderIndex();
return;
} else {
// need to make buffer copy, otherwise netty will try to refill this buffer if we move the readerIndex forward...
// TODO: Can we avoid this copy?
ByteBuf outBuf;
try {
outBuf = allocator.buffer(length);
} catch (OutOfMemoryException e) {
logger.warn("Failure allocating buffer on incoming stream due to memory limits. Current Allocation: {}.", allocator.getAllocatedMemory());
in.resetReaderIndex();
outOfMemoryHandler.handle();
return;
}
outBuf.writeBytes(in, in.readerIndex(), length);
in.skipBytes(length);
if (RpcConstants.EXTRA_DEBUGGING) {
logger.debug(String.format(
"ReaderIndex is %d after length header of %d bytes and frame body of length %d bytes.",
in.readerIndex(), i + 1, length));
}
out.add(outBuf);
return;
}
}
}
// Couldn't find the byte whose MSB is off.
throw new CorruptedFrameException("length wider than 32-bit");
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
}
}