blob: e94a170f187afc81b399984d074109c6f1eda6a5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.test.driver;
import org.apache.qpid.protonj2.test.driver.codec.Codec;
import org.apache.qpid.protonj2.test.driver.codec.security.SaslDescribedType;
import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader;
import org.apache.qpid.protonj2.test.driver.codec.transport.HeartBeat;
import org.apache.qpid.protonj2.test.driver.codec.transport.PerformativeDescribedType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
class FrameDecoder {
private static final Logger LOG = LoggerFactory.getLogger(AMQPTestDriver.class);
public static final byte AMQP_FRAME_TYPE = (byte) 0;
public static final byte SASL_FRAME_TYPE = (byte) 1;
public static final int FRAME_SIZE_BYTES = 4;
private final AMQPTestDriver driver;
private final Codec codec = Codec.Factory.create();
private FrameParserStage stage = new HeaderParsingStage();
// Parser stages used during the parsing process
private final FrameSizeParsingStage frameSizeParser = new FrameSizeParsingStage();
private final FrameBufferingStage frameBufferingStage = new FrameBufferingStage();
private final FrameParserStage frameBodyParsingStage = new FrameBodyParsingStage();
public FrameDecoder(AMQPTestDriver driver) {
this.driver = driver;
}
public void ingest(ByteBuf buffer) throws AssertionError {
try {
// Parses in-incoming data and emit one complete frame before returning, caller should
// ensure that the input buffer is drained into the engine or stop if the engine
// has changed to a non-writable state.
stage.parse(buffer);
} catch (AssertionError ex) {
transitionToErrorStage(ex);
throw ex;
} catch (Throwable throwable) {
AssertionError error = new AssertionError("Frame decode failed.", throwable);
transitionToErrorStage(error);
throw error;
}
}
/**
* Resets the parser back to the expect a header state.
*/
public void resetToExpectingHeader() {
this.stage = new HeaderParsingStage();
}
//---- Methods to transition between stages
private FrameParserStage transitionToFrameSizeParsingStage() {
return stage = frameSizeParser.reset(0);
}
private FrameParserStage transitionToFrameBufferingStage(int frameSize) {
return stage = frameBufferingStage.reset(frameSize);
}
private FrameParserStage initializeFrameBodyParsingStage(int frameSize) {
return stage = frameBodyParsingStage.reset(frameSize);
}
private ParsingErrorStage transitionToErrorStage(AssertionError error) {
if (!(stage instanceof ParsingErrorStage)) {
stage = new ParsingErrorStage(error);
}
return (ParsingErrorStage) stage;
}
//----- Frame Parsing Stage definition
private interface FrameParserStage {
/**
* Parse the incoming data and provide events to the parent Transport
* based on the contents of that data.
*
* @param input
* The ByteBuf containing new data to be parsed.
*
* @throws AssertionError if an error occurs while parsing incoming data.
*/
void parse(ByteBuf input) throws AssertionError;
/**
* Reset the stage to its defaults for a new cycle of parsing.
*
* @param frameSize
* The frameSize to use for this part of the parsing operation
*
* @return a reference to this parsing stage for chaining.
*/
FrameParserStage reset(int frameSize);
}
//---- Built in FrameParserStages
private class HeaderParsingStage implements FrameParserStage {
private final byte[] headerBytes = new byte[AMQPHeader.HEADER_SIZE_BYTES];
private int headerByte;
@Override
public void parse(ByteBuf incoming) throws AssertionError {
while (incoming.isReadable() && headerByte < AMQPHeader.HEADER_SIZE_BYTES) {
headerBytes[headerByte++] = incoming.readByte();
}
if (headerByte == AMQPHeader.HEADER_SIZE_BYTES) {
// Construct a new Header from the read bytes which will validate the contents
AMQPHeader header = new AMQPHeader(headerBytes);
// Transition to parsing the frames if any pipelined into this buffer.
transitionToFrameSizeParsingStage();
if (header.isSaslHeader()) {
driver.handleHeader(AMQPHeader.getSASLHeader());
} else {
driver.handleHeader(AMQPHeader.getAMQPHeader());
}
}
}
@Override
public HeaderParsingStage reset(int frameSize) {
headerByte = 0;
return this;
}
}
private class FrameSizeParsingStage implements FrameParserStage {
private int frameSize;
private int multiplier = FRAME_SIZE_BYTES;
@Override
public void parse(ByteBuf input) throws AssertionError {
while (input.isReadable()) {
frameSize |= (input.readByte() & 0xFF) << (--multiplier * Byte.SIZE);
if (multiplier == 0) {
break;
}
}
if (multiplier == 0) {
validateFrameSize();
// Normalize the frame size to the reminder portion
int length = frameSize - FRAME_SIZE_BYTES;
if (input.readableBytes() < length) {
transitionToFrameBufferingStage(length);
} else {
initializeFrameBodyParsingStage(length);
}
stage.parse(input);
}
}
private void validateFrameSize() throws AssertionError {
if (frameSize < 8) {
throw new AssertionError(String.format(
"specified frame size %d smaller than minimum frame header size 8", frameSize));
}
if (frameSize > driver.getInboundMaxFrameSize()) {
throw new AssertionError(String.format(
"specified frame size %d larger than maximum frame size %d", frameSize, driver.getInboundMaxFrameSize()));
}
}
@Override
public FrameSizeParsingStage reset(int frameSize) {
multiplier = FRAME_SIZE_BYTES;
this.frameSize = frameSize;
return this;
}
}
private class FrameBufferingStage implements FrameParserStage {
private ByteBuf buffer;
@Override
public void parse(ByteBuf input) throws AssertionError {
if (input.readableBytes() < buffer.writableBytes()) {
buffer.writeBytes(input);
} else {
buffer.writeBytes(input, buffer.writableBytes());
// Now we can consume the buffer frame body.
initializeFrameBodyParsingStage(buffer.readableBytes());
try {
stage.parse(buffer);
} finally {
buffer = null;
}
}
}
@Override
public FrameBufferingStage reset(int frameSize) {
buffer = Unpooled.buffer(frameSize, frameSize);
return this;
}
}
private class FrameBodyParsingStage implements FrameParserStage {
private int frameSize;
@Override
public void parse(ByteBuf input) throws AssertionError {
int dataOffset = (input.readByte() << 2) & 0x3FF;
int frameSize = this.frameSize + FRAME_SIZE_BYTES;
validateDataOffset(dataOffset, frameSize);
int type = input.readByte() & 0xFF;
short channel = input.readShort();
// note that this skips over the extended header if it's present
if (dataOffset != 8) {
input.readerIndex(input.readerIndex() + dataOffset - 8);
}
final int frameBodySize = frameSize - dataOffset;
ByteBuf payload = null;
Object val = null;
if (frameBodySize > 0) {
int frameBodyStartIndex = input.readerIndex();
try {
codec.decode(input);
} catch (Exception e) {
throw new AssertionError("Decoder failed reading remote input:", e);
}
Codec.DataType dataType = codec.type();
if (dataType != Codec.DataType.DESCRIBED) {
throw new IllegalArgumentException(
"Frame body type expected to be " + Codec.DataType.DESCRIBED + " but was: " + dataType);
}
try {
val = codec.getDescribedType();
} finally {
codec.clear();
}
// Slice to the known Frame body size and use that as the buffer for any payload once
// the actual Performative has been decoded. The implies that the data comprising the
// performative will be held as long as the payload buffer is kept.
if (input.isReadable()) {
// Check that the remaining bytes aren't part of another frame.
int payloadSize = frameBodySize - (input.readerIndex() - frameBodyStartIndex);
if (payloadSize > 0) {
payload = input.slice(input.readerIndex(), payloadSize);
input.skipBytes(payloadSize);
}
}
} else {
LOG.trace("{} Read: CH[{}] : {} [{}]", driver.getName(), channel, HeartBeat.INSTANCE, payload);
transitionToFrameSizeParsingStage();
driver.handleHeartbeat(frameSize, channel);
return;
}
if (type == AMQP_FRAME_TYPE) {
PerformativeDescribedType performative = (PerformativeDescribedType) val;
LOG.trace("{} Read: CH[{}] : {} [{}]", driver.getName(), channel, performative, payload);
transitionToFrameSizeParsingStage();
driver.handlePerformative(frameSize, performative, channel, payload);
} else if (type == SASL_FRAME_TYPE) {
SaslDescribedType performative = (SaslDescribedType) val;
LOG.trace("{} Read: {} [{}]", driver.getName(), performative, payload);
transitionToFrameSizeParsingStage();
driver.handleSaslPerformative(frameSize, performative, channel, payload);
} else {
throw new AssertionError(String.format("unknown frame type: %d", type));
}
}
@Override
public FrameBodyParsingStage reset(int frameSize) {
this.frameSize = frameSize;
return this;
}
private void validateDataOffset(int dataOffset, int frameSize) {
if (dataOffset < 8) {
throw new AssertionError(String.format(
"specified frame data offset %d smaller than minimum frame header size %d", dataOffset, 8));
}
if (dataOffset > frameSize) {
throw new AssertionError(String.format(
"specified frame data offset %d larger than the frame size %d", dataOffset, frameSize));
}
}
}
/*
* If parsing fails the parser enters the failed state and remains there always throwing the given exception
* if additional parsing is requested.
*/
private static class ParsingErrorStage implements FrameParserStage {
private final AssertionError parsingError;
public ParsingErrorStage(AssertionError parsingError) {
this.parsingError = parsingError;
}
@Override
public void parse(ByteBuf input) throws AssertionError {
throw parsingError;
}
@Override
public ParsingErrorStage reset(int frameSize) {
return this;
}
}
}