| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * https://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.avro.ipc.netty; |
| |
| import java.nio.Buffer; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.List; |
| import org.apache.avro.AvroRuntimeException; |
| |
| import org.jboss.netty.buffer.ChannelBuffer; |
| import org.jboss.netty.buffer.ChannelBuffers; |
| import org.jboss.netty.channel.Channel; |
| import org.jboss.netty.channel.ChannelHandlerContext; |
| import org.jboss.netty.handler.codec.frame.FrameDecoder; |
| import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; |
| |
| /** |
| * Data structure, encoder and decoder classes for the Netty transport. |
| */ |
| public class NettyTransportCodec { |
| /** |
| * Transport protocol data structure when using Netty. |
| */ |
| public static class NettyDataPack { |
| private int serial; // to track each call in client side |
| private List<ByteBuffer> datas; |
| |
| public NettyDataPack() { |
| } |
| |
| public NettyDataPack(int serial, List<ByteBuffer> datas) { |
| this.serial = serial; |
| this.datas = datas; |
| } |
| |
| public void setSerial(int serial) { |
| this.serial = serial; |
| } |
| |
| public int getSerial() { |
| return serial; |
| } |
| |
| public void setDatas(List<ByteBuffer> datas) { |
| this.datas = datas; |
| } |
| |
| public List<ByteBuffer> getDatas() { |
| return datas; |
| } |
| |
| } |
| |
| /** |
| * Protocol encoder which converts NettyDataPack which contains the Responder's |
| * output List<ByteBuffer> to ChannelBuffer needed by Netty. |
| */ |
| public static class NettyFrameEncoder extends OneToOneEncoder { |
| |
| /** |
| * encode msg to ChannelBuffer |
| * |
| * @param msg NettyDataPack from NettyServerAvroHandler/NettyClientAvroHandler |
| * in the pipeline |
| * @return encoded ChannelBuffer |
| */ |
| @Override |
| protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { |
| NettyDataPack dataPack = (NettyDataPack) msg; |
| List<ByteBuffer> origs = dataPack.getDatas(); |
| List<ByteBuffer> bbs = new ArrayList<>(origs.size() * 2 + 1); |
| bbs.add(getPackHeader(dataPack)); // prepend a pack header including serial number and list size |
| for (ByteBuffer b : origs) { |
| bbs.add(getLengthHeader(b)); // for each buffer prepend length field |
| bbs.add(b); |
| } |
| |
| return ChannelBuffers.wrappedBuffer(bbs.toArray(new ByteBuffer[0])); |
| } |
| |
| private ByteBuffer getPackHeader(NettyDataPack dataPack) { |
| ByteBuffer header = ByteBuffer.allocate(8); |
| header.putInt(dataPack.getSerial()); |
| header.putInt(dataPack.getDatas().size()); |
| ((Buffer) header).flip(); |
| return header; |
| } |
| |
| private ByteBuffer getLengthHeader(ByteBuffer buf) { |
| ByteBuffer header = ByteBuffer.allocate(4); |
| header.putInt(buf.limit()); |
| ((Buffer) header).flip(); |
| return header; |
| } |
| } |
| |
| /** |
| * Protocol decoder which converts Netty's ChannelBuffer to NettyDataPack which |
| * contains a List<ByteBuffer> needed by Avro Responder. |
| */ |
| public static class NettyFrameDecoder extends FrameDecoder { |
| private boolean packHeaderRead = false; |
| private int listSize; |
| private NettyDataPack dataPack; |
| private final long maxMem; |
| private static final long SIZEOF_REF = 8L; // mem usage of 64-bit pointer |
| |
| public NettyFrameDecoder() { |
| maxMem = Runtime.getRuntime().maxMemory(); |
| } |
| |
| /** |
| * decode buffer to NettyDataPack |
| */ |
| @Override |
| protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { |
| |
| if (!packHeaderRead) { |
| if (decodePackHeader(ctx, channel, buffer)) { |
| packHeaderRead = true; |
| } |
| return null; |
| } else { |
| if (decodePackBody(ctx, channel, buffer)) { |
| packHeaderRead = false; // reset state |
| return dataPack; |
| } else { |
| return null; |
| } |
| } |
| |
| } |
| |
| private boolean decodePackHeader(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) |
| throws Exception { |
| if (buffer.readableBytes() < 8) { |
| return false; |
| } |
| |
| int serial = buffer.readInt(); |
| int listSize = buffer.readInt(); |
| |
| // Sanity check to reduce likelihood of invalid requests being honored. |
| // Only allow 10% of available memory to go towards this list (too much!) |
| if (listSize * SIZEOF_REF > 0.1 * maxMem) { |
| channel.close().await(); |
| throw new AvroRuntimeException( |
| "Excessively large list allocation " + "request detected: " + listSize + " items! Connection closed."); |
| } |
| |
| this.listSize = listSize; |
| dataPack = new NettyDataPack(serial, new ArrayList<>(listSize)); |
| |
| return true; |
| } |
| |
| private boolean decodePackBody(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { |
| if (buffer.readableBytes() < 4) { |
| return false; |
| } |
| |
| buffer.markReaderIndex(); |
| |
| int length = buffer.readInt(); |
| |
| if (buffer.readableBytes() < length) { |
| buffer.resetReaderIndex(); |
| return false; |
| } |
| |
| ByteBuffer bb = ByteBuffer.allocate(length); |
| buffer.readBytes(bb); |
| ((Buffer) bb).flip(); |
| dataPack.getDatas().add(bb); |
| |
| return dataPack.getDatas().size() == listSize; |
| } |
| |
| } |
| |
| } |