blob: 38760f8a1c8378fae9454efa8045265bd6961fae [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.proton.engine.impl;
import static org.apache.qpid.proton.engine.impl.AmqpHeader.HEADER;
import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.newWriteableBuffer;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.transport.EmptyFrame;
import org.apache.qpid.proton.amqp.transport.FrameBody;
import org.apache.qpid.proton.codec.ByteBufferDecoder;
import org.apache.qpid.proton.codec.DecodeException;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;
import org.apache.qpid.proton.framing.TransportFrame;
class FrameParser implements TransportInput
{
private static final Logger TRACE_LOGGER = Logger.getLogger("proton.trace");
private static final ByteBuffer _emptyInputBuffer = newWriteableBuffer(0);
private enum State
{
HEADER0,
HEADER1,
HEADER2,
HEADER3,
HEADER4,
HEADER5,
HEADER6,
HEADER7,
SIZE_0,
SIZE_1,
SIZE_2,
SIZE_3,
PRE_PARSE,
BUFFERING,
PARSING,
ERROR
}
private final FrameHandler _frameHandler;
private final ByteBufferDecoder _decoder;
private final int _inputBufferSize;
private final int _localMaxFrameSize;
private ByteBuffer _inputBuffer = null;
private boolean _tail_closed = false;
private State _state = State.HEADER0;
private long _framesInput = 0;
/** the stated size of the current frame */
private int _size;
/** holds the current frame that is being parsed */
private ByteBuffer _frameBuffer;
private TransportFrame _heldFrame;
private TransportException _parsingError;
/**
* We store the last result when processing input so that
* we know not to process any more input if it was an error.
*/
FrameParser(FrameHandler frameHandler, ByteBufferDecoder decoder, int localMaxFrameSize)
{
_frameHandler = frameHandler;
_decoder = decoder;
_localMaxFrameSize = localMaxFrameSize;
_inputBufferSize = _localMaxFrameSize > 0 ? _localMaxFrameSize : 4*1024;
}
private void input(ByteBuffer in) throws TransportException
{
flushHeldFrame();
if (_heldFrame != null)
{
return;
}
TransportException frameParsingError = null;
int size = _size;
State state = _state;
ByteBuffer oldIn = null;
boolean transportAccepting = true;
while(in.hasRemaining() && state != State.ERROR && transportAccepting)
{
switch(state)
{
case HEADER0:
if(in.hasRemaining())
{
byte c = in.get();
if(c != HEADER[0])
{
frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[0], state);
state = State.ERROR;
break;
}
state = State.HEADER1;
}
else
{
break;
}
case HEADER1:
if(in.hasRemaining())
{
byte c = in.get();
if(c != HEADER[1])
{
frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[1], state);
state = State.ERROR;
break;
}
state = State.HEADER2;
}
else
{
break;
}
case HEADER2:
if(in.hasRemaining())
{
byte c = in.get();
if(c != HEADER[2])
{
frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[2], state);
state = State.ERROR;
break;
}
state = State.HEADER3;
}
else
{
break;
}
case HEADER3:
if(in.hasRemaining())
{
byte c = in.get();
if(c != HEADER[3])
{
frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[3], state);
state = State.ERROR;
break;
}
state = State.HEADER4;
}
else
{
break;
}
case HEADER4:
if(in.hasRemaining())
{
byte c = in.get();
if(c != HEADER[4])
{
frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[4], state);
state = State.ERROR;
break;
}
state = State.HEADER5;
}
else
{
break;
}
case HEADER5:
if(in.hasRemaining())
{
byte c = in.get();
if(c != HEADER[5])
{
frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[5], state);
state = State.ERROR;
break;
}
state = State.HEADER6;
}
else
{
break;
}
case HEADER6:
if(in.hasRemaining())
{
byte c = in.get();
if(c != HEADER[6])
{
frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[6], state);
state = State.ERROR;
break;
}
state = State.HEADER7;
}
else
{
break;
}
case HEADER7:
if(in.hasRemaining())
{
byte c = in.get();
if(c != HEADER[7])
{
frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[7], state);
state = State.ERROR;
break;
}
state = State.SIZE_0;
}
else
{
break;
}
case SIZE_0:
if(!in.hasRemaining())
{
break;
}
if(in.remaining() >= 4)
{
size = in.getInt();
state = State.PRE_PARSE;
break;
}
else
{
size = (in.get() << 24) & 0xFF000000;
if(!in.hasRemaining())
{
state = State.SIZE_1;
break;
}
}
case SIZE_1:
size |= (in.get() << 16) & 0xFF0000;
if(!in.hasRemaining())
{
state = State.SIZE_2;
break;
}
case SIZE_2:
size |= (in.get() << 8) & 0xFF00;
if(!in.hasRemaining())
{
state = State.SIZE_3;
break;
}
case SIZE_3:
size |= in.get() & 0xFF;
state = State.PRE_PARSE;
case PRE_PARSE:
if(size < 8)
{
frameParsingError = new TransportException("specified frame size %d smaller than minimum frame header "
+ "size %d",
size, 8);
state = State.ERROR;
break;
}
if (_localMaxFrameSize > 0 && size > _localMaxFrameSize)
{
frameParsingError = new TransportException("specified frame size %d greater than maximum valid frame size %d",
size, _localMaxFrameSize);
state = State.ERROR;
break;
}
if(in.remaining() < size-4)
{
_frameBuffer = ByteBuffer.allocate(size-4);
_frameBuffer.put(in);
state = State.BUFFERING;
break;
}
case BUFFERING:
if(_frameBuffer != null)
{
if(in.remaining() < _frameBuffer.remaining())
{
_frameBuffer.put(in);
break;
}
else
{
ByteBuffer dup = in.duplicate();
dup.limit(dup.position()+_frameBuffer.remaining());
in.position(in.position()+_frameBuffer.remaining());
_frameBuffer.put(dup);
oldIn = in;
_frameBuffer.flip();
in = _frameBuffer;
state = State.PARSING;
}
}
case PARSING:
int dataOffset = (in.get() << 2) & 0x3FF;
if(dataOffset < 8)
{
frameParsingError = new TransportException("specified frame data offset %d smaller than minimum frame header size %d", dataOffset, 8);
state = State.ERROR;
break;
}
else if(dataOffset > size)
{
frameParsingError = new TransportException("specified frame data offset %d larger than the frame size %d", dataOffset, size);
state = State.ERROR;
break;
}
// type
int type = in.get() & 0xFF;
int channel = in.getShort() & 0xFFFF;
if(type != 0)
{
frameParsingError = new TransportException("unknown frame type: %d", type);
state = State.ERROR;
break;
}
// note that this skips over the extended header if it's present
if(dataOffset!=8)
{
in.position(in.position()+dataOffset-8);
}
// oldIn null iff not working on duplicated buffer
final int frameBodySize = size - dataOffset;
if(oldIn == null)
{
oldIn = in;
in = in.duplicate();
final int endPos = in.position() + frameBodySize;
in.limit(endPos);
oldIn.position(endPos);
}
try
{
_framesInput += 1;
Binary payload = null;
Object val = null;
if (frameBodySize > 0)
{
_decoder.setByteBuffer(in);
val = _decoder.readObject();
_decoder.setByteBuffer(null);
if(in.hasRemaining())
{
byte[] payloadBytes = new byte[in.remaining()];
in.get(payloadBytes);
payload = new Binary(payloadBytes);
}
else
{
payload = null;
}
}
else
{
val = EmptyFrame.INSTANCE;
}
if(val instanceof FrameBody)
{
FrameBody frameBody = (FrameBody) val;
if(TRACE_LOGGER.isLoggable(Level.FINE))
{
TRACE_LOGGER.log(Level.FINE, "IN: CH["+channel+"] : " + frameBody + (payload == null ? "" : "[" + payload + "]"));
}
TransportFrame frame = new TransportFrame(channel, frameBody, payload);
if(_frameHandler.isHandlingFrames())
{
_tail_closed = _frameHandler.handleFrame(frame);
}
else
{
transportAccepting = false;
_heldFrame = frame;
}
}
else
{
throw new TransportException("Frameparser encountered a "
+ (val == null? "null" : val.getClass())
+ " which is not a " + FrameBody.class);
}
reset();
in = oldIn;
oldIn = null;
_frameBuffer = null;
state = State.SIZE_0;
}
catch (DecodeException ex)
{
state = State.ERROR;
frameParsingError = new TransportException(ex);
}
break;
case ERROR:
// do nothing
}
}
if (_tail_closed)
{
if (in.hasRemaining()) {
state = State.ERROR;
frameParsingError = new TransportException("framing error");
} else if (state != State.SIZE_0) {
state = State.ERROR;
frameParsingError = new TransportException("connection aborted");
} else {
_frameHandler.closed(null);
}
}
_state = state;
_size = size;
if(_state == State.ERROR)
{
_tail_closed = true;
if(frameParsingError != null)
{
_parsingError = frameParsingError;
_frameHandler.closed(frameParsingError);
}
else
{
throw new TransportException("Unable to parse, probably because of a previous error");
}
}
}
@Override
public int capacity()
{
if (_tail_closed) {
return Transport.END_OF_STREAM;
} else {
if (_inputBuffer != null) {
return _inputBuffer.remaining();
} else {
return _inputBufferSize;
}
}
}
@Override
public int position() {
if (_tail_closed) {
return Transport.END_OF_STREAM;
}
return (_inputBuffer == null) ? 0 : _inputBuffer.position();
}
@Override
public ByteBuffer tail()
{
if (_tail_closed) {
throw new TransportException("tail closed");
}
if (_inputBuffer == null) {
_inputBuffer = newWriteableBuffer(_inputBufferSize);
}
return _inputBuffer;
}
@Override
public void process() throws TransportException
{
if (_inputBuffer != null)
{
_inputBuffer.flip();
try
{
input(_inputBuffer);
}
finally
{
if (_inputBuffer.hasRemaining()) {
_inputBuffer.compact();
} else if (_inputBuffer.capacity() > TransportImpl.BUFFER_RELEASE_THRESHOLD) {
_inputBuffer = null;
} else {
_inputBuffer.clear();
}
}
}
else
{
input(_emptyInputBuffer);
}
}
@Override
public void close_tail()
{
_tail_closed = true;
process();
}
/**
* Attempt to flush any cached data to the frame transport. This function
* is useful if the {@link FrameHandler} state has changed.
*/
public void flush()
{
flushHeldFrame();
if (_heldFrame == null)
{
process();
}
}
private void flushHeldFrame()
{
if(_heldFrame != null && _frameHandler.isHandlingFrames())
{
_tail_closed = _frameHandler.handleFrame(_heldFrame);
_heldFrame = null;
}
}
private void reset()
{
_size = 0;
_state = State.SIZE_0;
}
long getFramesInput()
{
return _framesInput;
}
}