blob: b9fa20e0a5f200e20d3dabb69c7d39cb3615e530 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.test.driver;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import org.apache.qpid.protonj2.test.driver.codec.Codec;
import org.apache.qpid.protonj2.test.driver.codec.primitives.DescribedType;
/**
* Encodes AMQP performatives into frames for transmission
*/
public class FrameEncoder {
public static final byte AMQP_FRAME_TYPE = (byte) 0;
public static final byte SASL_FRAME_TYPE = (byte) 1;
private static final int AMQP_PERFORMATIVE_PAD = 512;
private static final int FRAME_START_BYTE = 0;
private static final int FRAME_DOFF_BYTE = 4;
private static final int FRAME_DOFF_SIZE = 2;
private static final int FRAME_TYPE_BYTE = 5;
private static final int FRAME_CHANNEL_BYTE = 6;
private static final int FRAME_HEADER_SIZE = 8;
private static final byte[] FRAME_HEADER_RESERVED = new byte[FRAME_HEADER_SIZE];
private final AMQPTestDriver driver;
private final Codec codec = Codec.Factory.create();
public FrameEncoder(AMQPTestDriver driver) {
this.driver = driver;
}
public ByteBuffer handleWrite(DescribedType performative, int channel, ByteBuffer payload, Runnable payloadToLarge) {
return writeFrame(performative, payload, AMQP_FRAME_TYPE, channel, driver.getOutboundMaxFrameSize(), payloadToLarge);
}
public ByteBuffer handleWrite(DescribedType performative, int channel) {
return writeFrame(performative, null, SASL_FRAME_TYPE, (short) 0, driver.getOutboundMaxFrameSize(), null);
}
private ByteBuffer writeFrame(DescribedType performative, ByteBuffer payload, byte frameType, int channel, int maxFrameSize, Runnable onPayloadTooLarge) {
final int outputBufferSize = AMQP_PERFORMATIVE_PAD + (payload != null ? payload.remaining() : 0);
try (ByteArrayOutputStream baos = new ByteArrayOutputStream(outputBufferSize)) {
final int performativeSize = writePerformative(performative, payload, maxFrameSize, baos, onPayloadTooLarge);
final int capacity = maxFrameSize > 0 ? maxFrameSize - performativeSize : Integer.MAX_VALUE;
final int payloadSize = Math.min(payload == null ? 0 : payload.remaining(), capacity);
if (payloadSize > 0) {
byte[] payloadArray = new byte[payloadSize];
payload.get(payloadArray);
baos.write(payloadArray);
}
final ByteBuffer output = ByteBuffer.wrap(baos.toByteArray());
endFrame(output, frameType, channel);
return output.asReadOnlyBuffer();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
private int writePerformative(DescribedType performative, ByteBuffer payload, int maxFrameSize, ByteArrayOutputStream output, Runnable onPayloadTooLarge) {
try {
output.write(FRAME_HEADER_RESERVED); // Reserve space for later Frame preamble
long encodedSize = 0;
if (performative != null) {
try {
codec.putDescribedType(performative);
encodedSize = codec.encode(output);
} finally {
codec.clear();
}
}
int performativeSize = output.size() - FRAME_HEADER_SIZE;
if (performativeSize != encodedSize) {
throw new IllegalStateException(String.format(
"Unable to encode performative %s of %d bytes into provided proton buffer, only wrote %d bytes",
performative, performativeSize, encodedSize));
}
if (onPayloadTooLarge != null && maxFrameSize > 0 && payload != null && (payload.remaining() + performativeSize) > maxFrameSize) {
// Next iteration will re-encode the frame body again with updates from the <payload-to-large>
// handler and then we can move onto the body portion.
onPayloadTooLarge.run();
output.reset();
performativeSize = writePerformative(performative, payload, maxFrameSize, output, null);
}
return performativeSize;
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
private static void endFrame(ByteBuffer output, byte frameType, int channel) {
output.putInt(FRAME_START_BYTE, output.remaining());
output.put(FRAME_DOFF_BYTE, (byte) FRAME_DOFF_SIZE);
output.put(FRAME_TYPE_BYTE, frameType);
output.putShort(FRAME_CHANNEL_BYTE, (short) channel);
}
}