blob: 11cf1736f9fc42b4f3006971f7cdf20d57d9d20c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.engine.impl;
import org.apache.qpid.protonj2.buffer.ProtonBuffer;
import org.apache.qpid.protonj2.buffer.ProtonByteBufferAllocator;
import org.apache.qpid.protonj2.codec.CodecFactory;
import org.apache.qpid.protonj2.codec.DecodeException;
import org.apache.qpid.protonj2.codec.Decoder;
import org.apache.qpid.protonj2.codec.DecoderState;
import org.apache.qpid.protonj2.engine.AMQPPerformativeEnvelopePool;
import org.apache.qpid.protonj2.engine.EmptyEnvelope;
import org.apache.qpid.protonj2.engine.EngineHandler;
import org.apache.qpid.protonj2.engine.EngineHandlerContext;
import org.apache.qpid.protonj2.engine.HeaderEnvelope;
import org.apache.qpid.protonj2.engine.IncomingAMQPEnvelope;
import org.apache.qpid.protonj2.engine.SASLEnvelope;
import org.apache.qpid.protonj2.engine.exceptions.EngineFailedException;
import org.apache.qpid.protonj2.engine.exceptions.FrameDecodingException;
import org.apache.qpid.protonj2.engine.exceptions.MalformedAMQPHeaderException;
import org.apache.qpid.protonj2.engine.exceptions.ProtonException;
import org.apache.qpid.protonj2.logging.ProtonLogger;
import org.apache.qpid.protonj2.logging.ProtonLoggerFactory;
import org.apache.qpid.protonj2.types.security.SaslOutcome;
import org.apache.qpid.protonj2.types.security.SaslPerformative;
import org.apache.qpid.protonj2.types.transport.AMQPHeader;
import org.apache.qpid.protonj2.types.transport.Performative;
/**
* Handler used to parse incoming frame data input into the engine
*/
public class ProtonFrameDecodingHandler implements EngineHandler, SaslPerformative.SaslPerformativeHandler<EngineHandlerContext> {
private static final ProtonLogger LOG = ProtonLoggerFactory.getLogger(ProtonFrameDecodingHandler.class);
/**
* Frame type indicator for AMQP protocol frames.
*/
public static final byte AMQP_FRAME_TYPE = (byte) 0;
/**
* Frame type indicator for SASL protocol frames.
*/
public static final byte SASL_FRAME_TYPE = (byte) 1;
/**
* The specified encoding size for the frame size value of each encoded frame.
*/
public static final int FRAME_SIZE_BYTES = 4;
private final AMQPPerformativeEnvelopePool<IncomingAMQPEnvelope> framePool = AMQPPerformativeEnvelopePool.incomingEnvelopePool();
private Decoder decoder;
private DecoderState decoderState;
private FrameParserStage stage = new HeaderParsingStage();
private ProtonEngine engine;
private ProtonEngineConfiguration configuration;
// Parser stages used during the parsing process
private final FrameSizeParsingStage frameSizeParser = new FrameSizeParsingStage();
private final FrameBufferingStage frameBufferingStage = new FrameBufferingStage();
private final FrameBodyParsingStage frameBodyParsingStage = new FrameBodyParsingStage();
//----- Handler method implementations
@Override
public void handlerAdded(EngineHandlerContext context) {
engine = (ProtonEngine) context.engine();
configuration = engine.configuration();
}
@Override
public void engineFailed(EngineHandlerContext context, EngineFailedException failure) {
transitionToErrorStage(failure);
context.fireFailed(failure);
}
@Override
public void handleRead(EngineHandlerContext context, ProtonBuffer buffer) {
try {
// Parses in-incoming data and emit events for complete frames before returning, caller
// should ensure that the input buffer is drained into the engine or stop if the engine
// has changed to a non-writable state.
while (buffer.isReadable() && engine.isWritable()) {
stage.parse(context, buffer);
}
} catch (FrameDecodingException frameEx) {
transitionToErrorStage(frameEx).fireError(context);
} catch (ProtonException pex) {
transitionToErrorStage(pex).fireError(context);
} catch (DecodeException ex) {
transitionToErrorStage(new FrameDecodingException(ex.getMessage(), ex)).fireError(context);
} catch (Exception error) {
transitionToErrorStage(new ProtonException(error.getMessage(), error)).fireError(context);
}
}
@Override
public void handleRead(EngineHandlerContext context, SASLEnvelope envelope) {
envelope.getBody().invoke(this, context);
context.fireRead(envelope);
}
@Override
public void handleWrite(EngineHandlerContext context, SASLEnvelope envelope) {
envelope.invoke(this, context);
context.fireWrite(envelope);
}
//----- SASL Performative Handler to check for change to non-SASL state
@Override
public void handleOutcome(SaslOutcome saslOutcome, EngineHandlerContext context) {
// When we have read or written a SASL Outcome the next value to be read
// should be an AMQP Header to begin the next phase of the connection.
this.stage = new HeaderParsingStage();
}
//---- Methods to transition between stages
private FrameParserStage transitionToFrameSizeParsingStage() {
return stage = frameSizeParser.reset(0);
}
private FrameParserStage transitionToFrameBufferingStage(int length) {
return stage = frameBufferingStage.reset(length);
}
private FrameParserStage initializeFrameBodyParsingStage(int length) {
return stage = frameBodyParsingStage.reset(length);
}
private ParsingErrorStage transitionToErrorStage(ProtonException error) {
if (!(stage instanceof ParsingErrorStage)) {
LOG.trace("Frame decoder encountered error: ", error);
stage = new ParsingErrorStage(error);
}
return (ParsingErrorStage) stage;
}
//----- Frame Parsing Stage definition
private interface FrameParserStage {
/**
* Parse the incoming data and provide events to the parent Transport
* based on the contents of that data.
*
* @param context
* The TransportHandlerContext that applies to the current event
* @param input
* The ProtonBuffer containing new data to be parsed.
*/
void parse(EngineHandlerContext context, ProtonBuffer input);
/**
* Reset the stage to its defaults for a new cycle of parsing.
*
* @param length
* The length to use for this part of the parsing operation
*
* @return a reference to this parsing stage for chaining.
*/
FrameParserStage reset(int length);
}
//---- Built in FrameParserStages
private class HeaderParsingStage implements FrameParserStage {
private final byte[] headerBytes = new byte[AMQPHeader.HEADER_SIZE_BYTES];
private int headerByte;
@Override
public void parse(EngineHandlerContext context, ProtonBuffer incoming) {
while (incoming.isReadable() && headerByte < AMQPHeader.HEADER_SIZE_BYTES) {
byte nextByte = incoming.readByte();
try {
AMQPHeader.validateByte(headerByte, nextByte);
} catch (IllegalArgumentException iae) {
throw new MalformedAMQPHeaderException(
String.format("Error on validation of header byte %d with value of %d", headerByte, nextByte), iae);
}
headerBytes[headerByte++] = nextByte;
}
if (headerByte == AMQPHeader.HEADER_SIZE_BYTES) {
// Construct a new Header from the read bytes which will validate the contents
AMQPHeader header = new AMQPHeader(headerBytes);
// Transition to parsing the frames if any pipelined into this buffer.
transitionToFrameSizeParsingStage();
if (header.isSaslHeader()) {
decoder = CodecFactory.getSaslDecoder();
decoderState = decoder.newDecoderState();
context.fireRead(HeaderEnvelope.SASL_HEADER_ENVELOPE);
} else {
decoder = CodecFactory.getDecoder();
decoderState = decoder.newDecoderState();
// Once we've read an AMQP header we no longer care if any SASL work
// occurs as that would be erroneous behavior which this handler doesn't
// deal with.
((ProtonEngineHandlerContext) context).interestMask(ProtonEngineHandlerContext.HANDLER_READS);
context.fireRead(HeaderEnvelope.AMQP_HEADER_ENVELOPE);
}
}
}
@Override
public HeaderParsingStage reset(int frameSize) {
headerByte = 0;
return this;
}
}
private class FrameSizeParsingStage implements FrameParserStage {
private int frameSize;
private int multiplier = FRAME_SIZE_BYTES;
@Override
public void parse(EngineHandlerContext context, ProtonBuffer input) {
while (input.isReadable()) {
frameSize |= ((input.readByte() & 0xFF) << --multiplier * Byte.SIZE);
if (multiplier == 0) {
break;
}
}
if (multiplier == 0) {
validateFrameSize();
int length = frameSize - FRAME_SIZE_BYTES;
if (input.getReadableBytes() < length) {
transitionToFrameBufferingStage(length);
} else {
initializeFrameBodyParsingStage(length);
}
stage.parse(context, input);
}
}
private void validateFrameSize() throws FrameDecodingException {
if (Integer.compareUnsigned(frameSize, 8) < 0) {
throw new FrameDecodingException(String.format(
"specified frame size %d smaller than minimum frame header size 8", frameSize));
}
if (Integer.toUnsignedLong(frameSize) > configuration.getInboundMaxFrameSize()) {
throw new FrameDecodingException(String.format(
"specified frame size %s larger than maximum frame size %d",
Integer.toUnsignedString(frameSize), configuration.getInboundMaxFrameSize()));
}
}
@Override
public FrameSizeParsingStage reset(int frameSize) {
multiplier = FRAME_SIZE_BYTES;
this.frameSize = frameSize;
return this;
}
}
private class FrameBufferingStage implements FrameParserStage {
private ProtonBuffer buffer;
@Override
public void parse(EngineHandlerContext context, ProtonBuffer input) {
if (input.getReadableBytes() < buffer.getWritableBytes()) {
buffer.writeBytes(input);
} else {
buffer.writeBytes(input, buffer.getWritableBytes());
// Now we can consume the buffer frame body.
initializeFrameBodyParsingStage(buffer.getReadableBytes());
try {
stage.parse(context, buffer);
} finally {
buffer = null;
}
}
}
@Override
public FrameBufferingStage reset(int length) {
buffer = ProtonByteBufferAllocator.DEFAULT.allocate(length, length);
return this;
}
}
private class FrameBodyParsingStage implements FrameParserStage {
private int length;
@Override
public void parse(EngineHandlerContext context, ProtonBuffer input) {
int dataOffset = (input.readByte() << 2) & 0x3FF;
int frameSize = length + FRAME_SIZE_BYTES;
validateDataOffset(dataOffset, frameSize);
int type = input.readByte() & 0xFF;
short channel = input.readShort();
// Skip over the extended header if present (i.e offset > 8)
if (dataOffset != 8) {
input.setReadIndex(input.getReadIndex() + dataOffset - 8);
}
final int frameBodySize = frameSize - dataOffset;
ProtonBuffer payload = null;
Object val = null;
if (frameBodySize > 0) {
int startReadIndex = input.getReadIndex();
val = decoder.readObject(input, decoderState);
// Copy the payload portion of the incoming bytes for now as the incoming may be
// from a wrapped pooled buffer and for now we have no way of retaining or otherwise
// ensuring that the buffer remains ours. Since we might want to store received
// data at a client level and decode later we could end up losing the data to reuse
// if it was pooled.
if (input.isReadable()) {
int payloadSize = frameBodySize - (input.getReadIndex() - startReadIndex);
// Check that the remaining bytes aren't part of another frame.
if (payloadSize > 0) {
payload = configuration.getBufferAllocator().allocate(payloadSize, payloadSize);
payload.writeBytes(input, payloadSize);
}
}
} else {
transitionToFrameSizeParsingStage();
context.fireRead(EmptyEnvelope.INSTANCE);
return;
}
if (type == AMQP_FRAME_TYPE) {
Performative performative = (Performative) val;
IncomingAMQPEnvelope frame = framePool.take(performative, channel, payload);
transitionToFrameSizeParsingStage();
context.fireRead(frame);
} else if (type == SASL_FRAME_TYPE) {
SaslPerformative performative = (SaslPerformative) val;
SASLEnvelope saslFrame = new SASLEnvelope(performative);
transitionToFrameSizeParsingStage();
// Ensure we process transition from SASL to AMQP header state
handleRead(context, saslFrame);
} else {
throw new FrameDecodingException(String.format("unknown frame type: %d", type));
}
}
private void validateDataOffset(int dataOffset, int frameSize) throws FrameDecodingException {
if (dataOffset < 8) {
throw new FrameDecodingException(String.format(
"specified frame data offset %d smaller than minimum frame header size %d", dataOffset, 8));
}
if (dataOffset > frameSize) {
throw new FrameDecodingException(String.format(
"specified frame data offset %d larger than the frame size %d", dataOffset, frameSize));
}
}
@Override
public FrameBodyParsingStage reset(int length) {
this.length = length;
return this;
}
}
/*
* If parsing fails the parser enters the failed state and remains there always throwing the given exception
* if additional parsing is requested.
*/
private static class ParsingErrorStage implements FrameParserStage {
private final ProtonException parsingError;
public ParsingErrorStage(ProtonException parsingError) {
this.parsingError = parsingError;
}
public void fireError(EngineHandlerContext context) {
throw parsingError;
}
@Override
public void parse(EngineHandlerContext context, ProtonBuffer input) {
throw new FrameDecodingException(parsingError);
}
@Override
public ParsingErrorStage reset(int length) {
return this;
}
}
}