| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| using System; |
| using System.IO; |
| using System.Reflection; |
| using Apache.NMS.ActiveMQ.Commands; |
| using Apache.NMS.ActiveMQ.Transport; |
| using Apache.NMS.ActiveMQ.Transport.Tcp; |
| using Apache.NMS.Util; |
| |
| namespace Apache.NMS.ActiveMQ.OpenWire |
| { |
/// <summary>
/// Implements the <a href="http://activemq.apache.org/openwire.html">OpenWire</a> protocol.
/// Holds the negotiated wire format options and a table of per-type marshallers
/// (indexed by data structure type byte) that is used to marshal and unmarshal
/// commands in either tight or loose encoding.
/// </summary>
public class OpenWireFormat : IWireFormat
{
    // Type byte written in place of a command when a null object is marshalled.
    private const byte NULL_TYPE = 0;

    // Lowest remote OpenWire version accepted by RenegotiateWireFormat.
    private const int minimumVersion = 1;

    // One slot per possible type byte value (allocated with 256 entries).
    private readonly BaseDataStreamMarshaller[] dataMarshallers;

    private int version;
    private bool cacheEnabled = false;
    private bool stackTraceEnabled = false;
    private bool tcpNoDelayEnabled = false;
    private bool sizePrefixDisabled = false;
    private bool tightEncodingEnabled = false;
    private long maxInactivityDuration = 0;
    private long maxInactivityDurationInitialDelay = 0;
    private int cacheSize = 0;

    private WireFormatInfo preferredWireFormatInfo = new WireFormatInfo();
    private ITransport transport;

    /// <summary>
    /// Creates a wire format with the default preferred options and with
    /// version 1 marshallers active. The preferred version (10) only takes
    /// effect through <see cref="RenegotiateWireFormat"/>.
    /// </summary>
    public OpenWireFormat()
    {
        // See the following link for defaults: http://activemq.apache.org/configuring-wire-formats.html
        // See also the following link for OpenWire format info: http://activemq.apache.org/openwire-version-2-specification.html
        PreferredWireFormatInfo.CacheEnabled = false;
        PreferredWireFormatInfo.StackTraceEnabled = false;
        PreferredWireFormatInfo.TcpNoDelayEnabled = true;
        PreferredWireFormatInfo.SizePrefixDisabled = false;
        PreferredWireFormatInfo.TightEncodingEnabled = false;
        PreferredWireFormatInfo.MaxInactivityDuration = 30000;
        PreferredWireFormatInfo.MaxInactivityDurationInitialDelay = 10000;
        PreferredWireFormatInfo.CacheSize = 0;
        PreferredWireFormatInfo.Version = 10;

        // The marshaller table must exist before Version is assigned, because
        // the setter hands this instance to the marshaller factory.
        dataMarshallers = new BaseDataStreamMarshaller[256];
        Version = 1;
    }

    /// <summary>
    /// Transport associated with this wire format. When it is a
    /// <c>TcpTransport</c>, <see cref="RenegotiateWireFormat"/> pushes the
    /// negotiated TcpNoDelay setting to it.
    /// </summary>
    public ITransport Transport
    {
        get { return transport; }
        set { transport = value; }
    }

    /// <summary>
    /// Active OpenWire version. Setting it locates the type
    /// <c>Apache.NMS.ActiveMQ.OpenWire.V{value}.MarshallerFactory</c> in the
    /// executing assembly, instantiates it and calls <c>configure(this)</c>
    /// (presumably to register that version's marshallers).
    /// </summary>
    /// <exception cref="ArgumentOutOfRangeException">
    /// No marshaller factory type exists for the requested version.
    /// </exception>
    public int Version
    {
        get { return version; }
        set
        {
            string factoryTypeName = "Apache.NMS.ActiveMQ.OpenWire.V" + value + ".MarshallerFactory";
            Assembly dll = Assembly.GetExecutingAssembly();

            // GetType with throwOnError == false yields null for an unknown
            // type; fail with a descriptive error instead of letting
            // Activator.CreateInstance throw an opaque ArgumentNullException.
            Type type = dll.GetType(factoryTypeName, false);
            if(null == type)
            {
                throw new ArgumentOutOfRangeException("value", value,
                    "Unsupported OpenWire version: marshaller factory type '" + factoryTypeName + "' was not found");
            }

            IMarshallerFactory factory = (IMarshallerFactory) Activator.CreateInstance(type);
            factory.configure(this);

            // Only record the version once the factory has been applied.
            version = value;
        }
    }

    /// <summary>Currently active cache-enabled option.</summary>
    public bool CacheEnabled
    {
        get { return cacheEnabled; }
        set { cacheEnabled = value; }
    }

    /// <summary>Currently active stack-trace-enabled option.</summary>
    public bool StackTraceEnabled
    {
        get { return stackTraceEnabled; }
        set { stackTraceEnabled = value; }
    }

    /// <summary>Currently active TcpNoDelay option.</summary>
    public bool TcpNoDelayEnabled
    {
        get { return tcpNoDelayEnabled; }
        set { tcpNoDelayEnabled = value; }
    }

    /// <summary>
    /// When true, no 4-byte size prefix is written by <see cref="Marshal"/>
    /// or expected by <see cref="Unmarshal"/>.
    /// </summary>
    public bool SizePrefixDisabled
    {
        get { return sizePrefixDisabled; }
        set { sizePrefixDisabled = value; }
    }

    /// <summary>
    /// Selects tight encoding (true) or loose encoding (false) for
    /// <see cref="Marshal"/> and <see cref="Unmarshal"/>.
    /// </summary>
    public bool TightEncodingEnabled
    {
        get { return tightEncodingEnabled; }
        set { tightEncodingEnabled = value; }
    }

    /// <summary>Currently active maximum inactivity duration.</summary>
    public long MaxInactivityDuration
    {
        get { return maxInactivityDuration; }
        set { maxInactivityDuration = value; }
    }

    /// <summary>Currently active initial delay for the maximum inactivity duration.</summary>
    public long MaxInactivityDurationInitialDelay
    {
        get { return maxInactivityDurationInitialDelay; }
        set { maxInactivityDurationInitialDelay = value; }
    }

    /// <summary>Currently active cache size.</summary>
    public int CacheSize
    {
        get { return cacheSize; }
        set { cacheSize = value; }
    }

    /// <summary>
    /// Options this side prefers; combined with the remote side's options in
    /// <see cref="RenegotiateWireFormat"/>. Must not be null when
    /// renegotiation happens, since it is dereferenced there.
    /// </summary>
    public WireFormatInfo PreferredWireFormatInfo
    {
        get { return preferredWireFormatInfo; }
        set { preferredWireFormatInfo = value; }
    }

    /// <summary>
    /// Removes every registered marshaller from the lookup table.
    /// </summary>
    public void clearMarshallers()
    {
        Array.Clear(dataMarshallers, 0, dataMarshallers.Length);
    }

    /// <summary>
    /// Registers a marshaller under the type byte it reports, replacing any
    /// marshaller previously registered for that type.
    /// </summary>
    /// <param name="marshaller">Marshaller to register; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="marshaller"/> is null.</exception>
    public void addMarshaller(BaseDataStreamMarshaller marshaller)
    {
        if(null == marshaller)
        {
            throw new ArgumentNullException("marshaller");
        }

        dataMarshallers[marshaller.GetDataStructureType()] = marshaller;
    }

    /// <summary>
    /// Looks up the marshaller registered for a type byte.
    /// </summary>
    /// <exception cref="IOException">No marshaller is registered for the type.</exception>
    private BaseDataStreamMarshaller GetDataStreamMarshallerForType(byte dataType)
    {
        BaseDataStreamMarshaller marshaller = dataMarshallers[dataType];
        if(null == marshaller)
        {
            throw new IOException("Unknown data type: " + dataType);
        }

        return marshaller;
    }

    /// <summary>
    /// Writes an object to the stream as: optional 4-byte size prefix, one
    /// type byte, then the body produced by the marshaller for that type.
    /// A null object is written as <c>NULL_TYPE</c> with a size of 1.
    /// The stream is flushed before returning.
    /// </summary>
    /// <param name="o">Object to write; cast to <c>DataStructure</c> when not null.</param>
    /// <param name="ds">Destination writer.</param>
    /// <exception cref="IOException">No marshaller is registered for the object's type.</exception>
    public void Marshal(Object o, BinaryWriter ds)
    {
        // Starts at 1 to account for the type byte that always follows the prefix.
        int size = 1;

        if(null == o)
        {
            // Honour sizePrefixDisabled here as well, so the output matches
            // what Unmarshal expects to read in that mode.
            if(!sizePrefixDisabled)
            {
                ds.Write(size);
            }

            ds.Write(NULL_TYPE);
            ds.Flush();
            return;
        }

        DataStructure c = (DataStructure) o;
        byte type = c.GetDataStructureType();
        BaseDataStreamMarshaller dsm = GetDataStreamMarshallerForType(type);

        if(tightEncodingEnabled)
        {
            // Pass 1 collects the boolean flags and computes the body size;
            // pass 2 writes the body using those flags.
            BooleanStream bs = new BooleanStream();
            size += dsm.TightMarshal1(this, c, bs);
            size += bs.MarshalledSize();

            if(!sizePrefixDisabled)
            {
                ds.Write(size);
            }

            ds.Write(type);
            bs.Marshal(ds);
            dsm.TightMarshal2(this, c, ds, bs);
        }
        else if(sizePrefixDisabled)
        {
            // No prefix needed: write straight to the destination stream.
            ds.Write(type);
            dsm.LooseMarshal(this, c, ds);
        }
        else
        {
            // The size is only known after marshalling, so buffer the frame in
            // memory with a placeholder prefix and patch the prefix afterwards.
            using(MemoryStream ms = new MemoryStream())
            {
                BinaryWriter looseOut = new EndianBinaryWriter(ms);
                looseOut.Write(size);
                looseOut.Write(type);
                dsm.LooseMarshal(this, c, looseOut);

                // Overwrite the placeholder; the prefix excludes its own 4 bytes.
                ms.Position = 0;
                looseOut.Write((int) ms.Length - 4);
                ds.Write(ms.GetBuffer(), 0, (int) ms.Length);
            }
        }

        ds.Flush();
    }

    /// <summary>
    /// Reads one object from the stream: skips the 4-byte size prefix unless
    /// prefixes are disabled, reads the type byte and lets the marshaller for
    /// that type populate a newly created object.
    /// </summary>
    /// <param name="dis">Source reader.</param>
    /// <returns>The unmarshalled object, or null when the type byte is <c>NULL_TYPE</c>.</returns>
    /// <exception cref="IOException">No marshaller is registered for the type byte.</exception>
    public Object Unmarshal(BinaryReader dis)
    {
        // The packet size is read only to advance the stream; its value is ignored.
        if(!sizePrefixDisabled)
        {
            dis.ReadInt32();
        }

        // First byte after the prefix is the type of the packet.
        byte dataType = dis.ReadByte();
        if(NULL_TYPE == dataType)
        {
            return null;
        }

        BaseDataStreamMarshaller dsm = GetDataStreamMarshallerForType(dataType);
        Object data = dsm.CreateObject();

        if(tightEncodingEnabled)
        {
            BooleanStream bs = new BooleanStream();
            bs.Unmarshal(dis);
            dsm.TightUnmarshal(this, data, dis, bs);
        }
        else
        {
            dsm.LooseUnmarshal(this, data, dis);
        }

        return data;
    }

    /// <summary>
    /// First tight-encoding pass for a nested object: records presence flags
    /// in the boolean stream and returns the number of bytes the second pass
    /// will write for it.
    /// </summary>
    /// <param name="o">Nested object; may be null.</param>
    /// <param name="bs">Boolean stream receiving the flags.</param>
    /// <returns>0 for a null object; otherwise 1 (type byte) plus the body size.</returns>
    /// <exception cref="IOException">
    /// The object reports type 0, or no marshaller is registered for its type.
    /// </exception>
    public int TightMarshalNestedObject1(DataStructure o, BooleanStream bs)
    {
        bs.WriteBoolean(o != null);
        if(null == o)
        {
            return 0;
        }

        if(o.IsMarshallAware())
        {
            // Marshall-aware objects add a second flag telling whether a
            // pre-marshalled byte form is available.
            MarshallAware ma = (MarshallAware) o;
            byte[] sequence = ma.GetMarshalledForm(this);
            bs.WriteBoolean(sequence != null);
            if(sequence != null)
            {
                return 1 + sequence.Length;
            }
        }

        byte type = o.GetDataStructureType();
        if(type == 0)
        {
            throw new IOException("No valid data structure type for: " + o + " of type: " + o.GetType());
        }

        BaseDataStreamMarshaller dsm = GetDataStreamMarshallerForType(type);

        Tracer.Debug("Marshalling type: " + type + " with structure: " + o);
        return 1 + dsm.TightMarshal1(this, o, bs);
    }

    /// <summary>
    /// Second tight-encoding pass for a nested object: reads back the flags in
    /// the order <see cref="TightMarshalNestedObject1"/> wrote them and writes
    /// the type byte followed by either the pre-marshalled form or the
    /// marshaller's output. Writes nothing when the presence flag is false.
    /// </summary>
    /// <param name="o">Nested object; only dereferenced when the presence flag is true.</param>
    /// <param name="ds">Destination writer.</param>
    /// <param name="bs">Boolean stream populated by the first pass.</param>
    public void TightMarshalNestedObject2(DataStructure o, BinaryWriter ds, BooleanStream bs)
    {
        if(!bs.ReadBoolean())
        {
            return;
        }

        byte type = o.GetDataStructureType();
        ds.Write(type);

        // The second flag is only present (and only read) for marshall-aware objects.
        if(o.IsMarshallAware() && bs.ReadBoolean())
        {
            MarshallAware ma = (MarshallAware) o;
            byte[] sequence = ma.GetMarshalledForm(this);
            ds.Write(sequence, 0, sequence.Length);
        }
        else
        {
            BaseDataStreamMarshaller dsm = GetDataStreamMarshallerForType(type);
            dsm.TightMarshal2(this, o, ds, bs);
        }
    }

    /// <summary>
    /// Reads a nested object written in tight encoding.
    /// </summary>
    /// <param name="dis">Source reader.</param>
    /// <param name="bs">Boolean stream holding the flags for the enclosing object.</param>
    /// <returns>The nested object, or null when its presence flag is false.</returns>
    /// <exception cref="IOException">No marshaller is registered for the type byte.</exception>
    public DataStructure TightUnmarshalNestedObject(BinaryReader dis, BooleanStream bs)
    {
        if(!bs.ReadBoolean())
        {
            return null;
        }

        byte dataType = dis.ReadByte();

        BaseDataStreamMarshaller dsm = GetDataStreamMarshallerForType(dataType);
        DataStructure data = dsm.CreateObject();

        if(data.IsMarshallAware() && bs.ReadBoolean())
        {
            // Pre-marshalled form: skip an int and a byte (values are
            // discarded), then decode using the boolean stream embedded in
            // the nested data rather than the enclosing one.
            dis.ReadInt32();
            dis.ReadByte();

            BooleanStream bs2 = new BooleanStream();
            bs2.Unmarshal(dis);
            dsm.TightUnmarshal(this, data, dis, bs2);
        }
        else
        {
            dsm.TightUnmarshal(this, data, dis, bs);
        }

        return data;
    }

    /// <summary>
    /// Writes a nested object in loose encoding: a presence boolean, then for
    /// a non-null object its type byte and the marshaller's output.
    /// </summary>
    /// <param name="o">Nested object; may be null.</param>
    /// <param name="dataOut">Destination writer.</param>
    /// <exception cref="IOException">No marshaller is registered for the object's type.</exception>
    public void LooseMarshalNestedObject(DataStructure o, BinaryWriter dataOut)
    {
        dataOut.Write(o != null);
        if(null == o)
        {
            return;
        }

        byte type = o.GetDataStructureType();
        dataOut.Write(type);

        BaseDataStreamMarshaller dsm = GetDataStreamMarshallerForType(type);
        dsm.LooseMarshal(this, o, dataOut);
    }

    /// <summary>
    /// Reads a nested object written in loose encoding.
    /// </summary>
    /// <param name="dis">Source reader.</param>
    /// <returns>The nested object, or null when the presence boolean is false.</returns>
    /// <exception cref="IOException">No marshaller is registered for the type byte.</exception>
    public DataStructure LooseUnmarshalNestedObject(BinaryReader dis)
    {
        if(!dis.ReadBoolean())
        {
            return null;
        }

        byte dataType = dis.ReadByte();

        BaseDataStreamMarshaller dsm = GetDataStreamMarshallerForType(dataType);
        DataStructure data = dsm.CreateObject();
        dsm.LooseUnmarshal(this, data, dis);
        return data;
    }

    /// <summary>
    /// Applies the options received from the remote side. The version becomes
    /// the lower of the preferred and remote versions, each boolean option is
    /// enabled only when both sides enable it, and the inactivity durations
    /// and cache size are taken from the remote info as-is. If the transport
    /// is a <c>TcpTransport</c>, its TcpNoDelay setting is updated to match.
    /// </summary>
    /// <param name="info">Wire format info received from the remote side; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="info"/> is null.</exception>
    /// <exception cref="IOException">The remote version is below the minimum supported version.</exception>
    public void RenegotiateWireFormat(WireFormatInfo info)
    {
        if(null == info)
        {
            throw new ArgumentNullException("info");
        }

        if(info.Version < minimumVersion)
        {
            throw new IOException("Remote wire format (" + info.Version + ") is lower than the minimum version required (" + minimumVersion + ")");
        }

        this.Version = Math.Min(PreferredWireFormatInfo.Version, info.Version);
        this.cacheEnabled = info.CacheEnabled && PreferredWireFormatInfo.CacheEnabled;
        this.stackTraceEnabled = info.StackTraceEnabled && PreferredWireFormatInfo.StackTraceEnabled;
        this.tcpNoDelayEnabled = info.TcpNoDelayEnabled && PreferredWireFormatInfo.TcpNoDelayEnabled;
        this.sizePrefixDisabled = info.SizePrefixDisabled && PreferredWireFormatInfo.SizePrefixDisabled;
        this.tightEncodingEnabled = info.TightEncodingEnabled && PreferredWireFormatInfo.TightEncodingEnabled;
        this.maxInactivityDuration = info.MaxInactivityDuration;
        this.maxInactivityDurationInitialDelay = info.MaxInactivityDurationInitialDelay;
        this.cacheSize = info.CacheSize;

        TcpTransport tcpTransport = this.transport as TcpTransport;
        if(null != tcpTransport)
        {
            tcpTransport.TcpNoDelayEnabled = this.tcpNoDelayEnabled;
        }
    }
}
| } |