| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.qpid.protonj2.test.driver; |
| |
| import org.apache.qpid.protonj2.test.driver.codec.Codec; |
| import org.apache.qpid.protonj2.test.driver.codec.primitives.DescribedType; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.Unpooled; |
| |
| /** |
| * Encodes AMQP performatives into frames for transmission |
| */ |
| public class FrameEncoder { |
| |
| public static final byte AMQP_FRAME_TYPE = (byte) 0; |
| public static final byte SASL_FRAME_TYPE = (byte) 1; |
| |
| private static final int AMQP_PERFORMATIVE_PAD = 512; |
| private static final int FRAME_HEADER_SIZE = 8; |
| |
| private static final int FRAME_START_BYTE = 0; |
| private static final int FRAME_DOFF_BYTE = 4; |
| private static final int FRAME_DOFF_SIZE = 2; |
| private static final int FRAME_TYPE_BYTE = 5; |
| private static final int FRAME_CHANNEL_BYTE = 6; |
| |
| private final AMQPTestDriver driver; |
| |
| private final Codec codec = Codec.Factory.create(); |
| |
| public FrameEncoder(AMQPTestDriver driver) { |
| this.driver = driver; |
| } |
| |
| public ByteBuf handleWrite(DescribedType performative, int channel, ByteBuf payload, Runnable payloadToLarge) { |
| return writeFrame(performative, payload, AMQP_FRAME_TYPE, channel, driver.getOutboundMaxFrameSize(), payloadToLarge); |
| } |
| |
| public ByteBuf handleWrite(DescribedType performative, int channel) { |
| return writeFrame(performative, null, SASL_FRAME_TYPE, (short) 0, driver.getOutboundMaxFrameSize(), null); |
| } |
| |
| private ByteBuf writeFrame(DescribedType performative, ByteBuf payload, byte frameType, int channel, int maxFrameSize, Runnable onPayloadTooLarge) { |
| int outputBufferSize = AMQP_PERFORMATIVE_PAD + (payload != null ? payload.readableBytes() : 0); |
| |
| ByteBuf output = Unpooled.buffer(outputBufferSize); |
| |
| final int performativeSize = writePerformative(performative, payload, maxFrameSize, output, onPayloadTooLarge); |
| final int capacity = maxFrameSize > 0 ? maxFrameSize - performativeSize : Integer.MAX_VALUE; |
| final int payloadSize = Math.min(payload == null ? 0 : payload.readableBytes(), capacity); |
| |
| if (payloadSize > 0) { |
| output.writeBytes(payload, payloadSize); |
| } |
| |
| endFrame(output, frameType, channel); |
| |
| return output; |
| } |
| |
| private int writePerformative(DescribedType performative, ByteBuf payload, int maxFrameSize, ByteBuf output, Runnable onPayloadTooLarge) { |
| output.writerIndex(FRAME_HEADER_SIZE); |
| |
| long encodedSize = 0; |
| int startIndex = output.writerIndex(); |
| |
| if (performative != null) { |
| try { |
| codec.putDescribedType(performative); |
| encodedSize = codec.encode(output); |
| } finally { |
| codec.clear(); |
| } |
| } |
| |
| int performativeSize = output.writerIndex() - startIndex; |
| |
| if (performativeSize != encodedSize) { |
| throw new IllegalStateException(String.format( |
| "Unable to encode performative %s of %d bytes into provided proton buffer, only wrote %d bytes", |
| performative, performativeSize, encodedSize)); |
| } |
| |
| if (onPayloadTooLarge != null && maxFrameSize > 0 && payload != null && (payload.readableBytes() + performativeSize) > maxFrameSize) { |
| // Next iteration will re-encode the frame body again with updates from the <payload-to-large> |
| // handler and then we can move onto the body portion. |
| onPayloadTooLarge.run(); |
| performativeSize = writePerformative(performative, payload, maxFrameSize, output, null); |
| } |
| |
| return performativeSize; |
| } |
| |
| private static void endFrame(ByteBuf output, byte frameType, int channel) { |
| output.setInt(FRAME_START_BYTE, output.readableBytes()); |
| output.setByte(FRAME_DOFF_BYTE, FRAME_DOFF_SIZE); |
| output.setByte(FRAME_TYPE_BYTE, frameType); |
| output.setShort(FRAME_CHANNEL_BYTE, (short) channel); |
| } |
| } |