| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| namespace Apache.Qpid.Channel |
| { |
| using System; |
| using System.IO; |
| using System.ServiceModel.Channels; |
| using System.Xml; |
| |
| // This incoming Message is backed either by a Stream (bodyStream) or a byte array (bodyBytes). |
| // If bodyBytes belongs to a BufferManager, we must return it when done. |
| // The pay-off is OnGetReaderAtBodyContents(). |
| // Most of the complexity is dealing with the OnCreateBufferedCopy() machinery. |
| internal class RawMessage : Message |
| { |
| private MessageHeaders headers; |
| private MessageProperties properties; |
| private XmlDictionaryReaderQuotas readerQuotas; |
| private Stream bodyStream; |
| private byte[] bodyBytes; |
| private int index; |
| private int count; |
| private BufferManager bufferManager; |
| |
| public RawMessage(byte[] buffer, int index, int count, BufferManager bufferManager, XmlDictionaryReaderQuotas quotas) |
| { |
| // this constructor supports MessageEncoder.ReadMessage(ArraySegment<byte> b, BufferManager mgr, string contentType) |
| if (quotas == null) |
| { |
| quotas = new XmlDictionaryReaderQuotas(); |
| } |
| |
| this.headers = new MessageHeaders(MessageVersion.None); |
| this.properties = new MessageProperties(); |
| this.readerQuotas = quotas; |
| this.bodyBytes = buffer; |
| this.index = index; |
| this.count = count; |
| this.bufferManager = bufferManager; |
| } |
| |
| public RawMessage(Stream stream, XmlDictionaryReaderQuotas quotas) |
| { |
| // this constructor supports MessageEncoder.ReadMessage(System.IO.Stream s, int max, string contentType) |
| if (quotas == null) |
| { |
| quotas = new XmlDictionaryReaderQuotas(); |
| } |
| |
| this.headers = new MessageHeaders(MessageVersion.None); |
| this.properties = new MessageProperties(); |
| this.bodyStream = stream; |
| } |
| |
| public RawMessage(MessageHeaders headers, MessageProperties properties, byte[] bytes, int index, int count, XmlDictionaryReaderQuotas quotas) |
| { |
| // this constructor supports internal needs for CreateBufferedCopy().CreateMessage() |
| this.headers = new MessageHeaders(headers); |
| this.properties = new MessageProperties(properties); |
| this.bodyBytes = bytes; |
| this.index = index; |
| this.count = count; |
| this.readerQuotas = quotas; |
| } |
| |
| public override MessageHeaders Headers |
| { |
| get |
| { |
| if (this.IsDisposed) |
| { |
| throw new ObjectDisposedException("message"); |
| } |
| |
| return this.headers; |
| } |
| } |
| |
| public override bool IsEmpty |
| { |
| get |
| { |
| if (this.IsDisposed) |
| { |
| throw new ObjectDisposedException("message"); |
| } |
| |
| return false; |
| } |
| } |
| |
| public override bool IsFault |
| { |
| get |
| { |
| if (this.IsDisposed) |
| { |
| throw new ObjectDisposedException("message"); |
| } |
| |
| return false; |
| } |
| } |
| |
| public override MessageProperties Properties |
| { |
| get |
| { |
| if (this.IsDisposed) |
| { |
| throw new ObjectDisposedException("message"); |
| } |
| |
| return this.properties; |
| } |
| } |
| |
| public override MessageVersion Version |
| { |
| get |
| { |
| if (this.IsDisposed) |
| { |
| throw new ObjectDisposedException("message"); |
| } |
| |
| return MessageVersion.None; |
| } |
| } |
| |
| protected override void OnBodyToString(XmlDictionaryWriter writer) |
| { |
| if (this.bodyStream != null) |
| { |
| writer.WriteString("Stream"); |
| } |
| else |
| { |
| writer.WriteStartElement(RawMessageEncoder.StreamElementName, string.Empty); |
| writer.WriteBase64(this.bodyBytes, this.index, this.count); |
| writer.WriteEndElement(); |
| } |
| } |
| |
| protected override void OnClose() |
| { |
| Exception deferEx = null; |
| try |
| { |
| base.OnClose(); |
| } |
| catch (Exception e) |
| { |
| deferEx = e; |
| } |
| |
| try |
| { |
| if (this.properties != null) |
| { |
| this.properties.Dispose(); |
| } |
| } |
| catch (Exception e) |
| { |
| if (deferEx == null) |
| { |
| deferEx = e; |
| } |
| } |
| |
| try |
| { |
| if (this.bufferManager != null) |
| { |
| this.bufferManager.ReturnBuffer(this.bodyBytes); |
| this.bufferManager = null; |
| } |
| } |
| catch (Exception e) |
| { |
| if (deferEx == null) |
| { |
| deferEx = e; |
| } |
| } |
| |
| if (deferEx != null) |
| { |
| throw deferEx; |
| } |
| } |
| |
| protected override MessageBuffer OnCreateBufferedCopy(int maxBufferSize) |
| { |
| if (this.bodyStream != null) |
| { |
| int len = (int)this.bodyStream.Length; |
| byte[] buf = new byte[len]; |
| this.bodyStream.Read(buf, 0, len); |
| this.bodyStream = null; |
| this.bodyBytes = buf; |
| this.count = len; |
| this.index = 0; |
| } |
| else |
| { |
| if (this.bufferManager != null) |
| { |
| // we could take steps to share the buffer among copies and release the memory |
| // after the last user finishes by a reference count or such, but we are already |
| // far from the intended optimized use. Make one GC managed memory copy that is |
| // shared by all. |
| byte[] buf = new byte[this.count]; |
| |
| Buffer.BlockCopy(this.bodyBytes, this.index, buf, 0, this.count); |
| this.bufferManager.ReturnBuffer(this.bodyBytes); |
| this.bufferManager = null; |
| this.bodyBytes = buf; |
| this.index = 0; |
| } |
| } |
| |
| return new RawMessageBuffer(this.headers, this.properties, this.bodyBytes, this.index, this.count, this.readerQuotas); |
| } |
| |
| protected override XmlDictionaryReader OnGetReaderAtBodyContents() |
| { |
| Stream readerStream = null; |
| bool ownsStream; |
| |
| if (this.bodyStream != null) |
| { |
| readerStream = this.bodyStream; |
| ownsStream = false; |
| } |
| else |
| { |
| // create stream for duration of XmlReader. |
| ownsStream = true; |
| if (this.bufferManager != null) |
| { |
| readerStream = new RawMemoryStream(this.bodyBytes, this.index, this.count, this.bufferManager); |
| this.bufferManager = null; |
| } |
| else |
| { |
| readerStream = new MemoryStream(this.bodyBytes, this.index, this.count, false); |
| } |
| } |
| |
| return new RawXmlReader(readerStream, this.readerQuotas, ownsStream); |
| } |
| |
| protected override void OnWriteBodyContents(XmlDictionaryWriter writer) |
| { |
| writer.WriteStartElement(RawMessageEncoder.StreamElementName, string.Empty); |
| if (this.bodyStream != null) |
| { |
| int len = (int)this.bodyStream.Length; |
| byte[] buf = new byte[len]; |
| this.bodyStream.Read(buf, 0, len); |
| writer.WriteBase64(buf, 0, len); |
| } |
| else |
| { |
| writer.WriteBase64(this.bodyBytes, this.index, this.count); |
| } |
| |
| writer.WriteEndElement(); |
| } |
| |
| private class RawMemoryStream : MemoryStream |
| { |
| private BufferManager bufferManager; |
| private byte[] buffer; |
| |
| public RawMemoryStream(byte[] bytes, int index, int count, BufferManager mgr) |
| : base(bytes, index, count, false) |
| { |
| this.bufferManager = mgr; |
| this.buffer = bytes; |
| } |
| |
| protected override void Dispose(bool disposing) |
| { |
| if (this.bufferManager != null) |
| { |
| try |
| { |
| this.bufferManager.ReturnBuffer(this.buffer); |
| } |
| finally |
| { |
| this.bufferManager = null; |
| base.Dispose(disposing); |
| } |
| } |
| } |
| } |
| |
| private class RawMessageBuffer : MessageBuffer |
| { |
| private bool closed; |
| private MessageHeaders headers; |
| private MessageProperties properties; |
| private byte[] bodyBytes; |
| private int index; |
| private int count; |
| private XmlDictionaryReaderQuotas readerQuotas; |
| |
| public RawMessageBuffer(MessageHeaders headers, MessageProperties properties, byte[] bytes, int index, int count, XmlDictionaryReaderQuotas quotas) |
| : base() |
| { |
| this.headers = new MessageHeaders(headers); |
| this.properties = new MessageProperties(properties); |
| this.bodyBytes = bytes; |
| this.index = index; |
| this.count = count; |
| this.readerQuotas = new XmlDictionaryReaderQuotas(); |
| quotas.CopyTo(this.readerQuotas); |
| } |
| |
| public override int BufferSize |
| { |
| get { return this.count; } |
| } |
| |
| public override void Close() |
| { |
| if (!this.closed) |
| { |
| this.closed = true; |
| this.headers = null; |
| if (this.properties != null) |
| { |
| this.properties.Dispose(); |
| this.properties = null; |
| } |
| |
| this.bodyBytes = null; |
| this.readerQuotas = null; |
| } |
| } |
| |
| public override Message CreateMessage() |
| { |
| if (this.closed) |
| { |
| throw new ObjectDisposedException("message"); |
| } |
| |
| return new RawMessage(this.headers, this.properties, this.bodyBytes, this.index, this.count, this.readerQuotas); |
| } |
| } |
| } |
| } |