blob: ff85f11c2f4848f22a8ed25b65c40fff442ab206 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
using System;
using System.Collections.Generic;
using System.IO;
using org.apache.qpid.transport.codec;
using org.apache.qpid.transport.util;
namespace org.apache.qpid.transport.network
{
/// <summary>
/// Signature of a callback that is handed an <see cref="INetworkDelegate"/> to operate on.
/// </summary>
/// <param name="ndelegate">The network delegate passed to the callback.</param>
public delegate void Processor(INetworkDelegate ndelegate);
/// <summary>
/// Network delegate that turns incoming frames into protocol events.
/// Frames are collected into complete segments (a segment may span several frames),
/// each segment is decoded according to its type (control, command, header, body),
/// and the resulting protocol event is raised through <see cref="ReceivedEvent"/>
/// (or <see cref="Closed"/> for a <c>ConnectionCloseOk</c>).
/// </summary>
public class Assembler : INetworkDelegate, IReceiver<ReceivedPayload<IProtocolEvent>>
{
    // Size of the per-channel table of pending commands; also used to build segment keys.
    private const int ChannelCount = 64*1024;

    private static readonly Logger log = Logger.Get(typeof (Assembler));

    // Frames received so far for each segment still being assembled, keyed by SegmentKey(frame).
    private readonly Dictionary<int, List<byte[]>> segments;

    // Per channel: the command that has been decoded but is still waiting for its header/body segments.
    private readonly Method[] incomplete;

    // One decoder instance per thread, created on first use by GetDecoder().
    [ThreadStatic] static MSDecoder _decoder;

    // Guards subscription changes made through the explicit IReceiver.Received accessors.
    private readonly Object m_objectLock = new object();

    // the event raised when a buffer is read from the wire
    public event EventHandler<ReceivedPayload<IProtocolEvent>> ReceivedEvent;

    // Raised instead of ReceivedEvent when the emitted protocol event is a ConnectionCloseOk.
    public event EventHandler Closed;

    // Not in use :
    public event EventHandler<ExceptionArgs> Exception;

    /// <summary>
    /// Explicit <c>IReceiver.Received</c> implementation; forwards subscriptions to
    /// <see cref="ReceivedEvent"/> while holding the instance lock.
    /// </summary>
    event EventHandler<ReceivedPayload<IProtocolEvent>> IReceiver<ReceivedPayload<IProtocolEvent>>.Received
    {
        add
        {
            lock (m_objectLock)
            {
                ReceivedEvent += value;
            }
        }
        remove
        {
            lock (m_objectLock)
            {
                ReceivedEvent -= value;
            }
        }
    }

    /// <summary>
    /// Creates an assembler with no segments in progress and no pending commands.
    /// </summary>
    public Assembler()
    {
        segments = new Dictionary<int, List<byte[]>>();
        incomplete = new Method[ChannelCount];
    }

    /// <summary>
    /// Invoked when a network event is received; lets the event dispatch itself
    /// back onto this assembler (Init, Error or Frame).
    /// </summary>
    /// <param name="sender">The event source (not used).</param>
    /// <param name="payload">Wrapper carrying the network event to process.</param>
    public void On_ReceivedEvent(object sender, ReceivedPayload<INetworkEvent> payload)
    {
        payload.Payload.ProcessNetworkEvent(this);
    }

    #region Interface INetworkDelegate

    /// <summary>
    /// Emits the received protocol header as a protocol event on channel 0.
    /// </summary>
    /// <param name="header">The protocol header that was received.</param>
    public void Init(ProtocolHeader header)
    {
        Emit(0, header);
    }

    /// <summary>
    /// Emits the given protocol error as a protocol event on channel 0.
    /// </summary>
    /// <param name="error">The protocol error to report.</param>
    public void Error(ProtocolError error)
    {
        Emit(0, error);
    }

    /// <summary>
    /// Handles one incoming frame. A frame that is both first and last forms a complete
    /// segment and is decoded immediately; otherwise the frame body is buffered and the
    /// segment is decoded once its last frame arrives.
    /// </summary>
    /// <param name="frame">The frame that was received.</param>
    /// <exception cref="KeyNotFoundException">
    /// A continuation frame arrived for a segment that has no buffered first frame.
    /// </exception>
    public void Frame(Frame frame)
    {
        if (frame.IsFirstFrame() && frame.IsLastFrame())
        {
            // Single-frame segment: the body is the whole segment, no buffering needed.
            Assemble(frame, new MemoryStream(ReadBody(frame)));
            return;
        }

        List<byte[]> frames;
        if (frame.IsFirstFrame())
        {
            frames = new List<byte[]>();
            SetSegment(frame, frames);
        }
        else
        {
            frames = GetSegment(frame);
        }
        frames.Add(ReadBody(frame));

        if (frame.IsLastFrame())
        {
            ClearSegment(frame);
            MemoryStream segment = new MemoryStream();
            foreach (byte[] part in frames)
            {
                segment.Write(part, 0, part.Length);
            }
            // Writing leaves the position at the end of the stream; rewind so the segment
            // is handed over positioned at its start, exactly like the single-frame case.
            segment.Seek(0, SeekOrigin.Begin);
            Assemble(frame, segment);
        }
    }

    #endregion

    #region Private Support Functions

    // Copies the body of a frame into a new array of frame.BodySize bytes.
    // NOTE(review): the count returned by Body.Read is ignored, as before; this assumes the
    // body stream delivers all BodySize bytes in a single call - confirm for the Body type.
    private static byte[] ReadBody(Frame frame)
    {
        byte[] body = new byte[frame.BodySize];
        frame.Body.Read(body, 0, body.Length);
        return body;
    }

    // Returns the calling thread's decoder, creating it on first use.
    private MSDecoder GetDecoder()
    {
        if (_decoder == null)
        {
            _decoder = new MSDecoder();
        }
        return _decoder;
    }

    // Returns the command awaiting further segments on the channel, or throws a descriptive
    // error (rather than failing later with a NullReferenceException) when there is none.
    private Method RequireIncomplete(int channel, string segmentKind)
    {
        Method pending = incomplete[channel];
        if (pending == null)
        {
            throw new InvalidOperationException(
                segmentKind + " segment received on channel " + channel + " without a pending command");
        }
        return pending;
    }

    /// <summary>
    /// Decodes one complete segment and emits the resulting protocol event, or stores the
    /// command as pending when more segments (header/body) are expected for it.
    /// </summary>
    /// <param name="frame">The last frame of the segment; supplies channel, type and flags.</param>
    /// <param name="segment">The complete segment content, positioned at its start.</param>
    private void Assemble(Frame frame, MemoryStream segment)
    {
        MSDecoder decoder = GetDecoder();
        decoder.Init(segment);
        int channel = frame.Channel;
        Method command;
        switch (frame.Type)
        {
            case SegmentType.CONTROL:
                int controlType = decoder.ReadUint16();
                Method control = Method.Create(controlType);
                control.Read(decoder);
                Emit(channel, control);
                break;
            case SegmentType.COMMAND:
                int commandType = decoder.ReadUint16();
                // read in the session header, right now we don't use it
                decoder.ReadUint16();
                command = Method.Create(commandType);
                command.Read(decoder);
                if (command.HasPayload())
                {
                    // Header and/or body segments follow; hold the command until they arrive.
                    incomplete[channel] = command;
                }
                else
                {
                    Emit(channel, command);
                }
                break;
            case SegmentType.HEADER:
                command = RequireIncomplete(channel, "header");
                List<Struct> structs = new List<Struct>();
                while (decoder.HasRemaining())
                {
                    structs.Add(decoder.ReadStruct32());
                }
                command.Header = new Header(structs);
                if (frame.IsLastSegment())
                {
                    // No body segment will follow, so the command is complete.
                    incomplete[channel] = null;
                    Emit(channel, command);
                }
                break;
            case SegmentType.BODY:
                command = RequireIncomplete(channel, "body");
                segment.Seek(0, SeekOrigin.Begin);
                command.Body = segment;
                incomplete[channel] = null;
                Emit(channel, command);
                break;
            default:
                throw new Exception("unknown frame type: " + frame.Type);
        }
    }

    // Builds the dictionary key identifying the segment a frame belongs to.
    // Each (track, channel) pair gets its own key as long as the channel is within
    // [0, ChannelCount), which the size of the incomplete table already requires.
    // The previous formula, (Track + 1) * Channel, mapped every frame on channel 0 to
    // key 0 and e.g. track 1/channel 1 and track 0/channel 2 to the same key.
    private int SegmentKey(Frame frame)
    {
        return frame.Track*ChannelCount + frame.Channel;
    }

    // Returns the frames buffered so far for the segment the frame belongs to.
    // Throws KeyNotFoundException when no such segment is in progress.
    private List<byte[]> GetSegment(Frame frame)
    {
        return segments[SegmentKey(frame)];
    }

    // Registers a new frame buffer for the segment the frame starts. If a segment with the
    // same key is still in progress, a protocol error is emitted and the old buffer is replaced.
    private void SetSegment(Frame frame, List<byte[]> segment)
    {
        int key = SegmentKey(frame);
        if (segments.ContainsKey(key))
        {
            // NOTE(review): "%s" is a printf-style placeholder; confirm that ProtocolError
            // formats it as intended (a composite-format "{0}" may be what is needed).
            Error(new ProtocolError(network.Frame.L2, "segment in progress: %s",
                                    frame));
        }
        // Indexer assignment instead of Add: Add throws ArgumentException when the key is
        // already present, which is exactly the situation reported just above.
        segments[key] = segment;
    }

    // Forgets the frame buffer of the segment the frame belongs to.
    private void ClearSegment(Frame frame)
    {
        segments.Remove(SegmentKey(frame));
    }

    /// <summary>
    /// Emit a protocol event: stamps it with the channel and raises <see cref="Closed"/>
    /// for a <c>ConnectionCloseOk</c>, or <see cref="ReceivedEvent"/> for anything else.
    /// </summary>
    /// <param name="channel">The channel the event was received on.</param>
    /// <param name="protevent">The decoded protocol event.</param>
    private void Emit(int channel, IProtocolEvent protevent)
    {
        protevent.Channel = channel;
        log.Debug("Assembler: protocol event: {0}", protevent);

        if (protevent is ConnectionCloseOk)
        {
            // Copy to a local so a concurrent unsubscribe cannot null the event
            // between the check and the invocation.
            EventHandler closedHandler = Closed;
            if (closedHandler != null)
            {
                closedHandler(this, EventArgs.Empty);
            }
            return;
        }

        EventHandler<ReceivedPayload<IProtocolEvent>> receivedHandler = ReceivedEvent;
        if (receivedHandler != null)
        {
            ReceivedPayload<IProtocolEvent> payload = new ReceivedPayload<IProtocolEvent>();
            payload.Payload = protevent;
            receivedHandler(this, payload);
        }
        else
        {
            log.Debug("No listener for event: {0}", protevent);
        }
    }

    #endregion
}
}