| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.qpid.protonj2.test.driver; |
| |
| import org.apache.qpid.protonj2.test.driver.codec.Codec; |
| import org.apache.qpid.protonj2.test.driver.codec.security.SaslDescribedType; |
| import org.apache.qpid.protonj2.test.driver.codec.transport.AMQPHeader; |
| import org.apache.qpid.protonj2.test.driver.codec.transport.HeartBeat; |
| import org.apache.qpid.protonj2.test.driver.codec.transport.PerformativeDescribedType; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import io.netty.buffer.ByteBuf; |
| import io.netty.buffer.Unpooled; |
| |
| class FrameDecoder { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(AMQPTestDriver.class); |
| |
| public static final byte AMQP_FRAME_TYPE = (byte) 0; |
| public static final byte SASL_FRAME_TYPE = (byte) 1; |
| |
| public static final int FRAME_SIZE_BYTES = 4; |
| |
| private final AMQPTestDriver driver; |
| private final Codec codec = Codec.Factory.create(); |
| private FrameParserStage stage = new HeaderParsingStage(); |
| |
| // Parser stages used during the parsing process |
| private final FrameSizeParsingStage frameSizeParser = new FrameSizeParsingStage(); |
| private final FrameBufferingStage frameBufferingStage = new FrameBufferingStage(); |
| private final FrameParserStage frameBodyParsingStage = new FrameBodyParsingStage(); |
| |
| public FrameDecoder(AMQPTestDriver driver) { |
| this.driver = driver; |
| } |
| |
| public void ingest(ByteBuf buffer) throws AssertionError { |
| try { |
| // Parses in-incoming data and emit one complete frame before returning, caller should |
| // ensure that the input buffer is drained into the engine or stop if the engine |
| // has changed to a non-writable state. |
| stage.parse(buffer); |
| } catch (AssertionError ex) { |
| transitionToErrorStage(ex); |
| throw ex; |
| } catch (Throwable throwable) { |
| AssertionError error = new AssertionError("Frame decode failed.", throwable); |
| transitionToErrorStage(error); |
| throw error; |
| } |
| } |
| |
| /** |
| * Resets the parser back to the expect a header state. |
| */ |
| public void resetToExpectingHeader() { |
| this.stage = new HeaderParsingStage(); |
| } |
| |
| //---- Methods to transition between stages |
| |
| private FrameParserStage transitionToFrameSizeParsingStage() { |
| return stage = frameSizeParser.reset(0); |
| } |
| |
| private FrameParserStage transitionToFrameBufferingStage(int frameSize) { |
| return stage = frameBufferingStage.reset(frameSize); |
| } |
| |
| private FrameParserStage initializeFrameBodyParsingStage(int frameSize) { |
| return stage = frameBodyParsingStage.reset(frameSize); |
| } |
| |
| private ParsingErrorStage transitionToErrorStage(AssertionError error) { |
| if (!(stage instanceof ParsingErrorStage)) { |
| stage = new ParsingErrorStage(error); |
| } |
| |
| return (ParsingErrorStage) stage; |
| } |
| |
| //----- Frame Parsing Stage definition |
| |
| private interface FrameParserStage { |
| |
| /** |
| * Parse the incoming data and provide events to the parent Transport |
| * based on the contents of that data. |
| * |
| * @param input |
| * The ByteBuf containing new data to be parsed. |
| * |
| * @throws AssertionError if an error occurs while parsing incoming data. |
| */ |
| void parse(ByteBuf input) throws AssertionError; |
| |
| /** |
| * Reset the stage to its defaults for a new cycle of parsing. |
| * |
| * @param frameSize |
| * The frameSize to use for this part of the parsing operation |
| * |
| * @return a reference to this parsing stage for chaining. |
| */ |
| FrameParserStage reset(int frameSize); |
| |
| } |
| |
| //---- Built in FrameParserStages |
| |
| private class HeaderParsingStage implements FrameParserStage { |
| |
| private final byte[] headerBytes = new byte[AMQPHeader.HEADER_SIZE_BYTES]; |
| |
| private int headerByte; |
| |
| @Override |
| public void parse(ByteBuf incoming) throws AssertionError { |
| while (incoming.isReadable() && headerByte < AMQPHeader.HEADER_SIZE_BYTES) { |
| headerBytes[headerByte++] = incoming.readByte(); |
| } |
| |
| if (headerByte == AMQPHeader.HEADER_SIZE_BYTES) { |
| // Construct a new Header from the read bytes which will validate the contents |
| AMQPHeader header = new AMQPHeader(headerBytes); |
| |
| // Transition to parsing the frames if any pipelined into this buffer. |
| transitionToFrameSizeParsingStage(); |
| |
| if (header.isSaslHeader()) { |
| driver.handleHeader(AMQPHeader.getSASLHeader()); |
| } else { |
| driver.handleHeader(AMQPHeader.getAMQPHeader()); |
| } |
| } |
| } |
| |
| @Override |
| public HeaderParsingStage reset(int frameSize) { |
| headerByte = 0; |
| return this; |
| } |
| } |
| |
| private class FrameSizeParsingStage implements FrameParserStage { |
| |
| private int frameSize; |
| private int multiplier = FRAME_SIZE_BYTES; |
| |
| @Override |
| public void parse(ByteBuf input) throws AssertionError { |
| while (input.isReadable()) { |
| frameSize |= (input.readByte() & 0xFF) << (--multiplier * Byte.SIZE); |
| if (multiplier == 0) { |
| break; |
| } |
| } |
| |
| if (multiplier == 0) { |
| validateFrameSize(); |
| |
| // Normalize the frame size to the reminder portion |
| int length = frameSize - FRAME_SIZE_BYTES; |
| |
| if (input.readableBytes() < length) { |
| transitionToFrameBufferingStage(length); |
| } else { |
| initializeFrameBodyParsingStage(length); |
| } |
| |
| stage.parse(input); |
| } |
| } |
| |
| private void validateFrameSize() throws AssertionError { |
| if (frameSize < 8) { |
| throw new AssertionError(String.format( |
| "specified frame size %d smaller than minimum frame header size 8", frameSize)); |
| } |
| |
| if (frameSize > driver.getInboundMaxFrameSize()) { |
| throw new AssertionError(String.format( |
| "specified frame size %d larger than maximum frame size %d", frameSize, driver.getInboundMaxFrameSize())); |
| } |
| } |
| |
| @Override |
| public FrameSizeParsingStage reset(int frameSize) { |
| multiplier = FRAME_SIZE_BYTES; |
| this.frameSize = frameSize; |
| return this; |
| } |
| } |
| |
| private class FrameBufferingStage implements FrameParserStage { |
| |
| private ByteBuf buffer; |
| |
| @Override |
| public void parse(ByteBuf input) throws AssertionError { |
| if (input.readableBytes() < buffer.writableBytes()) { |
| buffer.writeBytes(input); |
| } else { |
| buffer.writeBytes(input, buffer.writableBytes()); |
| |
| // Now we can consume the buffer frame body. |
| initializeFrameBodyParsingStage(buffer.readableBytes()); |
| try { |
| stage.parse(buffer); |
| } finally { |
| buffer = null; |
| } |
| } |
| } |
| |
| @Override |
| public FrameBufferingStage reset(int frameSize) { |
| buffer = Unpooled.buffer(frameSize, frameSize); |
| return this; |
| } |
| } |
| |
| private class FrameBodyParsingStage implements FrameParserStage { |
| |
| private int frameSize; |
| |
| @Override |
| public void parse(ByteBuf input) throws AssertionError { |
| int dataOffset = (input.readByte() << 2) & 0x3FF; |
| int frameSize = this.frameSize + FRAME_SIZE_BYTES; |
| |
| validateDataOffset(dataOffset, frameSize); |
| |
| int type = input.readByte() & 0xFF; |
| short channel = input.readShort(); |
| |
| // note that this skips over the extended header if it's present |
| if (dataOffset != 8) { |
| input.readerIndex(input.readerIndex() + dataOffset - 8); |
| } |
| |
| final int frameBodySize = frameSize - dataOffset; |
| |
| ByteBuf payload = null; |
| Object val = null; |
| |
| if (frameBodySize > 0) { |
| int frameBodyStartIndex = input.readerIndex(); |
| |
| try { |
| codec.decode(input); |
| } catch (Exception e) { |
| throw new AssertionError("Decoder failed reading remote input:", e); |
| } |
| |
| Codec.DataType dataType = codec.type(); |
| if (dataType != Codec.DataType.DESCRIBED) { |
| throw new IllegalArgumentException( |
| "Frame body type expected to be " + Codec.DataType.DESCRIBED + " but was: " + dataType); |
| } |
| |
| try { |
| val = codec.getDescribedType(); |
| } finally { |
| codec.clear(); |
| } |
| |
| // Slice to the known Frame body size and use that as the buffer for any payload once |
| // the actual Performative has been decoded. The implies that the data comprising the |
| // performative will be held as long as the payload buffer is kept. |
| if (input.isReadable()) { |
| // Check that the remaining bytes aren't part of another frame. |
| int payloadSize = frameBodySize - (input.readerIndex() - frameBodyStartIndex); |
| if (payloadSize > 0) { |
| payload = input.slice(input.readerIndex(), payloadSize); |
| input.skipBytes(payloadSize); |
| } |
| } |
| } else { |
| LOG.trace("{} Read: CH[{}] : {} [{}]", driver.getName(), channel, HeartBeat.INSTANCE, payload); |
| transitionToFrameSizeParsingStage(); |
| driver.handleHeartbeat(frameSize, channel); |
| return; |
| } |
| |
| if (type == AMQP_FRAME_TYPE) { |
| PerformativeDescribedType performative = (PerformativeDescribedType) val; |
| LOG.trace("{} Read: CH[{}] : {} [{}]", driver.getName(), channel, performative, payload); |
| transitionToFrameSizeParsingStage(); |
| driver.handlePerformative(frameSize, performative, channel, payload); |
| } else if (type == SASL_FRAME_TYPE) { |
| SaslDescribedType performative = (SaslDescribedType) val; |
| LOG.trace("{} Read: {} [{}]", driver.getName(), performative, payload); |
| transitionToFrameSizeParsingStage(); |
| driver.handleSaslPerformative(frameSize, performative, channel, payload); |
| } else { |
| throw new AssertionError(String.format("unknown frame type: %d", type)); |
| } |
| } |
| |
| @Override |
| public FrameBodyParsingStage reset(int frameSize) { |
| this.frameSize = frameSize; |
| return this; |
| } |
| |
| private void validateDataOffset(int dataOffset, int frameSize) { |
| if (dataOffset < 8) { |
| throw new AssertionError(String.format( |
| "specified frame data offset %d smaller than minimum frame header size %d", dataOffset, 8)); |
| } |
| |
| if (dataOffset > frameSize) { |
| throw new AssertionError(String.format( |
| "specified frame data offset %d larger than the frame size %d", dataOffset, frameSize)); |
| } |
| } |
| } |
| |
| /* |
| * If parsing fails the parser enters the failed state and remains there always throwing the given exception |
| * if additional parsing is requested. |
| */ |
| private static class ParsingErrorStage implements FrameParserStage { |
| |
| private final AssertionError parsingError; |
| |
| public ParsingErrorStage(AssertionError parsingError) { |
| this.parsingError = parsingError; |
| } |
| |
| @Override |
| public void parse(ByteBuf input) throws AssertionError { |
| throw parsingError; |
| } |
| |
| @Override |
| public ParsingErrorStage reset(int frameSize) { |
| return this; |
| } |
| } |
| } |