blob: 6bfa4a311c3a799a9d9c69caaec239742e34168a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.rpc;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;
import java.util.List;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.min;
/**
* Handler that converts an input ByteBuf into chunk size ByteBuf's and add it to the
* CompositeByteBuf as individual components. If encryption is enabled, this is always
* added in the channel pipeline.
*/
class ChunkCreationHandler extends MessageToMessageEncoder<ByteBuf> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(
ChunkCreationHandler.class.getCanonicalName());
private final int chunkSize;
ChunkCreationHandler(int chunkSize) {
checkArgument(chunkSize > 0);
this.chunkSize = chunkSize;
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
logger.trace("Added " + RpcConstants.CHUNK_CREATION_HANDLER + " handler!");
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
super.handlerRemoved(ctx);
logger.trace("Removed " + RpcConstants.CHUNK_CREATION_HANDLER + " handler");
}
@Override
protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
if (RpcConstants.EXTRA_DEBUGGING) {
logger.debug("ChunkCreationHandler called with msg {} of size {} with chunkSize {}",
msg, msg.readableBytes(), chunkSize);
}
if (!ctx.channel().isOpen()) {
logger.debug("Channel closed, skipping encode inside {}.", RpcConstants.CHUNK_CREATION_HANDLER);
msg.release();
return;
}
// Calculate the number of chunks based on configured chunk size and input msg size
int numChunks = (int) Math.ceil((double) msg.readableBytes() / chunkSize);
// Initialize a composite buffer to hold numChunks chunk.
final CompositeByteBuf cbb = ctx.alloc().compositeBuffer(numChunks);
int cbbWriteIndex = 0;
int currentChunkLen = min(msg.readableBytes(), chunkSize);
// Create slices of chunkSize from input msg and add it to the composite buffer.
while (numChunks > 0) {
final ByteBuf chunkBuf = msg.slice(msg.readerIndex(), currentChunkLen);
chunkBuf.retain();
cbb.addComponent(chunkBuf);
cbbWriteIndex += currentChunkLen;
msg.skipBytes(currentChunkLen);
--numChunks;
currentChunkLen = min(msg.readableBytes(), chunkSize);
}
// Update the writerIndex of composite byte buffer. Netty doesn't do it automatically.
cbb.writerIndex(cbbWriteIndex);
// Add the final composite bytebuf into output buffer.
out.add(cbb);
}
}