blob: fe2d7af9683586fa001b0e69bde0c4492ab100f3 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.protocol.v0_10;
import static org.apache.qpid.server.transport.util.Functions.str;
import static org.apache.qpid.server.protocol.v0_10.ServerInputHandler.State.*;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.v0_10.transport.ProtocolError;
import org.apache.qpid.server.protocol.v0_10.transport.ProtocolHeader;
import org.apache.qpid.server.protocol.v0_10.transport.SegmentType;
/**
 * Decodes the inbound byte stream of an AMQP 0-10 connection into {@link ServerFrame}s and
 * passes them to a {@link ServerAssembler}.
 *
 * <p>The handler starts in {@link State#PROTO_HDR}, where it expects the eight byte protocol
 * header ({@code 'A' 'M' 'Q' 'P'} followed by class, instance, major and minor). After a valid
 * protocol header it moves to {@link State#FRAME_HDR} and decodes frames. Any malformed input
 * is reported to the assembler as a {@link ProtocolError} and moves the handler to
 * {@link State#ERROR}, after which no further input is decoded.
 *
 * <p>The frame header, as read by this class, consists of: one flags byte, one segment type
 * byte, an unsigned 16-bit frame size that includes the header itself, one ignored byte, one
 * byte whose low nibble is the track and whose high nibble must be zero, an unsigned 16-bit
 * channel, and four trailing bytes that are skipped.
 */
public class ServerInputHandler implements FrameSizeObserver
{
    private static final Logger LOGGER = LoggerFactory.getLogger(ServerInputHandler.class);
    private static final QpidByteBuffer EMPTY_BYTE_BUFFER = QpidByteBuffer.allocateDirect(0);

    /** Bytes in the protocol header: the "AMQP" literal plus class, instance, major and minor. */
    private static final int PROTOCOL_HEADER_SIZE = 8;

    /** Bytes at the end of the frame header that are skipped without being interpreted. */
    private static final int FRAME_HEADER_TRAILING_BYTES = 4;

    private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE;

    /** Decoder states; see the class description for the transitions between them. */
    public enum State
    {
        PROTO_HDR,
        FRAME_HDR,
        ERROR;
    }

    private final ServerAssembler _serverAssembler;
    private State _state = State.PROTO_HDR;

    /**
     * Creates a handler that starts by expecting the protocol header.
     *
     * @param serverAssembler receiver of the decoded protocol header, frames, errors and
     *                        connection events
     */
    public ServerInputHandler(ServerAssembler serverAssembler)
    {
        _serverAssembler = serverAssembler;
    }

    /**
     * Sets the largest frame size (header included) that will be accepted from now on.
     *
     * @param maxFrameSize the new upper bound for the frame size field
     */
    @Override
    public void setMaxFrameSize(final int maxFrameSize)
    {
        _maxFrameSize = maxFrameSize;
    }

    /** Reports a framing level protocol error to the assembler. */
    private void error(String fmt, Object ... args)
    {
        _serverAssembler.error(new ProtocolError(ServerFrame.L1, fmt, args));
    }

    /**
     * Decodes as much of {@code buf} as possible and passes the complete frames found, possibly
     * none, to the assembler in a single call.
     *
     * <p>Decoding stops when the buffer is exhausted, when the remaining bytes do not form a
     * complete protocol header or frame, or when a protocol error is detected. Bytes belonging
     * to an incomplete header or frame are left unconsumed: the buffer position is put back to
     * the start of that header or frame. Note that this method sets the buffer's mark.
     *
     * @param buf the received bytes, positioned at the first byte not yet decoded
     */
    public void received(QpidByteBuffer buf)
    {
        final List<ServerFrame> frames = new ArrayList<>();
        int position = buf.position();
        while (buf.hasRemaining() && _state != State.ERROR)
        {
            // The helpers rewind to this mark when they cannot complete a header or frame.
            buf.mark();
            switch (_state)
            {
                case PROTO_HDR:
                    readProtocolHeader(buf);
                    break;
                case FRAME_HDR:
                    readFrame(buf, frames);
                    break;
                default:
                    throw new IllegalStateException("Unexpected input handler state: " + _state);
            }

            final int newPosition = buf.position();
            if (newPosition == position)
            {
                // Nothing was consumed in this pass, so more input is needed (or an error was
                // raised before any byte was accepted); looping again would not make progress.
                break;
            }
            position = newPosition;
        }
        _serverAssembler.received(frames);
    }

    /**
     * Reads the protocol header if it is completely available. On success the assembler is
     * initialised with the header and the state becomes {@link State#FRAME_HDR}; if the first
     * four bytes are not "AMQP" an error is reported and the state becomes {@link State#ERROR}.
     * The caller must have marked {@code buf} at its current position.
     */
    private void readProtocolHeader(final QpidByteBuffer buf)
    {
        if (buf.remaining() < PROTOCOL_HEADER_SIZE)
        {
            // Incomplete header: consume nothing and wait for more bytes.
            return;
        }

        if (buf.get() != 'A' || buf.get() != 'M' || buf.get() != 'Q' || buf.get() != 'P')
        {
            // Rewind so that the error message shows the buffer from the start of the header.
            buf.reset();
            error("bad protocol header: %s", str(buf));
            _state = State.ERROR;
            return;
        }

        final byte protoClass = buf.get();
        final byte instance = buf.get();
        final byte major = buf.get();
        final byte minor = buf.get();
        _serverAssembler.init(new ProtocolHeader(protoClass, instance, major, minor));
        _state = State.FRAME_HDR;
    }

    /**
     * Reads one frame if it is completely available and appends it to {@code frames}. If the
     * header or the body is incomplete the buffer is left positioned at the start of the frame.
     * An invalid frame size or non-zero reserved bits are reported as an error and move the
     * handler to {@link State#ERROR}. The caller must have marked {@code buf} at its current
     * position.
     */
    private void readFrame(final QpidByteBuffer buf, final List<ServerFrame> frames)
    {
        if (buf.remaining() < ServerFrame.HEADER_SIZE)
        {
            // Incomplete frame header: nothing has been consumed yet, wait for more bytes.
            return;
        }

        final byte flags = buf.get();
        final SegmentType type = SegmentType.get(buf.get());
        // The size field is an unsigned 16-bit value that counts the header as well as the body.
        final int frameSize = 0xFFFF & buf.getShort();
        if (frameSize < ServerFrame.HEADER_SIZE || frameSize > _maxFrameSize)
        {
            // Report the size as declared on the wire rather than the derived body size.
            error("bad frame size: %d", frameSize);
            _state = State.ERROR;
            return;
        }
        final int bodySize = frameSize - ServerFrame.HEADER_SIZE;

        buf.get(); // skip unused byte
        final byte trackByte = buf.get();
        if ((trackByte & 0xF0) != 0)
        {
            error("non-zero reserved bits in upper nibble of frame header byte 5: '%x'", trackByte);
            _state = State.ERROR;
            return;
        }
        final byte track = (byte) (trackByte & 0xF);
        final int channel = 0xFFFF & buf.getShort();
        buf.position(buf.position() + FRAME_HEADER_TRAILING_BYTES);

        if (bodySize == 0)
        {
            frames.add(new ServerFrame(flags, type, track, channel, EMPTY_BYTE_BUFFER.duplicate()));
        }
        else if (buf.remaining() < bodySize)
        {
            // Body not fully received: rewind to the start of the frame header so that the whole
            // frame is decoded again once more bytes are available.
            buf.reset();
        }
        else
        {
            // The frame body is a view of the next bodySize bytes of the input buffer.
            final QpidByteBuffer body = buf.slice();
            body.limit(bodySize);
            frames.add(new ServerFrame(flags, type, track, channel, body));
            buf.position(buf.position() + bodySize);
        }
    }

    /**
     * Forwards an exception to the assembler.
     *
     * @param t the exception to forward
     */
    public void exception(Throwable t)
    {
        _serverAssembler.exception(t);
    }

    /** Notifies the assembler that the connection has been closed. */
    public void closed()
    {
        _serverAssembler.closed();
    }
}