blob: 247a5e9e36372d8697176d1ba3d15230f7a9b113 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.activemq.transport.amqp.AmqpWireFormat.ResetListener;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* State based Frame reader that is used in the NIO based transports where
* AMQP frames can come in in partial or overlapping forms.
*/
public class AmqpFrameParser {
private static final Logger LOG = LoggerFactory.getLogger(AmqpFrameParser.class);
public interface AMQPFrameSink {
void onFrame(Object frame);
}
private static final byte AMQP_FRAME_SIZE_BYTES = 4;
private static final byte AMQP_HEADER_BYTES = 8;
private final AMQPFrameSink frameSink;
private FrameParser currentParser;
private AmqpWireFormat wireFormat;
public AmqpFrameParser(AMQPFrameSink sink) {
this.frameSink = sink;
}
public AmqpFrameParser(final TcpTransport transport) {
this.frameSink = new AMQPFrameSink() {
@Override
public void onFrame(Object frame) {
transport.doConsume(frame);
}
};
}
public void parse(ByteBuffer incoming) throws Exception {
if (incoming == null || !incoming.hasRemaining()) {
return;
}
if (currentParser == null) {
currentParser = initializeHeaderParser();
}
// Parser stack will run until current incoming data has all been consumed.
currentParser.parse(incoming);
}
public void reset() {
currentParser = initializeHeaderParser();
}
private void validateFrameSize(int frameSize) throws IOException {
long maxFrameSize = AmqpWireFormat.DEFAULT_MAX_FRAME_SIZE;
if (wireFormat != null) {
maxFrameSize = wireFormat.getMaxFrameSize();
}
if (frameSize > maxFrameSize) {
throw new IOException("Frame size of " + frameSize + " larger than max allowed " + maxFrameSize);
}
}
public void setWireFormat(AmqpWireFormat wireFormat) {
this.wireFormat = wireFormat;
if (wireFormat != null) {
wireFormat.setProtocolResetListener(new ResetListener() {
@Override
public void onProtocolReset() {
reset();
}
});
}
}
public AmqpWireFormat getWireFormat() {
return this.wireFormat;
}
//----- Prepare the current frame parser for use -------------------------//
private FrameParser initializeHeaderParser() {
headerReader.reset(AMQP_HEADER_BYTES);
return headerReader;
}
private FrameParser initializeFrameLengthParser() {
frameSizeReader.reset(AMQP_FRAME_SIZE_BYTES);
return frameSizeReader;
}
private FrameParser initializeContentReader(int contentLength) {
contentReader.reset(contentLength);
return contentReader;
}
//----- Frame parser implementations -------------------------------------//
private interface FrameParser {
void parse(ByteBuffer incoming) throws IOException;
void reset(int nextExpectedReadSize);
}
private final FrameParser headerReader = new FrameParser() {
private final Buffer header = new Buffer(AMQP_HEADER_BYTES);
@Override
public void parse(ByteBuffer incoming) throws IOException {
int length = Math.min(incoming.remaining(), header.length - header.offset);
incoming.get(header.data, header.offset, length);
header.offset += length;
if (header.offset == AMQP_HEADER_BYTES) {
header.reset();
AmqpHeader amqpHeader = new AmqpHeader(header.deepCopy(), false);
currentParser = initializeFrameLengthParser();
frameSink.onFrame(amqpHeader);
if (incoming.hasRemaining()) {
currentParser.parse(incoming);
}
}
}
@Override
public void reset(int nextExpectedReadSize) {
header.reset();
}
};
private final FrameParser frameSizeReader = new FrameParser() {
private int frameSize;
private int multiplier;
@Override
public void parse(ByteBuffer incoming) throws IOException {
while (incoming.hasRemaining()) {
frameSize += ((incoming.get() & 0xFF) << --multiplier * Byte.SIZE);
if (multiplier == 0) {
LOG.trace("Next incoming frame length: {}", frameSize);
validateFrameSize(frameSize);
currentParser = initializeContentReader(frameSize);
if (incoming.hasRemaining()) {
currentParser.parse(incoming);
return;
}
}
}
}
@Override
public void reset(int nextExpectedReadSize) {
multiplier = AMQP_FRAME_SIZE_BYTES;
frameSize = 0;
}
};
private final FrameParser contentReader = new FrameParser() {
private Buffer frame;
@Override
public void parse(ByteBuffer incoming) throws IOException {
int length = Math.min(incoming.remaining(), frame.getLength() - frame.offset);
incoming.get(frame.data, frame.offset, length);
frame.offset += length;
if (frame.offset == frame.length) {
LOG.trace("Contents of size {} have been read", frame.length);
frame.reset();
frameSink.onFrame(frame);
if (currentParser == this) {
currentParser = initializeFrameLengthParser();
}
if (incoming.hasRemaining()) {
currentParser.parse(incoming);
}
}
}
@Override
public void reset(int nextExpectedReadSize) {
// Allocate a new Buffer to hold the incoming frame. We must write
// back the frame size value before continue on to read the indicated
// frame size minus the size of the AMQP frame size header value.
frame = new Buffer(nextExpectedReadSize);
frame.bigEndianEditor().writeInt(nextExpectedReadSize);
// Reset the length to total length as we do direct write after this.
frame.length = frame.data.length;
}
};
}