blob: c73b82376b0d6cf8b4ace926efffade8c346bd30 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.transport.network;
import static org.apache.qpid.transport.network.InputHandler.State.ERROR;
import static org.apache.qpid.transport.network.InputHandler.State.FRAME_BODY;
import static org.apache.qpid.transport.network.InputHandler.State.FRAME_HDR;
import static org.apache.qpid.transport.network.InputHandler.State.PROTO_HDR;
import static org.apache.qpid.transport.util.Functions.str;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.transport.Constant;
import org.apache.qpid.transport.ExceptionHandlingByteBufferReceiver;
import org.apache.qpid.transport.FrameSizeObserver;
import org.apache.qpid.transport.NetworkEventReceiver;
import org.apache.qpid.transport.ProtocolError;
import org.apache.qpid.transport.ProtocolHeader;
import org.apache.qpid.transport.SegmentType;
/**
 * Incremental decoder that turns a stream of {@link ByteBuffer}s into
 * protocol-level events.
 *
 * <p>The handler first expects an 8 byte protocol header beginning with the
 * bytes {@code 'A' 'M' 'Q' 'P'}, and afterwards an arbitrary number of frames.
 * Every frame is made of a header of {@link Frame#HEADER_SIZE} bytes followed
 * by a body whose length is derived from the size field of that header.
 * Decoded {@link ProtocolHeader}s, {@link Frame}s and {@link ProtocolError}s
 * are handed to the {@link NetworkEventReceiver} supplied at construction.
 *
 * <p>Input may be split across calls to {@link #received(ByteBuffer)} at any
 * byte boundary; incomplete units are accumulated internally until enough
 * bytes have arrived.
 *
 * <p>Once a malformed unit has been reported the handler is in
 * {@link State#ERROR}; the next complete unit it is asked to decode results in
 * an {@link IllegalStateException}.
 *
 * @author Rafael H. Schloming
 */
public class InputHandler implements ExceptionHandlingByteBufferReceiver, FrameSizeObserver
{
    private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class);

    /** Shared body for frames whose header announces a zero length body. */
    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);

    /** Bytes in the protocol header: 'A' 'M' 'Q' 'P', class, instance, major, minor. */
    private static final int PROTOCOL_HEADER_SIZE = 8;

    /** Upper bound used to validate the size field of incoming frame headers. */
    private int _maxFrameSize = Constant.MIN_MAX_FRAME_SIZE;

    /** The unit the decoder is currently waiting for. */
    public enum State
    {
        PROTO_HDR,
        FRAME_HDR,
        FRAME_BODY,
        ERROR;
    }

    private final NetworkEventReceiver receiver;

    private State state;

    /**
     * Buffer holding the unit currently being decoded. It is either the
     * caller's buffer (when the whole unit is available in one piece) or a
     * private accumulation buffer (when the unit arrives in fragments).
     * {@code null} between units.
     */
    private ByteBuffer input = null;

    /** Number of bytes still missing to complete the current unit. */
    private int needed;

    // Fields of the most recently decoded frame header, kept until the body arrives.
    private byte flags;
    private SegmentType type;
    private byte track;
    private int channel;

    /**
     * Creates a handler that starts by expecting the protocol header.
     *
     * @param receiver destination for every decoded event
     */
    public InputHandler(NetworkEventReceiver receiver)
    {
        this.receiver = receiver;
        this.state = PROTO_HDR;
        this.needed = PROTOCOL_HEADER_SIZE;
    }

    /**
     * Sets the maximum frame size used when validating incoming frame headers.
     *
     * @param maxFrameSize the new limit, in bytes
     */
    public void setMaxFrameSize(final int maxFrameSize)
    {
        _maxFrameSize = maxFrameSize;
    }

    /** Reports a framing-level protocol error to the receiver. */
    private void error(String fmt, Object ... args)
    {
        receiver.received(new ProtocolError(Frame.L1, fmt, args));
    }

    /**
     * Consumes all remaining bytes of {@code buf}, emitting an event for every
     * unit (protocol header, frame header with empty body, or frame body) that
     * becomes complete.
     *
     * <p>When a whole unit is available inside {@code buf} and nothing was
     * buffered before, the unit is decoded directly from {@code buf} without
     * copying; a frame body produced this way is a slice of {@code buf} and
     * therefore shares its content. The limit of {@code buf} is narrowed
     * temporarily and restored afterwards, and its byte order is set to
     * big-endian.
     *
     * @param buf the bytes read from the network
     * @throws IllegalStateException if a complete unit arrives while the
     *         handler is in {@link State#ERROR}
     */
    @Override
    public void received(ByteBuffer buf)
    {
        final int limit = buf.limit();
        int remaining = buf.remaining();

        while (remaining > 0)
        {
            if (remaining >= needed)
            {
                // next() overwrites 'needed', so remember how much this unit used.
                final int consumed = needed;
                final int pos = buf.position();

                // Expose exactly the bytes that complete the current unit.
                buf.limit(pos + needed);
                if (input == null)
                {
                    // Fast path: decode in place using absolute offsets.
                    input = buf;
                    state = next(pos);
                    buf.limit(limit);
                    buf.position(pos + consumed);
                }
                else
                {
                    // Finish a unit that was started by an earlier fragment.
                    input.put(buf);
                    buf.limit(limit);
                    input.flip();
                    state = next(0);
                }

                remaining -= consumed;
                input = null;
            }
            else
            {
                // Not enough data yet: stash the fragment and wait for more.
                if (input == null)
                {
                    input = ByteBuffer.allocate(needed);
                }
                input.put(buf);
                needed -= remaining;
                remaining = 0;
            }
        }
    }

    /**
     * Decodes the complete unit held in {@link #input} according to the
     * current state and updates {@link #needed} for the unit that follows.
     *
     * @param pos absolute index in {@code input} at which the unit starts
     * @return the state to enter next
     * @throws IllegalStateException if the current state is {@link State#ERROR}
     */
    private State next(int pos)
    {
        input.order(ByteOrder.BIG_ENDIAN);

        switch (state)
        {
        case PROTO_HDR:
            return readProtocolHeader(pos);
        case FRAME_HDR:
            return readFrameHeader(pos);
        case FRAME_BODY:
            return readFrameBody();
        default:
            throw new IllegalStateException();
        }
    }

    /** Validates the 'AMQP' magic and emits a {@link ProtocolHeader}. */
    private State readProtocolHeader(int pos)
    {
        // Any single mismatching byte makes the header invalid.
        if (input.get(pos) != 'A'
            || input.get(pos + 1) != 'M'
            || input.get(pos + 2) != 'Q'
            || input.get(pos + 3) != 'P')
        {
            error("bad protocol header: %s", str(input));
            return ERROR;
        }

        byte protoClass = input.get(pos + 4);
        byte instance = input.get(pos + 5);
        byte major = input.get(pos + 6);
        byte minor = input.get(pos + 7);
        receiver.received(new ProtocolHeader(protoClass, instance, major, minor));

        needed = Frame.HEADER_SIZE;
        return FRAME_HDR;
    }

    /**
     * Decodes a frame header: flags (byte 0), segment type (byte 1), unsigned
     * 16 bit frame size (bytes 2-3), track (low nibble of byte 5) and unsigned
     * 16 bit channel (bytes 6-7). Emits the frame immediately when its body is
     * empty.
     */
    private State readFrameHeader(int pos)
    {
        flags = input.get(pos);
        type = SegmentType.get(input.get(pos + 1));

        // The size on the wire includes the header; keep only the body length.
        int size = (0xFFFF & input.getShort(pos + 2)) - Frame.HEADER_SIZE;
        // NOTE(review): the literal 12 presumably equals Frame.HEADER_SIZE - confirm.
        if (size < 0 || size > (_maxFrameSize - 12))
        {
            error("bad frame size: %d", size);
            return ERROR;
        }

        byte b = input.get(pos + 5);
        if ((b & 0xF0) != 0)
        {
            error("non-zero reserved bits in upper nibble of " +
                  "frame header byte 5: '%x'", b);
            return ERROR;
        }
        track = (byte) (b & 0xF);

        channel = (0xFFFF & input.getShort(pos + 6));

        if (size == 0)
        {
            receiver.received(new Frame(flags, type, track, channel, EMPTY_BYTE_BUFFER));
            needed = Frame.HEADER_SIZE;
            return FRAME_HDR;
        }

        needed = size;
        return FRAME_BODY;
    }

    /** Emits a frame whose body is the window currently exposed by {@link #input}. */
    private State readFrameBody()
    {
        receiver.received(new Frame(flags, type, track, channel, input.slice()));
        needed = Frame.HEADER_SIZE;
        return FRAME_HDR;
    }

    /** Forwards an exception to the receiver. */
    public void exception(Throwable t)
    {
        receiver.exception(t);
    }

    /** Forwards the closed notification to the receiver. */
    public void closed()
    {
        receiver.closed();
    }
}