// blob: 4de0e7495ecb02974cd29dfe8751c15fc32d43cb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.IO;
using Apache.NMS.Util;
namespace Apache.NMS.ActiveMQ.Commands
{
public class ActiveMQStreamMessage : ActiveMQMessage, IStreamMessage
{
// Reader used by the Read* methods; created lazily by InitializeReading().
private EndianBinaryReader dataIn = null;
// Writer used by the Write* methods; created lazily by InitializeWriting().
private EndianBinaryWriter dataOut = null;
// In-memory stream underneath whichever of dataIn / dataOut is currently active.
private MemoryStream byteBuffer = null;
// Bytes still unread in the byte-array field being consumed by ReadBytes();
// -1 means no byte-array read is in progress.
private int bytesRemaining = -1;
// Type code returned by GetDataStructureType() for this command.
public const byte ID_ACTIVEMQSTREAMMESSAGE = 27;
/// <summary>
/// Returns the data structure type code of this command.
/// </summary>
/// <returns>Always <see cref="ID_ACTIVEMQSTREAMMESSAGE"/>.</returns>
public override byte GetDataStructureType()
{
return ID_ACTIVEMQSTREAMMESSAGE;
}
/// <summary>
/// Reads the next field of the stream and returns it as a bool.
/// </summary>
/// <remarks>
/// A boolean-tagged field is returned as stored; a string-tagged field is
/// converted with <c>Boolean.Parse</c>. For a null-tagged field, any other
/// tag, or unparsable text, the buffer is moved back to the start of the
/// field before the exception is thrown. End-of-stream and other I/O errors
/// are translated through <c>NMSExceptionSupport</c>.
/// </remarks>
/// <returns>The boolean value of the field.</returns>
public bool ReadBoolean()
{
    InitializeReading();

    try
    {
        long fieldStart = this.byteBuffer.Position;

        try
        {
            int tag = this.dataIn.ReadByte();

            if(tag == PrimitiveMap.BOOLEAN_TYPE)
            {
                return this.dataIn.ReadBoolean();
            }

            if(tag == PrimitiveMap.STRING_TYPE)
            {
                return Boolean.Parse(this.dataIn.ReadString16());
            }

            // Every remaining tag is a failure: rewind to the start of the field first.
            this.byteBuffer.Seek(fieldStart, SeekOrigin.Begin);

            if(tag == PrimitiveMap.NULL)
            {
                throw new NMSException("Cannot convert Null type to a bool");
            }

            throw new MessageFormatException("Value is not a Boolean type.");
        }
        catch(FormatException parseError)
        {
            this.byteBuffer.Seek(fieldStart, SeekOrigin.Begin);
            throw NMSExceptionSupport.CreateMessageFormatException(parseError);
        }
    }
    catch(EndOfStreamException eof)
    {
        throw NMSExceptionSupport.CreateMessageEOFException(eof);
    }
    catch(IOException ioError)
    {
        throw NMSExceptionSupport.CreateMessageFormatException(ioError);
    }
}
/// <summary>
/// Reads the next field of the stream and returns it as a byte.
/// </summary>
/// <remarks>
/// A byte-tagged field is returned as stored; a string-tagged field is
/// converted with <c>Byte.Parse</c>. For a null-tagged field, any other tag,
/// or text that cannot be converted (malformed or out of range), the buffer
/// is moved back to the start of the field before the exception is thrown.
/// End-of-stream and other I/O errors are translated through
/// <c>NMSExceptionSupport</c>.
/// </remarks>
/// <returns>The byte value of the field.</returns>
public byte ReadByte()
{
    InitializeReading();
    try
    {
        long startingPos = this.byteBuffer.Position;
        try
        {
            int type = this.dataIn.ReadByte();
            switch (type)
            {
            case PrimitiveMap.BYTE_TYPE:
                return this.dataIn.ReadByte();
            case PrimitiveMap.STRING_TYPE:
                return Byte.Parse(this.dataIn.ReadString16());
            case PrimitiveMap.NULL:
                this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
                throw new NMSException("Cannot convert Null type to a byte");
            default:
                this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
                throw new MessageFormatException("Value is not a Byte type.");
            }
        }
        catch(FormatException e)
        {
            this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
            throw NMSExceptionSupport.CreateMessageFormatException(e);
        }
        catch(OverflowException e)
        {
            // Byte.Parse reports out-of-range text with OverflowException rather
            // than FormatException; treat it as the same conversion failure.
            this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
            throw NMSExceptionSupport.CreateMessageFormatException(e);
        }
    }
    catch(EndOfStreamException e)
    {
        throw NMSExceptionSupport.CreateMessageEOFException(e);
    }
    catch(IOException e)
    {
        throw NMSExceptionSupport.CreateMessageFormatException(e);
    }
}
/// <summary>
/// Reads the next field of the stream and returns it as a char.
/// </summary>
/// <remarks>
/// Only a char-tagged field is accepted. For a null-tagged field or any
/// other tag the buffer is moved back to the start of the field before the
/// exception is thrown. End-of-stream and other I/O errors are translated
/// through <c>NMSExceptionSupport</c>.
/// </remarks>
/// <returns>The char value of the field.</returns>
public char ReadChar()
{
    InitializeReading();

    try
    {
        long fieldStart = this.byteBuffer.Position;

        try
        {
            int tag = this.dataIn.ReadByte();

            if(tag == PrimitiveMap.CHAR_TYPE)
            {
                return this.dataIn.ReadChar();
            }

            // Every remaining tag is a failure: rewind to the start of the field first.
            this.byteBuffer.Seek(fieldStart, SeekOrigin.Begin);

            if(tag == PrimitiveMap.NULL)
            {
                throw new NMSException("Cannot convert Null type to a char");
            }

            throw new MessageFormatException("Value is not a Char type.");
        }
        catch(FormatException parseError)
        {
            this.byteBuffer.Seek(fieldStart, SeekOrigin.Begin);
            throw NMSExceptionSupport.CreateMessageFormatException(parseError);
        }
    }
    catch(EndOfStreamException eof)
    {
        throw NMSExceptionSupport.CreateMessageEOFException(eof);
    }
    catch(IOException ioError)
    {
        throw NMSExceptionSupport.CreateMessageFormatException(ioError);
    }
}
/// <summary>
/// Reads the next field of the stream and returns it as a short.
/// </summary>
/// <remarks>
/// Short- and byte-tagged fields are returned as stored (the byte is
/// widened); a string-tagged field is converted with <c>Int16.Parse</c>.
/// For a null-tagged field, any other tag, or text that cannot be converted
/// (malformed or out of range), the buffer is moved back to the start of the
/// field before the exception is thrown. End-of-stream and other I/O errors
/// are translated through <c>NMSExceptionSupport</c>.
/// </remarks>
/// <returns>The 16-bit integer value of the field.</returns>
public short ReadInt16()
{
    InitializeReading();
    try
    {
        long startingPos = this.byteBuffer.Position;
        try
        {
            int type = this.dataIn.ReadByte();
            switch (type)
            {
            case PrimitiveMap.SHORT_TYPE:
                return this.dataIn.ReadInt16();
            case PrimitiveMap.BYTE_TYPE:
                return this.dataIn.ReadByte();
            case PrimitiveMap.STRING_TYPE:
                return Int16.Parse(this.dataIn.ReadString16());
            case PrimitiveMap.NULL:
                this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
                throw new NMSException("Cannot convert Null type to a short");
            default:
                this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
                throw new MessageFormatException("Value is not a Int16 type.");
            }
        }
        catch(FormatException e)
        {
            this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
            throw NMSExceptionSupport.CreateMessageFormatException(e);
        }
        catch(OverflowException e)
        {
            // Int16.Parse reports out-of-range text with OverflowException rather
            // than FormatException; treat it as the same conversion failure.
            this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
            throw NMSExceptionSupport.CreateMessageFormatException(e);
        }
    }
    catch(EndOfStreamException e)
    {
        throw NMSExceptionSupport.CreateMessageEOFException(e);
    }
    catch(IOException e)
    {
        throw NMSExceptionSupport.CreateMessageFormatException(e);
    }
}
/// <summary>
/// Reads the next field of the stream and returns it as an int.
/// </summary>
/// <remarks>
/// Integer-, short- and byte-tagged fields are returned as stored (narrower
/// values are widened); a string-tagged field is converted with
/// <c>Int32.Parse</c>. For a null-tagged field, any other tag, or text that
/// cannot be converted (malformed or out of range), the buffer is moved back
/// to the start of the field before the exception is thrown. End-of-stream
/// and other I/O errors are translated through <c>NMSExceptionSupport</c>.
/// </remarks>
/// <returns>The 32-bit integer value of the field.</returns>
public int ReadInt32()
{
    InitializeReading();
    try
    {
        long startingPos = this.byteBuffer.Position;
        try
        {
            int type = this.dataIn.ReadByte();
            switch (type)
            {
            case PrimitiveMap.INTEGER_TYPE:
                return this.dataIn.ReadInt32();
            case PrimitiveMap.SHORT_TYPE:
                return this.dataIn.ReadInt16();
            case PrimitiveMap.BYTE_TYPE:
                return this.dataIn.ReadByte();
            case PrimitiveMap.STRING_TYPE:
                return Int32.Parse(this.dataIn.ReadString16());
            case PrimitiveMap.NULL:
                this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
                throw new NMSException("Cannot convert Null type to a int");
            default:
                this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
                throw new MessageFormatException("Value is not a Int32 type.");
            }
        }
        catch(FormatException e)
        {
            this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
            throw NMSExceptionSupport.CreateMessageFormatException(e);
        }
        catch(OverflowException e)
        {
            // Int32.Parse reports out-of-range text with OverflowException rather
            // than FormatException; treat it as the same conversion failure.
            this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
            throw NMSExceptionSupport.CreateMessageFormatException(e);
        }
    }
    catch(EndOfStreamException e)
    {
        throw NMSExceptionSupport.CreateMessageEOFException(e);
    }
    catch(IOException e)
    {
        throw NMSExceptionSupport.CreateMessageFormatException(e);
    }
}
/// <summary>
/// Reads the next field of the stream and returns it as a long.
/// </summary>
/// <remarks>
/// Long-, integer-, short- and byte-tagged fields are returned as stored
/// (narrower values are widened); a string-tagged field is converted with
/// <c>Int64.Parse</c>. For a null-tagged field, any other tag, or text that
/// cannot be converted (malformed or out of range), the buffer is moved back
/// to the start of the field before the exception is thrown. End-of-stream
/// and other I/O errors are translated through <c>NMSExceptionSupport</c>.
/// </remarks>
/// <returns>The 64-bit integer value of the field.</returns>
public long ReadInt64()
{
    InitializeReading();
    try
    {
        long startingPos = this.byteBuffer.Position;
        try
        {
            int type = this.dataIn.ReadByte();
            switch (type)
            {
            case PrimitiveMap.LONG_TYPE:
                return this.dataIn.ReadInt64();
            case PrimitiveMap.INTEGER_TYPE:
                return this.dataIn.ReadInt32();
            case PrimitiveMap.SHORT_TYPE:
                return this.dataIn.ReadInt16();
            case PrimitiveMap.BYTE_TYPE:
                return this.dataIn.ReadByte();
            case PrimitiveMap.STRING_TYPE:
                return Int64.Parse(this.dataIn.ReadString16());
            case PrimitiveMap.NULL:
                this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
                throw new NMSException("Cannot convert Null type to a long");
            default:
                this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
                throw new MessageFormatException("Value is not a Int64 type.");
            }
        }
        catch(FormatException e)
        {
            this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
            throw NMSExceptionSupport.CreateMessageFormatException(e);
        }
        catch(OverflowException e)
        {
            // Int64.Parse reports out-of-range text with OverflowException rather
            // than FormatException; treat it as the same conversion failure.
            this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
            throw NMSExceptionSupport.CreateMessageFormatException(e);
        }
    }
    catch(EndOfStreamException e)
    {
        throw NMSExceptionSupport.CreateMessageEOFException(e);
    }
    catch(IOException e)
    {
        throw NMSExceptionSupport.CreateMessageFormatException(e);
    }
}
/// <summary>
/// Reads the next field of the stream and returns it as a float.
/// </summary>
/// <remarks>
/// A float-tagged field is returned as stored; a string-tagged field is
/// converted with <c>Single.Parse</c>. For a null-tagged field, any other
/// tag, or text that cannot be converted, the buffer is moved back to the
/// start of the field before the exception is thrown. End-of-stream and
/// other I/O errors are translated through <c>NMSExceptionSupport</c>.
/// </remarks>
/// <returns>The single-precision value of the field.</returns>
public float ReadSingle()
{
    InitializeReading();
    try
    {
        long startingPos = this.byteBuffer.Position;
        try
        {
            int type = this.dataIn.ReadByte();
            switch (type)
            {
            case PrimitiveMap.FLOAT_TYPE:
                return this.dataIn.ReadSingle();
            case PrimitiveMap.STRING_TYPE:
                return Single.Parse(this.dataIn.ReadString16());
            case PrimitiveMap.NULL:
                this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
                throw new NMSException("Cannot convert Null type to a float");
            default:
                this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
                throw new MessageFormatException("Value is not a Single type.");
            }
        }
        catch(FormatException e)
        {
            this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
            throw NMSExceptionSupport.CreateMessageFormatException(e);
        }
        catch(OverflowException e)
        {
            // On runtimes where Single.Parse throws OverflowException for
            // out-of-range text (e.g. .NET Framework), report it as the same
            // conversion failure as malformed text.
            this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
            throw NMSExceptionSupport.CreateMessageFormatException(e);
        }
    }
    catch(EndOfStreamException e)
    {
        throw NMSExceptionSupport.CreateMessageEOFException(e);
    }
    catch(IOException e)
    {
        throw NMSExceptionSupport.CreateMessageFormatException(e);
    }
}
/// <summary>
/// Reads the next field of the stream and returns it as a double.
/// </summary>
/// <remarks>
/// Double- and float-tagged fields are returned as stored (the float is
/// widened); a string-tagged field is converted with <c>Double.Parse</c>.
/// For a null-tagged field, any other tag, or text that cannot be converted,
/// the buffer is moved back to the start of the field before the exception
/// is thrown. End-of-stream and other I/O errors are translated through
/// <c>NMSExceptionSupport</c>.
/// </remarks>
/// <returns>The double-precision value of the field.</returns>
public double ReadDouble()
{
    InitializeReading();
    try
    {
        long startingPos = this.byteBuffer.Position;
        try
        {
            int type = this.dataIn.ReadByte();
            switch (type)
            {
            case PrimitiveMap.DOUBLE_TYPE:
                return this.dataIn.ReadDouble();
            case PrimitiveMap.FLOAT_TYPE:
                return this.dataIn.ReadSingle();
            case PrimitiveMap.STRING_TYPE:
                // Parse at double precision; parsing as Single would truncate
                // the value and reject magnitudes beyond the float range.
                return Double.Parse(this.dataIn.ReadString16());
            case PrimitiveMap.NULL:
                this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
                throw new NMSException("Cannot convert Null type to a double");
            default:
                this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
                throw new MessageFormatException("Value is not a Double type.");
            }
        }
        catch(FormatException e)
        {
            this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
            throw NMSExceptionSupport.CreateMessageFormatException(e);
        }
        catch(OverflowException e)
        {
            // On runtimes where Double.Parse throws OverflowException for
            // out-of-range text (e.g. .NET Framework), report it as the same
            // conversion failure as malformed text.
            this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
            throw NMSExceptionSupport.CreateMessageFormatException(e);
        }
    }
    catch(EndOfStreamException e)
    {
        throw NMSExceptionSupport.CreateMessageEOFException(e);
    }
    catch(IOException e)
    {
        throw NMSExceptionSupport.CreateMessageFormatException(e);
    }
}
/// <summary>
/// Reads the next field of the stream and returns it as a string.
/// </summary>
/// <remarks>
/// String fields (16-bit or 32-bit length) are returned as stored; numeric,
/// char and boolean fields are converted with <c>ToString()</c>; a
/// null-tagged field yields <c>null</c>. For any other tag the buffer is
/// moved back to the start of the field before the exception is thrown.
/// End-of-stream and other I/O errors are translated through
/// <c>NMSExceptionSupport</c>.
/// </remarks>
/// <returns>The textual value of the field, or <c>null</c> for a null field.</returns>
public string ReadString()
{
    InitializeReading();

    long fieldStart = this.byteBuffer.Position;

    try
    {
        string text;
        int tag = this.dataIn.ReadByte();

        switch(tag)
        {
        case PrimitiveMap.BIG_STRING_TYPE:
            text = this.dataIn.ReadString32();
            break;
        case PrimitiveMap.STRING_TYPE:
            text = this.dataIn.ReadString16();
            break;
        case PrimitiveMap.LONG_TYPE:
            text = this.dataIn.ReadInt64().ToString();
            break;
        case PrimitiveMap.INTEGER_TYPE:
            text = this.dataIn.ReadInt32().ToString();
            break;
        case PrimitiveMap.SHORT_TYPE:
            text = this.dataIn.ReadInt16().ToString();
            break;
        case PrimitiveMap.FLOAT_TYPE:
            text = this.dataIn.ReadSingle().ToString();
            break;
        case PrimitiveMap.DOUBLE_TYPE:
            text = this.dataIn.ReadDouble().ToString();
            break;
        case PrimitiveMap.CHAR_TYPE:
            text = this.dataIn.ReadChar().ToString();
            break;
        case PrimitiveMap.BYTE_TYPE:
            text = this.dataIn.ReadByte().ToString();
            break;
        case PrimitiveMap.BOOLEAN_TYPE:
            text = this.dataIn.ReadBoolean().ToString();
            break;
        case PrimitiveMap.NULL:
            text = null;
            break;
        default:
            // Unknown tag: rewind to the start of the field before failing.
            this.byteBuffer.Seek(fieldStart, SeekOrigin.Begin);
            throw new MessageFormatException("Value is not a known type.");
        }

        return text;
    }
    catch(FormatException parseError)
    {
        this.byteBuffer.Seek(fieldStart, SeekOrigin.Begin);
        throw NMSExceptionSupport.CreateMessageFormatException(parseError);
    }
    catch(EndOfStreamException eof)
    {
        throw NMSExceptionSupport.CreateMessageEOFException(eof);
    }
    catch(IOException ioError)
    {
        throw NMSExceptionSupport.CreateMessageFormatException(ioError);
    }
}
/// <summary>
/// Reads the next portion of a byte-array field into <paramref name="value"/>.
/// </summary>
/// <remarks>
/// The first call for a field checks the byte-array tag and reads the
/// declared length; later calls continue from where the previous one
/// stopped. Once the whole field has been delivered, the next call returns
/// -1 and resets the state so a following field can be read.
/// The underlying reader may return fewer bytes than requested in a single
/// call, so the read is repeated until the requested amount has arrived; if
/// the stream ends first, an end-of-stream error is raised instead of
/// reporting bytes that were never read.
/// </remarks>
/// <param name="value">Destination buffer; filled from index 0.</param>
/// <returns>
/// The number of bytes copied into <paramref name="value"/>, or -1 when the
/// current byte-array field has already been fully consumed.
/// </returns>
public int ReadBytes(byte[] value)
{
    InitializeReading();

    if(value == null)
    {
        throw new NullReferenceException("Passed Byte Array is null");
    }

    try
    {
        if(this.bytesRemaining == -1)
        {
            long startingPos = this.byteBuffer.Position;
            byte type = this.dataIn.ReadByte();
            if(type != PrimitiveMap.BYTE_ARRAY_TYPE)
            {
                this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
                throw new MessageFormatException("Not a byte array");
            }

            int declaredLength = this.dataIn.ReadInt32();
            if(declaredLength < 0)
            {
                // A negative length would collide with the -1 sentinel and can
                // never be satisfied; reject it as malformed content.
                this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
                throw new MessageFormatException("Invalid byte array length: " + declaredLength);
            }

            this.bytesRemaining = declaredLength;
        }
        else if(this.bytesRemaining == 0)
        {
            this.bytesRemaining = -1;
            return -1;
        }

        // Deliver as much as fits in the caller's buffer, but never more than
        // what is left of the current field.
        int wanted = Math.Min(value.Length, this.bytesRemaining);
        int copied = 0;

        while(copied < wanted)
        {
            int chunk = this.dataIn.Read(value, copied, wanted - copied);
            if(chunk <= 0)
            {
                throw new EndOfStreamException("Stream ended before the declared byte array length was read");
            }

            copied += chunk;
            this.bytesRemaining -= chunk;
        }

        return copied;
    }
    catch(EndOfStreamException ex)
    {
        throw NMSExceptionSupport.CreateMessageEOFException(ex);
    }
    catch(IOException ex)
    {
        throw NMSExceptionSupport.CreateMessageFormatException(ex);
    }
}
/// <summary>
/// Reads the next field of the stream and returns it boxed in its stored type.
/// </summary>
/// <remarks>
/// Strings, numeric types, char and bool are returned as the corresponding
/// .NET value; a byte-array field is returned as a new <c>byte[]</c>; a
/// null-tagged field yields <c>null</c>. For an unknown tag or an invalid
/// byte-array length the buffer is moved back to the start of the field
/// before the exception is thrown. End-of-stream and other I/O errors are
/// translated through <c>NMSExceptionSupport</c>.
/// </remarks>
/// <returns>The value of the field, or <c>null</c> for a null field.</returns>
public Object ReadObject()
{
    InitializeReading();
    long startingPos = this.byteBuffer.Position;
    try
    {
        int type = this.dataIn.ReadByte();
        switch (type)
        {
        case PrimitiveMap.BIG_STRING_TYPE:
            return this.dataIn.ReadString32();
        case PrimitiveMap.STRING_TYPE:
            return this.dataIn.ReadString16();
        case PrimitiveMap.LONG_TYPE:
            return this.dataIn.ReadInt64();
        case PrimitiveMap.INTEGER_TYPE:
            return this.dataIn.ReadInt32();
        case PrimitiveMap.SHORT_TYPE:
            return this.dataIn.ReadInt16();
        case PrimitiveMap.FLOAT_TYPE:
            return this.dataIn.ReadSingle();
        case PrimitiveMap.DOUBLE_TYPE:
            return this.dataIn.ReadDouble();
        case PrimitiveMap.CHAR_TYPE:
            return this.dataIn.ReadChar();
        case PrimitiveMap.BYTE_TYPE:
            return this.dataIn.ReadByte();
        case PrimitiveMap.BOOLEAN_TYPE:
            return this.dataIn.ReadBoolean();
        case PrimitiveMap.BYTE_ARRAY_TYPE:
        {
            int length = this.dataIn.ReadInt32();
            if(length < 0)
            {
                // A negative length cannot be allocated; report malformed
                // content instead of letting the allocation fail.
                this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
                throw new MessageFormatException("Invalid byte array length: " + length);
            }

            byte[] data = new byte[length];
            int filled = 0;

            // A single Read may return fewer bytes than requested, so keep
            // reading until the array is full or the stream runs out.
            while(filled < length)
            {
                int chunk = this.dataIn.Read(data, filled, length - filled);
                if(chunk <= 0)
                {
                    throw new EndOfStreamException("Stream ended before the declared byte array length was read");
                }

                filled += chunk;
            }

            return data;
        }
        case PrimitiveMap.NULL:
            return null;
        default:
            this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
            throw new MessageFormatException("Value is not a known type.");
        }
    }
    catch(FormatException e)
    {
        this.byteBuffer.Seek(startingPos, SeekOrigin.Begin);
        throw NMSExceptionSupport.CreateMessageFormatException(e);
    }
    catch(EndOfStreamException e)
    {
        throw NMSExceptionSupport.CreateMessageEOFException(e);
    }
    catch(IOException e)
    {
        throw NMSExceptionSupport.CreateMessageFormatException(e);
    }
}
/// <summary>
/// Appends a boolean field (type tag followed by the value) to the stream.
/// </summary>
/// <param name="value">The value to write.</param>
public void WriteBoolean(bool value)
{
    InitializeWriting();
    try
    {
        this.dataOut.Write(PrimitiveMap.BOOLEAN_TYPE);
        this.dataOut.Write(value);
    }
    catch(IOException e)
    {
        // Propagate the failure; without the throw the converted exception
        // was discarded and the write error went unnoticed.
        throw NMSExceptionSupport.Create(e);
    }
}
/// <summary>
/// Appends a byte field (type tag followed by the value) to the stream.
/// </summary>
/// <param name="value">The value to write.</param>
public void WriteByte(byte value)
{
    InitializeWriting();
    try
    {
        this.dataOut.Write(PrimitiveMap.BYTE_TYPE);
        this.dataOut.Write(value);
    }
    catch(IOException e)
    {
        // Propagate the failure; without the throw the converted exception
        // was discarded and the write error went unnoticed.
        throw NMSExceptionSupport.Create(e);
    }
}
/// <summary>
/// Appends the whole of <paramref name="value"/> to the stream as a single
/// byte-array field by delegating to the offset/length overload.
/// </summary>
/// <param name="value">The bytes to write.</param>
public void WriteBytes(byte[] value)
{
    InitializeWriting();

    int count = value.Length;
    this.WriteBytes(value, 0, count);
}
/// <summary>
/// Appends a byte-array field to the stream: type tag, 32-bit length, then
/// <paramref name="length"/> bytes of <paramref name="value"/> starting at
/// <paramref name="offset"/>.
/// </summary>
/// <param name="value">Source buffer.</param>
/// <param name="offset">Index of the first byte to write.</param>
/// <param name="length">Number of bytes to write.</param>
public void WriteBytes(byte[] value, int offset, int length)
{
    InitializeWriting();
    try
    {
        this.dataOut.Write(PrimitiveMap.BYTE_ARRAY_TYPE);
        this.dataOut.Write((int) length);
        this.dataOut.Write(value, offset, length);
    }
    catch(IOException e)
    {
        // Propagate the failure; without the throw the converted exception
        // was discarded and the write error went unnoticed.
        throw NMSExceptionSupport.Create(e);
    }
}
/// <summary>
/// Appends a char field (type tag followed by the value) to the stream.
/// </summary>
/// <param name="value">The value to write.</param>
public void WriteChar(char value)
{
    InitializeWriting();
    try
    {
        this.dataOut.Write(PrimitiveMap.CHAR_TYPE);
        this.dataOut.Write(value);
    }
    catch(IOException e)
    {
        // Propagate the failure; without the throw the converted exception
        // was discarded and the write error went unnoticed.
        throw NMSExceptionSupport.Create(e);
    }
}
/// <summary>
/// Appends a 16-bit integer field (type tag followed by the value) to the stream.
/// </summary>
/// <param name="value">The value to write.</param>
public void WriteInt16(short value)
{
    InitializeWriting();
    try
    {
        this.dataOut.Write(PrimitiveMap.SHORT_TYPE);
        this.dataOut.Write(value);
    }
    catch(IOException e)
    {
        // Propagate the failure; without the throw the converted exception
        // was discarded and the write error went unnoticed.
        throw NMSExceptionSupport.Create(e);
    }
}
/// <summary>
/// Appends a 32-bit integer field (type tag followed by the value) to the stream.
/// </summary>
/// <param name="value">The value to write.</param>
public void WriteInt32(int value)
{
    InitializeWriting();
    try
    {
        this.dataOut.Write(PrimitiveMap.INTEGER_TYPE);
        this.dataOut.Write(value);
    }
    catch(IOException e)
    {
        // Propagate the failure; without the throw the converted exception
        // was discarded and the write error went unnoticed.
        throw NMSExceptionSupport.Create(e);
    }
}
/// <summary>
/// Appends a 64-bit integer field (type tag followed by the value) to the stream.
/// </summary>
/// <param name="value">The value to write.</param>
public void WriteInt64(long value)
{
    InitializeWriting();
    try
    {
        this.dataOut.Write(PrimitiveMap.LONG_TYPE);
        this.dataOut.Write(value);
    }
    catch(IOException e)
    {
        // Propagate the failure; without the throw the converted exception
        // was discarded and the write error went unnoticed.
        throw NMSExceptionSupport.Create(e);
    }
}
/// <summary>
/// Appends a single-precision field (type tag followed by the value) to the stream.
/// </summary>
/// <param name="value">The value to write.</param>
public void WriteSingle(float value)
{
    InitializeWriting();
    try
    {
        this.dataOut.Write(PrimitiveMap.FLOAT_TYPE);
        this.dataOut.Write(value);
    }
    catch(IOException e)
    {
        // Propagate the failure; without the throw the converted exception
        // was discarded and the write error went unnoticed.
        throw NMSExceptionSupport.Create(e);
    }
}
/// <summary>
/// Appends a double-precision field (type tag followed by the value) to the stream.
/// </summary>
/// <param name="value">The value to write.</param>
public void WriteDouble(double value)
{
    InitializeWriting();
    try
    {
        this.dataOut.Write(PrimitiveMap.DOUBLE_TYPE);
        this.dataOut.Write(value);
    }
    catch(IOException e)
    {
        // Propagate the failure; without the throw the converted exception
        // was discarded and the write error went unnoticed.
        throw NMSExceptionSupport.Create(e);
    }
}
/// <summary>
/// Appends a string field to the stream.
/// </summary>
/// <remarks>
/// Strings longer than 8192 characters are written with the big-string tag
/// and a 32-bit length; shorter strings use the regular string tag and a
/// 16-bit length. A <c>null</c> value is written as a bare null tag, which
/// <c>ReadString</c> and <c>ReadObject</c> decode back to <c>null</c>.
/// </remarks>
/// <param name="value">The text to write; may be <c>null</c>.</param>
public void WriteString(string value)
{
    InitializeWriting();
    try
    {
        if( value == null )
        {
            // Previously this dereferenced value.Length and failed with a
            // NullReferenceException; the format has a dedicated null tag.
            this.dataOut.Write(PrimitiveMap.NULL);
        }
        else if( value.Length > 8192 )
        {
            this.dataOut.Write(PrimitiveMap.BIG_STRING_TYPE);
            this.dataOut.WriteString32(value);
        }
        else
        {
            this.dataOut.Write(PrimitiveMap.STRING_TYPE);
            this.dataOut.WriteString16(value);
        }
    }
    catch(IOException e)
    {
        // Propagate the failure; without the throw the converted exception
        // was discarded and the write error went unnoticed.
        throw NMSExceptionSupport.Create(e);
    }
}
/// <summary>
/// Appends <paramref name="value"/> to the stream using the Write method
/// that matches its runtime type.
/// </summary>
/// <remarks>
/// Supported types are byte, char, bool, short, int, long, float, double,
/// byte[] and string. A <c>null</c> value is written as a bare null tag,
/// which <c>ReadObject</c> decodes back to <c>null</c>. Any other type is
/// rejected with a <c>MessageFormatException</c>.
/// </remarks>
/// <param name="value">The value to write; may be <c>null</c>.</param>
public void WriteObject(Object value)
{
    InitializeWriting();
    if( value == null )
    {
        // Previously null fell through to value.GetType() below and failed
        // with a NullReferenceException; the format has a dedicated null tag.
        try
        {
            this.dataOut.Write(PrimitiveMap.NULL);
        }
        catch(IOException e)
        {
            throw NMSExceptionSupport.Create(e);
        }
    }
    else if( value is System.Byte )
    {
        this.WriteByte( (byte) value );
    }
    else if( value is Char )
    {
        this.WriteChar( (char) value );
    }
    else if( value is Boolean )
    {
        this.WriteBoolean( (bool) value );
    }
    else if( value is Int16 )
    {
        this.WriteInt16( (short) value );
    }
    else if( value is Int32 )
    {
        this.WriteInt32( (int) value );
    }
    else if( value is Int64 )
    {
        this.WriteInt64( (long) value );
    }
    else if( value is Single )
    {
        this.WriteSingle( (float) value );
    }
    else if( value is Double )
    {
        this.WriteDouble( (double) value );
    }
    else if( value is byte[] )
    {
        this.WriteBytes( (byte[]) value );
    }
    else if( value is String )
    {
        this.WriteString( (string) value );
    }
    else
    {
        throw new MessageFormatException("Cannot write non-primitive type:" + value.GetType());
    }
}
/// <summary>
/// Creates a copy of this message. Any pending written data is first moved
/// into <c>Content</c> via <c>StoreContent()</c> so the base clone sees it.
/// </summary>
/// <returns>The object produced by the base class <c>Clone()</c>.</returns>
public override Object Clone()
{
    StoreContent();

    Object copy = base.Clone();
    return copy;
}
/// <summary>
/// Send hook: runs the base class handling, then moves any pending written
/// data into Content via StoreContent().
/// </summary>
public override void OnSend()
{
base.OnSend();
StoreContent();
}
/// <summary>
/// Clears the message body: runs the base class handling, then drops the
/// reader, the writer and their buffer and resets the byte-array read state.
/// </summary>
public override void ClearBody()
{
    base.ClearBody();

    this.bytesRemaining = -1;
    this.dataOut = null;
    this.dataIn = null;
    this.byteBuffer = null;
}
/// <summary>
/// Switches the message to read-only mode. Pending written data is moved
/// into <c>Content</c>, the reader, writer and buffer are dropped so the
/// next read starts from the beginning, and the body is marked read-only.
/// </summary>
public void Reset()
{
    StoreContent();

    this.bytesRemaining = -1;
    this.byteBuffer = null;
    this.dataOut = null;
    this.dataIn = null;

    this.ReadOnlyBody = true;
}
/// <summary>
/// Prepares the message for reading. Fails if the body is write-only;
/// otherwise creates the reader on first use over a read-only stream of the
/// current <c>Content</c>, wrapped in a decompression stream when the
/// message is compressed and a connection is available.
/// </summary>
private void InitializeReading()
{
    FailIfWriteOnlyBody();
    if(this.dataIn == null)
    {
        // A message with no body has null Content; MemoryStream rejects a null
        // array, so read from an empty buffer and let the Read* methods report
        // end-of-stream instead.
        byte[] data = this.Content ?? new byte[0];
        this.byteBuffer = new MemoryStream(data, false);
        Stream target = this.byteBuffer;
        if(this.Connection != null && this.Compressed == true)
        {
            target = this.Connection.CompressionPolicy.CreateDecompressionStream(target);
        }
        this.dataIn = new EndianBinaryReader(target);
    }
}
/// <summary>
/// Prepares the message for writing. Fails if the body is read-only;
/// otherwise creates the writer on first use over a fresh in-memory buffer,
/// wrapped in a compression stream (and flagging the message as compressed)
/// when the connection asks for compression.
/// </summary>
private void InitializeWriting()
{
    FailIfReadOnlyBody();

    if(this.dataOut != null)
    {
        // Writer already set up by an earlier write.
        return;
    }

    this.byteBuffer = new MemoryStream();
    Stream sink = this.byteBuffer;

    bool compress = this.Connection != null && this.Connection.UseCompression;
    if(compress)
    {
        sink = this.Connection.CompressionPolicy.CreateCompressionStream(sink);
        this.Compressed = true;
    }

    this.dataOut = new EndianBinaryWriter(sink);
}
/// <summary>
/// Moves pending written data into <c>Content</c>. When a writer exists it
/// is closed, the buffer's bytes become the message content, and both the
/// writer and the buffer are released. Does nothing when no writer exists.
/// </summary>
private void StoreContent()
{
    if(this.dataOut == null)
    {
        return;
    }

    // Close before copying so the writer is finished with the buffer.
    this.dataOut.Close();
    this.Content = this.byteBuffer.ToArray();

    this.byteBuffer = null;
    this.dataOut = null;
}
}
}