blob: f3bffdb8217c7df73f401f7642bf2a53e95897e6 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.IO;
using System.Text;
using org.apache.qpid.transport.util;
namespace org.apache.qpid.transport.network
{
/// <summary>
/// InputHandler
/// </summary>
public sealed class InputHandler : Receiver<ReceivedPayload<NetworkEvent>>
{
public enum State
{
PROTO_HDR,
FRAME_HDR,
FRAME_BODY,
ERROR
}
private static readonly Logger log = Logger.get(typeof(InputHandler));
private readonly Object m_objectLock = new object();
// the event raised when a buffer is read from the wire
public event EventHandler<ReceivedPayload<NetworkEvent>> ReceivedEvent;
public event EventHandler<ExceptionArgs> ExceptionProcessing;
public event EventHandler HandlerClosed;
event EventHandler<ReceivedPayload<NetworkEvent>> Receiver<ReceivedPayload<NetworkEvent>>.Received
{
add
{
lock (m_objectLock)
{
ReceivedEvent += value;
}
}
remove
{
lock (m_objectLock)
{
ReceivedEvent -= value;
}
}
}
event EventHandler<ExceptionArgs> Receiver<ReceivedPayload<NetworkEvent>>.Exception
{
add
{
lock (m_objectLock)
{
ExceptionProcessing += value;
}
}
remove
{
lock (m_objectLock)
{
ExceptionProcessing -= value;
}
}
}
event EventHandler Receiver<ReceivedPayload<NetworkEvent>>.Closed
{
add
{
lock (m_objectLock)
{
HandlerClosed += value;
}
}
remove
{
lock (m_objectLock)
{
HandlerClosed -= value;
}
}
}
private State state;
private MemoryStream input;
private int needed;
private byte flags;
private SegmentType type;
private byte track;
private int channel;
public InputHandler(State state)
{
this.state = state;
switch (state)
{
case State.PROTO_HDR:
needed = 8;
break;
case State.FRAME_HDR:
needed = Frame.HEADER_SIZE;
break;
}
}
// The command listening for a buffer read.
public void On_ReceivedBuffer(object sender, ReceivedPayload<MemoryStream> payload)
{
MemoryStream buf = payload.Payload;
int remaining = (int) buf.Length;
if( input != null )
{
remaining += (int) input.Length;
}
try
{
while (remaining > 0)
{
if (remaining >= needed)
{
if (input != null)
{
byte[] tmp = new byte[buf.Length];
buf.Read(tmp, 0, tmp.Length);
input.Write(tmp, 0, tmp.Length);
input.Seek(0, SeekOrigin.Begin);
buf = input;
}
int startPos = (int)buf.Position;
int consumed = needed;
state = next(buf);
if ((buf.Position - startPos) < consumed)
{
buf.Seek(consumed - (buf.Position - startPos), SeekOrigin.Current);
}
remaining -= consumed;
input = null;
}
else
{
byte[] tmp;
if (input == null)
{
input = new MemoryStream();
tmp = new byte[remaining];
}
else
{
// this is a full buffer
tmp = new byte[buf.Length];
}
buf.Read(tmp, 0, tmp.Length);
input.Write(tmp, 0, tmp.Length);
remaining = 0;
}
}
}
catch (Exception t)
{
Console.Write(t);
if (ExceptionProcessing != null)
{
ExceptionProcessing(this, new ExceptionArgs(t));
}
}
}
#region Private Support Functions
private State next(MemoryStream buf)
{
BinaryReader reader = new BinaryReader(buf);
switch (state)
{
case State.PROTO_HDR:
char a = reader.ReadChar();
char m = reader.ReadChar();
char q = reader.ReadChar();
char p = reader.ReadChar();
if (a != 'A' &&
m != 'M' &&
q != 'Q' &&
p != 'P')
{
Error("bad protocol header: {0}", buf.ToString());
return State.ERROR;
}
reader.ReadByte();
byte instance = reader.ReadByte();
byte major = reader.ReadByte();
byte minor = reader.ReadByte();
Fire_NetworkEvent(new ProtocolHeader(instance, major, minor));
needed = Frame.HEADER_SIZE;
return State.FRAME_HDR;
case State.FRAME_HDR:
reader = new BinaryReader(buf, Encoding.BigEndianUnicode);
flags = reader.ReadByte();
type = SegmentTypeGetter.get(reader.ReadByte()); // generated code
int size = reader.ReadUInt16();
size = ByteEncoder.GetBigEndian((UInt16)size);
size -= Frame.HEADER_SIZE;
if (size < 0 || size > (64 * 1024 - 12))
{
Error("bad frame size: {0:d}", size);
return State.ERROR;
}
reader.ReadByte();
byte b = reader.ReadByte();
if ((b & 0xF0) != 0)
{
Error("non-zero reserved bits in upper nibble of " +
"frame header byte 5: {0}", b);
return State.ERROR;
}
track = (byte)(b & 0xF);
channel = reader.ReadUInt16();
channel = ByteEncoder.GetBigEndian((UInt16)channel);
if (size == 0)
{
Fire_NetworkEvent(new Frame(flags, type, track, channel, 0, new MemoryStream()));
needed = Frame.HEADER_SIZE;
return State.FRAME_HDR;
}
needed = size;
return State.FRAME_BODY;
case State.FRAME_BODY:
Fire_NetworkEvent(new Frame(flags, type, track, channel, needed, buf));
needed = Frame.HEADER_SIZE;
return State.FRAME_HDR;
default:
if (ExceptionProcessing != null)
{
ExceptionProcessing(this, new ExceptionArgs(new Exception("Error creating frame")));
}
throw new Exception("Error creating frame");
}
}
private void Error(String fmt, params Object[] args)
{
Fire_NetworkEvent(new ProtocolError(Frame.L1, fmt, args));
}
private void Fire_NetworkEvent(NetworkEvent netevent)
{
log.debug("InputHandler: network event:", netevent);
ReceivedPayload<NetworkEvent> payload = new ReceivedPayload<NetworkEvent>();
payload.Payload = netevent;
if (ReceivedEvent != null)
{
ReceivedEvent(this, payload);
}
else
{
log.debug("Nobody listening for event: {0}");
}
}
#endregion
}
}