/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System; | |
using System.IO; | |
using System.Text; | |
using org.apache.qpid.transport.util; | |
namespace org.apache.qpid.transport.network
{
    /// <summary>
    /// Incremental parser that turns raw byte buffers read from the wire into
    /// <see cref="NetworkEvent"/>s (protocol header, frames, protocol errors).
    /// Buffers are fed in through <see cref="On_ReceivedBuffer"/>; incomplete
    /// data is kept until enough bytes have arrived for the current state.
    /// </summary>
    public sealed class InputHandler : Receiver<ReceivedPayload<NetworkEvent>>
    {
        /// <summary>
        /// Parsing state: which piece of the stream is expected next.
        /// </summary>
        public enum State
        {
            PROTO_HDR,
            FRAME_HDR,
            FRAME_BODY,
            ERROR
        }

        private static readonly Logger log = Logger.get(typeof(InputHandler));

        // Guards subscription changes made through the explicit Receiver<> events.
        private readonly Object m_objectLock = new object();

        /// <summary>Raised for every network event decoded from the wire.</summary>
        public event EventHandler<ReceivedPayload<NetworkEvent>> ReceivedEvent;

        /// <summary>Raised when processing a received buffer throws.</summary>
        public event EventHandler<ExceptionArgs> ExceptionProcessing;

        /// <summary>Closed notification; this class only stores the subscribers.</summary>
        public event EventHandler HandlerClosed;

        event EventHandler<ReceivedPayload<NetworkEvent>> Receiver<ReceivedPayload<NetworkEvent>>.Received
        {
            add
            {
                lock (m_objectLock)
                {
                    ReceivedEvent += value;
                }
            }
            remove
            {
                lock (m_objectLock)
                {
                    ReceivedEvent -= value;
                }
            }
        }

        event EventHandler<ExceptionArgs> Receiver<ReceivedPayload<NetworkEvent>>.Exception
        {
            add
            {
                lock (m_objectLock)
                {
                    ExceptionProcessing += value;
                }
            }
            remove
            {
                lock (m_objectLock)
                {
                    ExceptionProcessing -= value;
                }
            }
        }

        event EventHandler Receiver<ReceivedPayload<NetworkEvent>>.Closed
        {
            add
            {
                lock (m_objectLock)
                {
                    HandlerClosed += value;
                }
            }
            remove
            {
                lock (m_objectLock)
                {
                    HandlerClosed -= value;
                }
            }
        }

        // Current parser state.
        private State state;
        // Bytes carried over from earlier buffers that were too short to parse; null when empty.
        private MemoryStream input;
        // Number of bytes required before the current state can be processed.
        private int needed;
        // Fields of the most recently parsed frame header, reused when the body arrives.
        private byte flags;
        private SegmentType type;
        private byte track;
        private int channel;

        /// <summary>
        /// Creates a handler that starts parsing in the given state.
        /// </summary>
        /// <param name="state">
        /// Initial state. Only PROTO_HDR and FRAME_HDR set the number of bytes
        /// needed; for any other value it is left at 0.
        /// </param>
        public InputHandler(State state)
        {
            this.state = state;
            switch (state)
            {
                case State.PROTO_HDR:
                    needed = 8;
                    break;
                case State.FRAME_HDR:
                    needed = Frame.HEADER_SIZE;
                    break;
            }
        }

        /// <summary>
        /// Handler for a buffer read from the wire. Parses as many complete
        /// units (protocol header, frame header, frame body) as the available
        /// bytes allow and stores any leftover for the next call. Any exception
        /// thrown while parsing is written to the console and reported through
        /// <see cref="ExceptionProcessing"/>; it is not rethrown.
        /// </summary>
        /// <param name="sender">Event source (unused).</param>
        /// <param name="payload">Payload wrapping the bytes that were read.</param>
        public void On_ReceivedBuffer(object sender, ReceivedPayload<MemoryStream> payload)
        {
            MemoryStream buf = payload.Payload;
            // NOTE(review): uses Length, so this assumes buf is positioned at its start - confirm with the sender.
            int remaining = (int) buf.Length;
            if (input != null)
            {
                remaining += (int) input.Length;
            }
            try
            {
                while (remaining > 0)
                {
                    if (remaining >= needed)
                    {
                        if (input != null)
                        {
                            // Append the new bytes to the carried-over ones and parse from the combined stream.
                            byte[] tmp = new byte[buf.Length];
                            buf.Read(tmp, 0, tmp.Length);
                            input.Write(tmp, 0, tmp.Length);
                            input.Seek(0, SeekOrigin.Begin);
                            buf = input;
                        }
                        int startPos = (int) buf.Position;
                        // next() overwrites 'needed', so remember how much this step is meant to consume.
                        int consumed = needed;
                        state = next(buf);
                        // Skip whatever part of this unit next() did not read itself.
                        if ((buf.Position - startPos) < consumed)
                        {
                            buf.Seek(consumed - (buf.Position - startPos), SeekOrigin.Current);
                        }
                        remaining -= consumed;
                        input = null;
                    }
                    else
                    {
                        // Not enough data yet: stash what we have and wait for the next buffer.
                        byte[] tmp;
                        if (input == null)
                        {
                            input = new MemoryStream();
                            tmp = new byte[remaining];
                        }
                        else
                        {
                            // this is a full buffer
                            tmp = new byte[buf.Length];
                        }
                        buf.Read(tmp, 0, tmp.Length);
                        input.Write(tmp, 0, tmp.Length);
                        remaining = 0;
                    }
                }
            }
            catch (Exception t)
            {
                Console.Write(t);
                // Copy to a local so a concurrent unsubscribe cannot null the delegate between check and call.
                EventHandler<ExceptionArgs> handler = ExceptionProcessing;
                if (handler != null)
                {
                    handler(this, new ExceptionArgs(t));
                }
            }
        }

        #region Private Support Functions

        /// <summary>
        /// Processes one unit for the current state from <paramref name="buf"/>,
        /// fires the resulting network event (if any), updates the number of
        /// bytes needed for the following unit and returns the next state.
        /// </summary>
        /// <param name="buf">Stream positioned at the start of the unit.</param>
        /// <returns>The state to switch to.</returns>
        /// <exception cref="InvalidOperationException">
        /// The handler is in a state that cannot be processed (e.g. ERROR).
        /// </exception>
        private State next(MemoryStream buf)
        {
            // Deliberately not disposed: disposing a BinaryReader closes the underlying stream.
            BinaryReader reader = new BinaryReader(buf);
            switch (state)
            {
                case State.PROTO_HDR:
                    // Read the magic as raw bytes; ReadChar would decode UTF-8 and can
                    // misbehave on arbitrary (non-ASCII) input.
                    byte[] magic = reader.ReadBytes(4);
                    // Any single mismatching byte makes the header invalid.
                    if (magic.Length != 4 ||
                        magic[0] != (byte) 'A' ||
                        magic[1] != (byte) 'M' ||
                        magic[2] != (byte) 'Q' ||
                        magic[3] != (byte) 'P')
                    {
                        Error("bad protocol header: {0}", BitConverter.ToString(magic));
                        return State.ERROR;
                    }
                    reader.ReadByte(); // fifth header byte is read and ignored
                    byte instance = reader.ReadByte();
                    byte major = reader.ReadByte();
                    byte minor = reader.ReadByte();
                    Fire_NetworkEvent(new ProtocolHeader(instance, major, minor));
                    needed = Frame.HEADER_SIZE;
                    return State.FRAME_HDR;

                case State.FRAME_HDR:
                    flags = reader.ReadByte();
                    type = SegmentTypeGetter.get(reader.ReadByte()); // generated code
                    // BinaryReader reads little-endian; the value is converted via ByteEncoder.
                    int size = reader.ReadUInt16();
                    size = ByteEncoder.GetBigEndian((UInt16) size);
                    // The size read from the header includes the header itself.
                    size -= Frame.HEADER_SIZE;
                    if (size < 0 || size > (64 * 1024 - 12))
                    {
                        Error("bad frame size: {0:d}", size);
                        return State.ERROR;
                    }
                    reader.ReadByte(); // header byte 4 is read and ignored
                    byte b = reader.ReadByte();
                    if ((b & 0xF0) != 0)
                    {
                        Error("non-zero reserved bits in upper nibble of " +
                              "frame header byte 5: {0}", b);
                        return State.ERROR;
                    }
                    track = (byte) (b & 0xF);
                    channel = reader.ReadUInt16();
                    channel = ByteEncoder.GetBigEndian((UInt16) channel);
                    // Header bytes not read here are skipped by the caller.
                    if (size == 0)
                    {
                        // Empty frame: there is no body to wait for.
                        Fire_NetworkEvent(new Frame(flags, type, track, channel, 0, new MemoryStream()));
                        needed = Frame.HEADER_SIZE;
                        return State.FRAME_HDR;
                    }
                    needed = size;
                    return State.FRAME_BODY;

                case State.FRAME_BODY:
                    Fire_NetworkEvent(new Frame(flags, type, track, channel, needed, buf));
                    needed = Frame.HEADER_SIZE;
                    return State.FRAME_HDR;

                default:
                    // The caller catches this and raises ExceptionProcessing, so the event
                    // is not raised here as well (that would notify listeners twice).
                    throw new InvalidOperationException("Error creating frame");
            }
        }

        /// <summary>
        /// Reports a protocol error as a network event.
        /// </summary>
        /// <param name="fmt">Format string of the error message.</param>
        /// <param name="args">Arguments for <paramref name="fmt"/>.</param>
        private void Error(String fmt, params Object[] args)
        {
            Fire_NetworkEvent(new ProtocolError(Frame.L1, fmt, args));
        }

        /// <summary>
        /// Wraps <paramref name="netevent"/> in a payload and raises
        /// <see cref="ReceivedEvent"/>; logs when nobody is subscribed.
        /// </summary>
        /// <param name="netevent">The decoded network event.</param>
        private void Fire_NetworkEvent(NetworkEvent netevent)
        {
            // NOTE(review): assumes Logger.debug formats like String.Format - confirm.
            log.debug("InputHandler: network event: {0}", netevent);
            ReceivedPayload<NetworkEvent> payload = new ReceivedPayload<NetworkEvent>();
            payload.Payload = netevent;
            // Copy to a local so a concurrent unsubscribe cannot null the delegate between check and call.
            EventHandler<ReceivedPayload<NetworkEvent>> handler = ReceivedEvent;
            if (handler != null)
            {
                handler(this, payload);
            }
            else
            {
                log.debug("Nobody listening for event: {0}", netevent);
            }
        }

        #endregion
    }
}