Merge pull request #50 from HavretGC/refactoring-meta-classes
AMQNET-629: Rework Meta classes
diff --git a/apache-nms-amqp.sln.DotSettings b/apache-nms-amqp.sln.DotSettings
index 5c57e20..972a724 100644
--- a/apache-nms-amqp.sln.DotSettings
+++ b/apache-nms-amqp.sln.DotSettings
@@ -1,4 +1,9 @@
<wpf:ResourceDictionary xml:space="preserve" xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml" xmlns:s="clr-namespace:System;assembly=mscorlib" xmlns:ss="urn:shemas-jetbrains-com:settings-storage-xaml" xmlns:wpf="http://schemas.microsoft.com/winfx/2006/xaml/presentation">
+ <s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=ASCII/@EntryIndexedValue">ASCII</s:String>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=JMS/@EntryIndexedValue">JMS</s:String>
+ <s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=Constants/@EntryIndexedValue"><Policy Inspect="True" Prefix="" Suffix="" Style="AA_BB" /></s:String>
+ <s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/PredefinedNamingRules/=PrivateInstanceFields/@EntryIndexedValue"><Policy Inspect="True" Prefix="" Suffix="" Style="aaBb" /></s:String>
+ <s:Boolean x:Key="/Default/UserDictionary/Words/=Amqp/@EntryIndexedValue">True</s:Boolean>
+ <s:Boolean x:Key="/Default/UserDictionary/Words/=Failover/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
<s:String x:Key="/Default/CodeStyle/Naming/CSharpNaming/Abbreviations/=TX/@EntryIndexedValue">TX</s:String></wpf:ResourceDictionary>
- <s:Boolean x:Key="/Default/UserDictionary/Words/=Amqp/@EntryIndexedValue">True</s:Boolean></wpf:ResourceDictionary>
\ No newline at end of file
+
\ No newline at end of file
diff --git a/src/NMS.AMQP/Apache-NMS-AMQP.csproj b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
index fdf0f49..ab5a922 100644
--- a/src/NMS.AMQP/Apache-NMS-AMQP.csproj
+++ b/src/NMS.AMQP/Apache-NMS-AMQP.csproj
@@ -73,6 +73,12 @@
</PropertyGroup>
<ItemGroup>
+ <AssemblyAttribute Include="System.Runtime.CompilerServices.InternalsVisibleTo">
+ <_Parameter1>NMS.AMQP.Test</_Parameter1>
+ </AssemblyAttribute>
+ </ItemGroup>
+
+ <ItemGroup>
<!-- AMQPNetLite.Core is .NET Standard 1.3 package -->
<PackageReference Include="AMQPNetLite.Core" Version="2.3.0" />
<PackageReference Include="Apache.NMS" Version="1.8.0" />
diff --git a/src/NMS.AMQP/INmsTransactionContext.cs b/src/NMS.AMQP/INmsTransactionContext.cs
index 714b99a..6b8f6b1 100644
--- a/src/NMS.AMQP/INmsTransactionContext.cs
+++ b/src/NMS.AMQP/INmsTransactionContext.cs
@@ -17,6 +17,7 @@
using System.Threading.Tasks;
using Apache.NMS.AMQP.Message;
+using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Provider;
using Apache.NMS.AMQP.Util;
@@ -89,7 +90,7 @@
/// Allows a resource to query the transaction context to determine if it has pending
/// work in the current transaction.
/// </summary>
- bool IsActiveInThisContext(Id infoId);
+ bool IsActiveInThisContext(INmsResourceId infoId);
event SessionTxEventDelegate TransactionStartedListener;
event SessionTxEventDelegate TransactionCommittedListener;
diff --git a/src/NMS.AMQP/Message/DefaultMessageIdBuilder.cs b/src/NMS.AMQP/Message/DefaultMessageIdBuilder.cs
new file mode 100644
index 0000000..21d4289
--- /dev/null
+++ b/src/NMS.AMQP/Message/DefaultMessageIdBuilder.cs
@@ -0,0 +1,34 @@
+using System.Text;
+using Apache.NMS.AMQP.Provider.Amqp;
+
+namespace Apache.NMS.AMQP.Message
+{
+ public class DefaultMessageIdBuilder : INmsMessageIdBuilder
+ {
+ private readonly StringBuilder builder = new StringBuilder();
+ private int idPrefixLength = -1;
+
+ public object CreateMessageId(string producerId, long messageSequence)
+ {
+ if (idPrefixLength < 0)
+ {
+ Initialize(producerId);
+ }
+
+ builder.Length = idPrefixLength;
+ builder.Append(messageSequence);
+
+ return builder.ToString();
+ }
+
+ private void Initialize(string producerId)
+ {
+ if (!AmqpMessageIdHelper.HasMessageIdPrefix(producerId))
+ {
+ builder.Append(AmqpMessageIdHelper.NMS_ID_PREFIX);
+ }
+ builder.Append(producerId).Append("-");
+ idPrefixLength = builder.Length;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/Facade/INmsMessageFacade.cs b/src/NMS.AMQP/Message/Facade/INmsMessageFacade.cs
index f6e7a94..bfb90ca 100644
--- a/src/NMS.AMQP/Message/Facade/INmsMessageFacade.cs
+++ b/src/NMS.AMQP/Message/Facade/INmsMessageFacade.cs
@@ -44,7 +44,13 @@
/// True if this message is tagged as being persistent
/// </summary>
bool IsPersistent { get; set; }
-
+
+ /// <summary>
+ /// Gets and sets the underlying providers message ID object for this message if one exists, null otherwise.
+ /// In the case the returned value is a string, it is not defined whether the NMS mandated 'ID:' prefix will be present.
+ /// </summary>
+ object ProviderMessageIdObject { get; set; }
+
INmsMessageFacade Copy();
}
}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/INmsMessageIdBuilder.cs b/src/NMS.AMQP/Message/INmsMessageIdBuilder.cs
new file mode 100644
index 0000000..74898a9
--- /dev/null
+++ b/src/NMS.AMQP/Message/INmsMessageIdBuilder.cs
@@ -0,0 +1,7 @@
+namespace Apache.NMS.AMQP.Message
+{
+ public interface INmsMessageIdBuilder
+ {
+ object CreateMessageId(string producerId, long messageSequence);
+ }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Message/InboundMessageDispatch.cs b/src/NMS.AMQP/Message/InboundMessageDispatch.cs
index a611614..502043b 100644
--- a/src/NMS.AMQP/Message/InboundMessageDispatch.cs
+++ b/src/NMS.AMQP/Message/InboundMessageDispatch.cs
@@ -21,8 +21,8 @@
{
public class InboundMessageDispatch
{
- public Id ConsumerId { get; set; }
- public ConsumerInfo ConsumerInfo { get; set; }
+ public NmsConsumerId ConsumerId { get; set; }
+ public NmsConsumerInfo ConsumerInfo { get; set; }
public NmsMessage Message { get; set; }
public bool IsDelivered { get; set; }
diff --git a/src/NMS.AMQP/Message/OutboundMessageDispatch.cs b/src/NMS.AMQP/Message/OutboundMessageDispatch.cs
index f69b83f..a4115c8 100644
--- a/src/NMS.AMQP/Message/OutboundMessageDispatch.cs
+++ b/src/NMS.AMQP/Message/OutboundMessageDispatch.cs
@@ -22,8 +22,8 @@
{
public class OutboundMessageDispatch
{
- public Id ProducerId { get; set; }
- public ProducerInfo ProducerInfo { get; set; }
+ public NmsProducerId ProducerId { get; set; }
+ public NmsProducerInfo ProducerInfo { get; set; }
public NmsMessage Message { get; set; }
public bool SendAsync { get; set; }
}
diff --git a/src/NMS.AMQP/Meta/ConnectionInfo.cs b/src/NMS.AMQP/Meta/ConnectionInfo.cs
deleted file mode 100644
index 4d73322..0000000
--- a/src/NMS.AMQP/Meta/ConnectionInfo.cs
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Collections;
-using System.Collections.Generic;
-using System.Collections.Specialized;
-using System.Reflection;
-using Amqp;
-using Apache.NMS.AMQP.Util;
-
-namespace Apache.NMS.AMQP.Meta
-{
- public class ConnectionInfo : ResourceInfo
- {
- static ConnectionInfo()
- {
- Amqp.ConnectionFactory defaultCF = new Amqp.ConnectionFactory();
- AmqpSettings defaultAMQPSettings = defaultCF.AMQP;
-
- DEFAULT_CHANNEL_MAX = defaultAMQPSettings.MaxSessionsPerConnection;
- DEFAULT_MAX_FRAME_SIZE = defaultAMQPSettings.MaxFrameSize;
- DEFAULT_IDLE_TIMEOUT = defaultAMQPSettings.IdleTimeout;
-
- DEFAULT_REQUEST_TIMEOUT = Convert.ToInt64(NMSConstants.defaultRequestTimeout.TotalMilliseconds);
-
- }
- public const long INFINITE = -1;
- public const long DEFAULT_CONNECT_TIMEOUT = 15000;
- public const int DEFAULT_CLOSE_TIMEOUT = 15000;
- public static readonly long DEFAULT_REQUEST_TIMEOUT;
- public static readonly long DEFAULT_IDLE_TIMEOUT;
- public static readonly long DEFAULT_SEND_TIMEOUT = INFINITE;
-
- public static readonly ushort DEFAULT_CHANNEL_MAX;
- public static readonly int DEFAULT_MAX_FRAME_SIZE;
-
-
- public ConnectionInfo() : this(null) { }
- public ConnectionInfo(Id connectionId) : base(connectionId)
- {
- }
-
- internal Uri remoteHost { get; set; }
- public string ClientId { get; private set; }
- public string username { get; set; } = null;
- public string password { get; set; } = null;
-
- public long requestTimeout { get; set; } = DEFAULT_REQUEST_TIMEOUT;
- public long connectTimeout { get; set; } = DEFAULT_CONNECT_TIMEOUT;
- public int closeTimeout { get; set; } = DEFAULT_CLOSE_TIMEOUT;
- public long idleTimout { get; set; } = DEFAULT_IDLE_TIMEOUT;
-
- public long SendTimeout { get; set; } = DEFAULT_SEND_TIMEOUT;
-
- public ushort channelMax { get; set; } = DEFAULT_CHANNEL_MAX;
- public int maxFrameSize { get; set; } = DEFAULT_MAX_FRAME_SIZE;
-
- public bool LocalMessageExpiry { get; set; }
-
- public string TopicPrefix { get; internal set; } = null;
-
- public string QueuePrefix { get; internal set; } = null;
-
- public bool IsAnonymousRelay { get; internal set; } = false;
-
- public bool IsDelayedDelivery { get; internal set; } = false;
-
- public IList<string> Capabilities { get { return new List<string>(capabilities); } }
-
- public bool HasCapability(string capability)
- {
- return capabilities.Contains(capability);
- }
-
- public void AddCapability(string capability)
- {
- if (capability != null && capability.Length > 0)
- capabilities.Add(capability);
- }
-
- public StringDictionary RemotePeerProperies { get => remoteConnectionProperties; }
- public bool IsExplicitClientId { get; private set; }
-
- private StringDictionary remoteConnectionProperties = new StringDictionary();
- private List<string> capabilities = new List<string>();
-
- public override string ToString()
- {
- string result = "";
- result += "connInfo = [\n";
- foreach (MemberInfo info in this.GetType().GetMembers())
- {
- if (info is PropertyInfo)
- {
- PropertyInfo prop = info as PropertyInfo;
-
- if (prop.GetGetMethod(true).IsPublic)
- {
- if (prop.GetGetMethod(true).ReturnParameter.ParameterType.IsEquivalentTo(typeof(List<string>)))
- {
- result += string.Format("{0} = {1},\n", prop.Name, PropertyUtil.ToString(prop.GetValue(this,null) as IList));
- }
- else
- {
- result += string.Format("{0} = {1},\n", prop.Name, prop.GetValue(this, null));
- }
-
- }
- }
- }
- result = result.Substring(0, result.Length - 2) + "\n]";
- return result;
- }
-
- public void SetClientId(string clientId, bool explicitClientId)
- {
- this.ClientId = clientId;
- this.IsExplicitClientId = explicitClientId;
- }
- }
-}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/ConsumerInfo.cs b/src/NMS.AMQP/Meta/ConsumerInfo.cs
deleted file mode 100644
index c75093b..0000000
--- a/src/NMS.AMQP/Meta/ConsumerInfo.cs
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using Apache.NMS.AMQP.Util;
-
-namespace Apache.NMS.AMQP.Meta
-{
- public class ConsumerInfo : LinkInfo
- {
- public Id SessionId { get; }
-
- protected const int DEFAULT_CREDIT = 200;
-
- private int? credit = null;
-
- internal ConsumerInfo(Id id, Id sessionId) : base(id)
- {
- SessionId = sessionId;
- }
-
- public int LinkCredit
- {
- get { return credit ?? DEFAULT_CREDIT; }
- internal set { credit = value; }
- }
-
- public string Selector { get; internal set; } = null;
- public string SubscriptionName { get; internal set; } = null;
-
- public bool NoLocal { get; internal set; } = false;
- public bool HasSelector => !string.IsNullOrWhiteSpace(Selector);
- public bool IsDurable { get; set; }
- public bool IsBrowser { get; set; }
- public bool LocalMessageExpiry { get; set; }
- public IDestination Destination { get; set; }
- }
-}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/ResourceInfo.cs b/src/NMS.AMQP/Meta/INmsResource.cs
similarity index 72%
rename from src/NMS.AMQP/Meta/ResourceInfo.cs
rename to src/NMS.AMQP/Meta/INmsResource.cs
index e89b9f6..0d6efbb 100644
--- a/src/NMS.AMQP/Meta/ResourceInfo.cs
+++ b/src/NMS.AMQP/Meta/INmsResource.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,20 +15,17 @@
* limitations under the License.
*/
-using Apache.NMS.AMQP.Util;
-
namespace Apache.NMS.AMQP.Meta
{
- public abstract class ResourceInfo
+ public interface INmsResource
{
+ }
- private readonly Id resourceId;
-
- protected ResourceInfo(Id resourceId)
- {
- this.resourceId = resourceId;
- }
-
- public virtual Id Id { get { return resourceId; } }
+ /// <summary>
+ /// Base class for the NMS object representing NMS resources such as Connection, Session, etc.
+ /// </summary>
+ public interface INmsResource<out TResourceId> : INmsResource where TResourceId : INmsResourceId
+ {
+ TResourceId Id { get; }
}
}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TerminusDurability.cs b/src/NMS.AMQP/Meta/INmsResourceId.cs
similarity index 80%
rename from test/Apache-NMS-AMQP-Test/TestAmqp/TerminusDurability.cs
rename to src/NMS.AMQP/Meta/INmsResourceId.cs
index b99d7c7..4100bbf 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TerminusDurability.cs
+++ b/src/NMS.AMQP/Meta/INmsResourceId.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-namespace NMS.AMQP.Test.TestAmqp
+namespace Apache.NMS.AMQP.Meta
{
- public enum TerminusDurability
+ /// <summary>
+ /// Marker interface for all Id type classes used in the NMS Framework
+ /// </summary>
+ public interface INmsResourceId
{
- NONE = 0,
- CONFIGURATION = 1,
- UNSETTLED_STATE = 2,
}
}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/LinkInfo.cs b/src/NMS.AMQP/Meta/LinkInfo.cs
deleted file mode 100644
index 2e038ae..0000000
--- a/src/NMS.AMQP/Meta/LinkInfo.cs
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Reflection;
-using Apache.NMS.AMQP.Util;
-
-namespace Apache.NMS.AMQP.Meta
-{
- public abstract class LinkInfo : ResourceInfo
- {
- protected static readonly long DEFAULT_REQUEST_TIMEOUT;
- static LinkInfo()
- {
- DEFAULT_REQUEST_TIMEOUT = Convert.ToInt64(NMSConstants.defaultRequestTimeout.TotalMilliseconds);
- }
-
- protected LinkInfo(Id linkId) : base(linkId)
- {
-
- }
-
- public long requestTimeout { get; set; } = DEFAULT_REQUEST_TIMEOUT;
- public long closeTimeout { get; set; } = DEFAULT_REQUEST_TIMEOUT;
- public long sendTimeout { get; set; }
-
- public override string ToString()
- {
- string result = "";
- result += "LinkInfo = [\n";
- foreach (MemberInfo info in this.GetType().GetMembers())
- {
- if (info is PropertyInfo)
- {
- PropertyInfo prop = info as PropertyInfo;
- if (prop.GetGetMethod(true).IsPublic)
- {
- result += string.Format("{0} = {1},\n", prop.Name, prop.GetValue(this, null));
- }
- }
- }
- result = result.Substring(0, result.Length - 2) + "\n]";
- return result;
- }
-
- }
-}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/NmsConnectionId.cs b/src/NMS.AMQP/Meta/NmsConnectionId.cs
new file mode 100644
index 0000000..ed32f38
--- /dev/null
+++ b/src/NMS.AMQP/Meta/NmsConnectionId.cs
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.AMQP.Meta
+{
+ public class NmsConnectionId : INmsResourceId
+ {
+ private readonly string value;
+
+ public NmsConnectionId(string connectionId)
+ {
+ if (string.IsNullOrWhiteSpace(connectionId))
+ {
+ throw new ArgumentException("Connection ID cannot be null or empty");
+ }
+ this.value = connectionId;
+ }
+
+ protected bool Equals(NmsConnectionId other)
+ {
+ return value == other.value;
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj)) return false;
+ if (ReferenceEquals(this, obj)) return true;
+ if (obj.GetType() != this.GetType()) return false;
+ return Equals((NmsConnectionId) obj);
+ }
+
+ public override int GetHashCode()
+ {
+ return (value != null ? value.GetHashCode() : 0);
+ }
+
+ public override string ToString() => value;
+ }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/NmsConnectionInfo.cs b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
new file mode 100644
index 0000000..24f3c13
--- /dev/null
+++ b/src/NMS.AMQP/Meta/NmsConnectionInfo.cs
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Amqp;
+
+namespace Apache.NMS.AMQP.Meta
+{
+ /// <summary>
+ /// Meta object that contains the JmsConnection identification and configuration
+ /// options. Providers can extend this to add Provider specific data as needed.
+ /// </summary>
+ public class NmsConnectionInfo : INmsResource<NmsConnectionId>
+ {
+ public static readonly long INFINITE = -1;
+ public static readonly long DEFAULT_CONNECT_TIMEOUT = 15000;
+ public static readonly long DEFAULT_CLOSE_TIMEOUT = 60000;
+ public static readonly long DEFAULT_SEND_TIMEOUT = INFINITE;
+ public static readonly long DEFAULT_REQUEST_TIMEOUT = INFINITE;
+ public static readonly int DEFAULT_IDLE_TIMEOUT;
+ public static readonly ushort DEFAULT_CHANNEL_MAX;
+ public static readonly int DEFAULT_MAX_FRAME_SIZE;
+
+ static NmsConnectionInfo()
+ {
+ AmqpSettings defaultAmqpSettings = new Amqp.ConnectionFactory().AMQP;
+ DEFAULT_CHANNEL_MAX = defaultAmqpSettings.MaxSessionsPerConnection;
+ DEFAULT_MAX_FRAME_SIZE = defaultAmqpSettings.MaxFrameSize;
+ DEFAULT_IDLE_TIMEOUT = defaultAmqpSettings.IdleTimeout;
+ }
+
+ public NmsConnectionInfo(NmsConnectionId connectionId)
+ {
+ this.Id = connectionId ?? throw new ArgumentNullException(nameof(connectionId));
+ }
+
+ public NmsConnectionId Id { get; }
+ public bool IsExplicitClientId { get; private set; }
+ public string ClientId { get; private set; }
+ public string UserName { get; set; }
+ public string Password { get; set; }
+ public Uri ConfiguredUri { get; set; }
+ public long RequestTimeout { get; set; } = DEFAULT_REQUEST_TIMEOUT;
+ public long SendTimeout { get; set; } = DEFAULT_SEND_TIMEOUT;
+ public long CloseTimeout { get; set; } = DEFAULT_CLOSE_TIMEOUT;
+ public bool LocalMessageExpiry { get; set; }
+ public string QueuePrefix { get; set; }
+ public string TopicPrefix { get; set; }
+ public ushort ChannelMax { get; set; } = DEFAULT_CHANNEL_MAX;
+ public int MaxFrameSize { get; set; } = DEFAULT_MAX_FRAME_SIZE;
+ public int IdleTimeOut { get; set; } = DEFAULT_IDLE_TIMEOUT;
+
+
+ public void SetClientId(string clientId, bool explicitClientId)
+ {
+ this.ClientId = clientId;
+ this.IsExplicitClientId = explicitClientId;
+ }
+
+ protected bool Equals(NmsConnectionInfo other)
+ {
+ return Equals(Id, other.Id);
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj)) return false;
+ if (ReferenceEquals(this, obj)) return true;
+ if (obj.GetType() != this.GetType()) return false;
+ return Equals((NmsConnectionInfo) obj);
+ }
+
+ public override int GetHashCode()
+ {
+ return Id != null ? Id.GetHashCode() : 0;
+ }
+
+ public override string ToString()
+ {
+ return $"[{nameof(NmsConnectionInfo)}] {nameof(Id)}: {Id}, {nameof(ConfiguredUri)}: {ConfiguredUri}";
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/NmsConsumerId.cs b/src/NMS.AMQP/Meta/NmsConsumerId.cs
new file mode 100644
index 0000000..f279f37
--- /dev/null
+++ b/src/NMS.AMQP/Meta/NmsConsumerId.cs
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.AMQP.Meta
+{
+ public class NmsConsumerId : INmsResourceId
+ {
+ private string key;
+
+ public NmsConsumerId(NmsSessionId sessionId, long consumerId)
+ {
+ this.SessionId = sessionId ?? throw new ArgumentNullException(nameof(sessionId), "Session ID cannot be null");
+ this.Value = consumerId;
+ }
+
+ public long Value { get; }
+ public NmsSessionId SessionId { get; }
+ public NmsConnectionId ConnectionId => SessionId.ConnectionId;
+
+ protected bool Equals(NmsConsumerId other)
+ {
+ return Value == other.Value && Equals(SessionId, other.SessionId);
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj)) return false;
+ if (ReferenceEquals(this, obj)) return true;
+ if (obj.GetType() != this.GetType()) return false;
+ return Equals((NmsConsumerId) obj);
+ }
+
+ public override int GetHashCode()
+ {
+ unchecked
+ {
+ return (Value.GetHashCode() * 397) ^ (SessionId != null ? SessionId.GetHashCode() : 0);
+ }
+ }
+
+ public override string ToString()
+ {
+ return key ?? (key = $"{ConnectionId}:{SessionId.Value.ToString()}:{Value.ToString()}");
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/NmsConsumerInfo.cs b/src/NMS.AMQP/Meta/NmsConsumerInfo.cs
new file mode 100644
index 0000000..0df66eb
--- /dev/null
+++ b/src/NMS.AMQP/Meta/NmsConsumerInfo.cs
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.AMQP.Meta
+{
+ public class NmsConsumerInfo : INmsResource<NmsConsumerId>
+ {
+ public static readonly int DEFAULT_CREDIT = 200;
+
+ public NmsConsumerInfo(NmsConsumerId consumerId)
+ {
+ Id = consumerId ?? throw new ArgumentNullException(nameof(consumerId), "Consumer ID cannot be null");
+ }
+
+ public NmsConsumerId Id { get; }
+ public NmsSessionId SessionId => Id.SessionId;
+ public IDestination Destination { get; set; }
+ public string Selector { get; set; }
+ public bool NoLocal { get; set; }
+ public string SubscriptionName { get; set; }
+ public bool IsDurable { get; set; }
+ public bool LocalMessageExpiry { get; set; }
+ public bool IsBrowser { get; set; }
+ public int LinkCredit { get; set; } = DEFAULT_CREDIT;
+
+ public bool HasSelector() => !string.IsNullOrWhiteSpace(Selector);
+
+ protected bool Equals(NmsConsumerInfo other)
+ {
+ return Equals(Id, other.Id);
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj)) return false;
+ if (ReferenceEquals(this, obj)) return true;
+ if (obj.GetType() != this.GetType()) return false;
+ return Equals((NmsConsumerInfo) obj);
+ }
+
+ public override int GetHashCode()
+ {
+ return (Id != null ? Id.GetHashCode() : 0);
+ }
+
+ public override string ToString()
+ {
+ return $"[{nameof(NmsConsumerInfo)}] {nameof(Id)}: {Id}, {nameof(Destination)}: {Destination}";
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/NmsProducerId.cs b/src/NMS.AMQP/Meta/NmsProducerId.cs
new file mode 100644
index 0000000..1c940aa
--- /dev/null
+++ b/src/NMS.AMQP/Meta/NmsProducerId.cs
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.AMQP.Meta
+{
+ public class NmsProducerId : INmsResourceId
+ {
+ private string key;
+
+ public NmsProducerId(NmsSessionId sessionId, long producerId)
+ {
+ this.SessionId = sessionId ?? throw new ArgumentNullException(nameof(sessionId), "Session ID cannot be null");
+ this.Value = producerId;
+ }
+
+ public long Value { get; }
+ public NmsSessionId SessionId { get; }
+ public NmsConnectionId ConnectionId => SessionId.ConnectionId;
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj)) return false;
+ if (ReferenceEquals(this, obj)) return true;
+ if (obj.GetType() != this.GetType()) return false;
+ return Equals((NmsProducerId) obj);
+ }
+
+ protected bool Equals(NmsProducerId other)
+ {
+ return Value == other.Value && Equals(SessionId, other.SessionId);
+ }
+
+ public override int GetHashCode()
+ {
+ unchecked
+ {
+ return (Value.GetHashCode() * 397) ^ (SessionId != null ? SessionId.GetHashCode() : 0);
+ }
+ }
+
+ public override string ToString()
+ {
+ return key ?? (key = $"{ConnectionId}:{SessionId.Value.ToString()}:{Value.ToString()}");
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/NmsProducerInfo.cs b/src/NMS.AMQP/Meta/NmsProducerInfo.cs
new file mode 100644
index 0000000..170a13f
--- /dev/null
+++ b/src/NMS.AMQP/Meta/NmsProducerInfo.cs
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.AMQP.Meta
+{
+ public class NmsProducerInfo : INmsResource<NmsProducerId>
+ {
+ public NmsProducerInfo(NmsProducerId producerId)
+ {
+ Id = producerId ?? throw new ArgumentNullException(nameof(producerId), "Producer ID cannot be null");
+ }
+
+ public NmsProducerId Id { get; }
+ public NmsSessionId SessionId => Id.SessionId;
+ public IDestination Destination { get; set; }
+
+ protected bool Equals(NmsProducerInfo other)
+ {
+ return Equals(Id, other.Id);
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj)) return false;
+ if (ReferenceEquals(this, obj)) return true;
+ if (obj.GetType() != this.GetType()) return false;
+ return Equals((NmsProducerInfo) obj);
+ }
+
+ public override int GetHashCode()
+ {
+ return (Id != null ? Id.GetHashCode() : 0);
+ }
+
+ public override string ToString()
+ {
+ return $"[{nameof(NmsProducerInfo)}] {nameof(Id)}: {Id}, {nameof(Destination)}: {Destination}";
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/NmsSessionId.cs b/src/NMS.AMQP/Meta/NmsSessionId.cs
new file mode 100644
index 0000000..62c40e6
--- /dev/null
+++ b/src/NMS.AMQP/Meta/NmsSessionId.cs
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.AMQP.Meta
+{
+ public class NmsSessionId : INmsResourceId
+ {
+ private string key;
+
+ public NmsSessionId(NmsConnectionId connectionId, long value)
+ {
+ this.ConnectionId = connectionId ?? throw new ArgumentNullException(nameof(connectionId), "Connection ID cannot be null");
+ this.Value = value;
+ }
+
+ public NmsConnectionId ConnectionId { get; }
+ public long Value { get; }
+
+ protected bool Equals(NmsSessionId other)
+ {
+ return Equals(ConnectionId, other.ConnectionId) && Value == other.Value;
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj)) return false;
+ if (ReferenceEquals(this, obj)) return true;
+ if (obj.GetType() != this.GetType()) return false;
+ return Equals((NmsSessionId) obj);
+ }
+
+ public override int GetHashCode()
+ {
+ unchecked
+ {
+ return ((ConnectionId != null ? ConnectionId.GetHashCode() : 0) * 397) ^ Value.GetHashCode();
+ }
+ }
+
+ public override string ToString()
+ {
+ return key ?? (key = $"{ConnectionId}:{Value.ToString()}");
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/NmsSessionInfo.cs b/src/NMS.AMQP/Meta/NmsSessionInfo.cs
new file mode 100644
index 0000000..6f8bc54
--- /dev/null
+++ b/src/NMS.AMQP/Meta/NmsSessionInfo.cs
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.AMQP.Meta
+{
+ public class NmsSessionInfo : INmsResource<NmsSessionId>
+ {
+ public NmsSessionInfo(NmsSessionId sessionId)
+ {
+ this.Id = sessionId ?? throw new ArgumentNullException(nameof(sessionId), "Session Id object cannot be null");
+ }
+
+ public NmsSessionInfo(NmsConnectionInfo connectionInfo, long sessionId)
+ {
+ if (connectionInfo == null) throw new ArgumentNullException(nameof(connectionInfo), "Connection Info object cannot be null");
+
+ this.Id = new NmsSessionId(connectionInfo.Id, sessionId);
+ }
+
+ public AcknowledgementMode AcknowledgementMode { get; set; }
+ public NmsSessionId Id { get; }
+ public bool IsTransacted => AcknowledgementMode == AcknowledgementMode.Transactional;
+
+ protected bool Equals(NmsSessionInfo other)
+ {
+ return Equals(Id, other.Id);
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj)) return false;
+ if (ReferenceEquals(this, obj)) return true;
+ if (obj.GetType() != this.GetType()) return false;
+ return Equals((NmsSessionInfo) obj);
+ }
+
+ public override int GetHashCode()
+ {
+ return (Id != null ? Id.GetHashCode() : 0);
+ }
+
+ public override string ToString()
+ {
+ return $"[{nameof(NmsSessionInfo)}] {nameof(Id)}: {Id}";
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/NmsTransactionId.cs b/src/NMS.AMQP/Meta/NmsTransactionId.cs
new file mode 100644
index 0000000..67507f1
--- /dev/null
+++ b/src/NMS.AMQP/Meta/NmsTransactionId.cs
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.AMQP.Meta
+{
+ public class NmsTransactionId : INmsResourceId
+ {
+ public NmsTransactionId(NmsConnectionId connectionId, long transactionId)
+ {
+ this.ConnectionId = connectionId ?? throw new ArgumentNullException(nameof(connectionId), "Connection ID cannot be null");
+ this.Value = transactionId;
+ }
+
+ public NmsConnectionId ConnectionId { get; }
+ public long Value { get; }
+
+ protected bool Equals(NmsTransactionId other)
+ {
+ return Equals(ConnectionId, other.ConnectionId) && Value == other.Value;
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj)) return false;
+ if (ReferenceEquals(this, obj)) return true;
+ if (obj.GetType() != this.GetType()) return false;
+ return Equals((NmsTransactionId) obj);
+ }
+
+ public override int GetHashCode()
+ {
+ unchecked
+ {
+ return ((ConnectionId != null ? ConnectionId.GetHashCode() : 0) * 397) ^ Value.GetHashCode();
+ }
+ }
+
+ public override string ToString() => $"TX:{ConnectionId}:{Value.ToString()}";
+ }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/NmsTransactionInfo.cs b/src/NMS.AMQP/Meta/NmsTransactionInfo.cs
new file mode 100644
index 0000000..b7a2d01
--- /dev/null
+++ b/src/NMS.AMQP/Meta/NmsTransactionInfo.cs
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.AMQP.Meta
+{
+ public class NmsTransactionInfo : INmsResource<NmsTransactionId>
+ {
+ public NmsTransactionInfo(NmsSessionId sessionId, NmsTransactionId transactionId)
+ {
+ this.SessionId = sessionId ?? throw new ArgumentNullException(nameof(sessionId), "Session ID cannot be null");
+ this.Id = transactionId ?? throw new ArgumentNullException(nameof(transactionId), "Transaction ID cannot be null");
+ }
+
+ public NmsTransactionId Id { get; }
+ public NmsSessionId SessionId { get; }
+ public bool IsInDoubt { get; private set; }
+ public byte[] ProviderTxId { get; set; }
+
+ public void SetInDoubt()
+ {
+ IsInDoubt = true;
+ }
+
+ protected bool Equals(NmsTransactionInfo other)
+ {
+ return Equals(Id, other.Id);
+ }
+
+ public override bool Equals(object obj)
+ {
+ if (ReferenceEquals(null, obj)) return false;
+ if (ReferenceEquals(this, obj)) return true;
+ if (obj.GetType() != this.GetType()) return false;
+ return Equals((NmsTransactionInfo) obj);
+ }
+
+ public override int GetHashCode()
+ {
+ return (Id != null ? Id.GetHashCode() : 0);
+ }
+
+ public override string ToString()
+ {
+ return $"[{nameof(NmsTransactionInfo)}] {nameof(Id)}: {Id}";
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/ProducerInfo.cs b/src/NMS.AMQP/Meta/ProducerInfo.cs
deleted file mode 100644
index 7437197..0000000
--- a/src/NMS.AMQP/Meta/ProducerInfo.cs
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Reflection;
-using Apache.NMS.AMQP.Util;
-
-namespace Apache.NMS.AMQP.Meta
-{
- public class ProducerInfo : LinkInfo
- {
- public Id SessionId { get; }
- protected const bool DEFAULT_DISABLE_MESSAGE_ID = false;
- protected const bool DEFAULT_DISABLE_TIMESTAMP = false;
- protected const MsgDeliveryMode DEFAULT_MSG_DELIVERY_MODE = NMSConstants.defaultDeliveryMode;
- protected const MsgPriority DEFAULT_MSG_PRIORITY = NMSConstants.defaultPriority;
- protected static readonly long DEFAULT_TTL;
-
- static ProducerInfo()
- {
- DEFAULT_TTL = Convert.ToInt64(NMSConstants.defaultTimeToLive.TotalMilliseconds);
- }
-
- internal ProducerInfo(Id id) : base(id)
- {
- }
-
- public ProducerInfo(Id id, Id sessionId) : this(id)
- {
- SessionId = sessionId;
- }
-
- public MsgDeliveryMode msgDelMode { get; set; } = DEFAULT_MSG_DELIVERY_MODE;
- public bool disableMsgId { get; set; } = DEFAULT_DISABLE_MESSAGE_ID;
- public bool disableTimeStamp { get; set; } = DEFAULT_DISABLE_TIMESTAMP;
- public MsgPriority priority { get; set; } = DEFAULT_MSG_PRIORITY;
- public long ttl { get; set; } = DEFAULT_TTL;
- public IDestination Destination { get; set; }
-
- public override string ToString()
- {
- string result = "";
- result += "producerInfo = [\n";
- foreach (MemberInfo info in this.GetType().GetMembers())
- {
- if (info is PropertyInfo)
- {
- PropertyInfo prop = info as PropertyInfo;
- if (prop.GetGetMethod(true).IsPublic)
- {
- result += string.Format("{0} = {1},\n", prop.Name, prop.GetValue(this, null));
- }
- }
- }
- result = result.Substring(0, result.Length - 2) + "\n]";
- return result;
- }
-
- }
-}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/SessionInfo.cs b/src/NMS.AMQP/Meta/SessionInfo.cs
deleted file mode 100644
index 9507fce..0000000
--- a/src/NMS.AMQP/Meta/SessionInfo.cs
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Reflection;
-using Apache.NMS.AMQP.Util;
-
-namespace Apache.NMS.AMQP.Meta
-{
- public class SessionInfo : ResourceInfo
- {
- public static readonly uint DEFAULT_INCOMING_WINDOW = 1024 * 10 - 1;
- public static readonly uint DEFAULT_OUTGOING_WINDOW = uint.MaxValue - 2u;
-
- internal SessionInfo(Id sessionId) : base(sessionId)
- {
- ulong endId = (ulong)sessionId.GetLastComponent(typeof(ulong));
- nextOutgoingId = Convert.ToUInt16(endId);
- }
-
- public string sessionId { get { return Id.ToString(); } }
-
- public AcknowledgementMode AcknowledgementMode { get; set; }
- public ushort remoteChannel { get; internal set; }
- public uint nextOutgoingId { get; internal set; }
- public uint incomingWindow { get; set; } = DEFAULT_INCOMING_WINDOW;
- public uint outgoingWindow { get; set; } = DEFAULT_OUTGOING_WINDOW;
- public bool IsTransacted => AcknowledgementMode == AcknowledgementMode.Transactional;
- public long requestTimeout { get; set; }
- public int closeTimeout { get; set; }
- public long sendTimeout { get; set; }
-
- public override string ToString()
- {
- string result = "";
- result += "sessInfo = [\n";
- foreach (MemberInfo info in this.GetType().GetMembers())
- {
- if (info is PropertyInfo)
- {
- PropertyInfo prop = info as PropertyInfo;
- if (prop.GetGetMethod(true).IsPublic)
- {
- result += string.Format("{0} = {1},\n", prop.Name, prop.GetValue(this, null));
- }
- }
- }
- result = result.Substring(0, result.Length - 2) + "\n]";
- return result;
- }
- }
-}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/TransactionInfo.cs b/src/NMS.AMQP/Meta/TransactionInfo.cs
deleted file mode 100644
index f54187b..0000000
--- a/src/NMS.AMQP/Meta/TransactionInfo.cs
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using Apache.NMS.AMQP.Util;
-
-namespace Apache.NMS.AMQP.Meta
-{
- public sealed class TransactionInfo : ResourceInfo
- {
- public TransactionInfo(Id transactionId, Id sessionId) : base(transactionId)
- {
- if (transactionId == null)
- throw new ArgumentNullException(nameof(transactionId), "Transaction Id cannot be null");
-
- if (sessionId == null)
- throw new ArgumentNullException(nameof(sessionId), "Session Id cannot be null");
-
- SessionId = sessionId;
- }
-
- public Id SessionId { get; }
- public bool IsInDoubt { get; private set; }
-
- public void SetInDoubt()
- {
- IsInDoubt = true;
- }
-
- public byte[] ProviderTxId { get; set; }
- }
-}
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsConnection.cs b/src/NMS.AMQP/NmsConnection.cs
index a34d343..6ce0903 100644
--- a/src/NMS.AMQP/NmsConnection.cs
+++ b/src/NMS.AMQP/NmsConnection.cs
@@ -35,17 +35,17 @@
private readonly AtomicBool connected = new AtomicBool();
private readonly HashSet<INmsConnectionListener> connectionListeners = new HashSet<INmsConnectionListener>();
private readonly IProvider provider;
- private readonly ConcurrentDictionary<Id, NmsSession> sessions = new ConcurrentDictionary<Id, NmsSession>();
- private readonly ConcurrentDictionary<Id, NmsTemporaryDestination> tempDestinations = new ConcurrentDictionary<Id, NmsTemporaryDestination>();
+ private readonly ConcurrentDictionary<NmsSessionId, NmsSession> sessions = new ConcurrentDictionary<NmsSessionId, NmsSession>();
+ private readonly ConcurrentDictionary<NmsTemporaryDestination, NmsTemporaryDestination> tempDestinations = new ConcurrentDictionary<NmsTemporaryDestination, NmsTemporaryDestination>();
private readonly AtomicBool started = new AtomicBool();
- private IdGenerator sessionIdGenerator;
- private IdGenerator temporaryTopicIdGenerator;
- private IdGenerator temporaryQueueIdGenerator;
- private NestedIdGenerator transactionIdGenerator;
+ private readonly AtomicLong sessionIdGenerator = new AtomicLong();
+ private readonly AtomicLong temporaryTopicIdGenerator = new AtomicLong();
+ private readonly AtomicLong temporaryQueueIdGenerator = new AtomicLong();
+ private readonly AtomicLong transactionIdGenerator = new AtomicLong();
private Exception failureCause;
private readonly object syncRoot = new object();
- public NmsConnection(ConnectionInfo connectionInfo, IProvider provider)
+ public NmsConnection(NmsConnectionInfo connectionInfo, IProvider provider)
{
if (provider == null)
{
@@ -66,87 +66,10 @@
}
}
- public ConnectionInfo ConnectionInfo { get; }
-
- private IdGenerator SessionIdGenerator
- {
- get
- {
- if (sessionIdGenerator == null)
- {
- lock (syncRoot)
- {
- if (sessionIdGenerator == null)
- {
- sessionIdGenerator = new NestedIdGenerator("ID:ses", ConnectionInfo.Id, true);
- }
- }
- }
-
- return sessionIdGenerator;
- }
- }
-
- private IdGenerator TemporaryTopicIdGenerator
- {
- get
- {
- if (temporaryTopicIdGenerator == null)
- {
- lock (syncRoot)
- {
- if (temporaryTopicIdGenerator == null)
- {
- temporaryTopicIdGenerator = new NestedIdGenerator("ID:nms-temp-topic", ConnectionInfo.Id, true);
- }
- }
- }
-
- return temporaryTopicIdGenerator;
- }
- }
-
- private IdGenerator TemporaryQueueIdGenerator
- {
- get
- {
- if (temporaryQueueIdGenerator == null)
- {
- lock (syncRoot)
- {
- if (temporaryQueueIdGenerator == null)
- {
- temporaryQueueIdGenerator = new NestedIdGenerator("ID:nms-temp-queue", ConnectionInfo.Id, true);
- }
- }
- }
-
- return temporaryQueueIdGenerator;
- }
- }
-
- internal IdGenerator TransactionIdGenerator
- {
- get
- {
- if (transactionIdGenerator == null)
- {
- lock (syncRoot)
- {
- if (transactionIdGenerator == null)
- {
- transactionIdGenerator = new NestedIdGenerator("ID:nms-transaction", ConnectionInfo.Id, true);
- }
- }
- }
-
- return transactionIdGenerator;
- }
- }
-
+ public NmsConnectionInfo ConnectionInfo { get; }
public bool IsClosed => closed.Value;
public bool IsConnected => connected.Value;
- public Id Id => ConnectionInfo.Id;
+ public NmsConnectionId Id => ConnectionInfo.Id;
public INmsMessageFactory MessageFactory { get; private set; }
public void Dispose()
@@ -192,10 +115,7 @@
CheckClosedOrFailed();
CreateNmsConnection();
- NmsSession session = new NmsSession(this, SessionIdGenerator.GenerateId(), acknowledgementMode)
- {
- SessionInfo = { requestTimeout = ConnectionInfo.requestTimeout }
- };
+ NmsSession session = new NmsSession(this, GetNextSessionId(), acknowledgementMode);
try
{
session.Begin().ConfigureAwait(false).GetAwaiter().GetResult();
@@ -268,8 +188,8 @@
public TimeSpan RequestTimeout
{
- get => TimeSpan.FromMilliseconds(ConnectionInfo.requestTimeout);
- set => ConnectionInfo.requestTimeout = Convert.ToInt64(value.TotalMilliseconds);
+ get => TimeSpan.FromMilliseconds(ConnectionInfo.RequestTimeout);
+ set => ConnectionInfo.RequestTimeout = Convert.ToInt64(value.TotalMilliseconds);
}
public AcknowledgementMode AcknowledgementMode { get; set; }
@@ -408,11 +328,11 @@
}
}
- public void OnResourceClosed(ResourceInfo resourceInfo, Exception error)
+ public void OnResourceClosed(INmsResource resource, Exception error)
{
- switch (resourceInfo)
+ switch (resource)
{
- case ConsumerInfo consumerInfo:
+ case NmsConsumerInfo consumerInfo:
{
if (!sessions.TryGetValue(consumerInfo.SessionId, out NmsSession session))
return;
@@ -425,7 +345,7 @@
break;
}
- case ProducerInfo producerInfo:
+ case NmsProducerInfo producerInfo:
{
if (!sessions.TryGetValue(producerInfo.SessionId, out NmsSession session))
return;
@@ -464,14 +384,14 @@
}
}
- internal Task CreateResource(ResourceInfo resourceInfo)
+ internal Task CreateResource(INmsResource resource)
{
- return provider.CreateResource(resourceInfo);
+ return provider.CreateResource(resource);
}
- internal Task DestroyResource(ResourceInfo resourceInfo)
+ internal Task DestroyResource(INmsResource resource)
{
- return provider.DestroyResource(resourceInfo);
+ return provider.DestroyResource(resource);
}
internal Task Send(OutboundMessageDispatch envelope)
@@ -523,17 +443,17 @@
ExceptionListener?.Invoke(error);
}
- internal Task Recover(Id sessionId)
+ internal Task Recover(NmsSessionId sessionId)
{
return provider.Recover(sessionId);
}
- public Task StartResource(ResourceInfo resourceInfo)
+ public Task StartResource(INmsResource resourceInfo)
{
return provider.StartResource(resourceInfo);
}
- public Task StopResource(ResourceInfo resourceInfo)
+ public Task StopResource(INmsResource resourceInfo)
{
return provider.StopResource(resourceInfo);
}
@@ -543,7 +463,7 @@
connectionListeners.Add(listener);
}
- internal Task Acknowledge(Id sessionId, AckType ackType)
+ internal Task Acknowledge(NmsSessionId sessionId, AckType ackType)
{
return provider.Acknowledge(sessionId, ackType);
}
@@ -553,21 +473,23 @@
return provider.Acknowledge(envelope, ackType);
}
- internal void RemoveSession(SessionInfo sessionInfo)
+ internal void RemoveSession(NmsSessionInfo sessionInfo)
{
sessions.TryRemove(sessionInfo.Id, out _);
}
public ITemporaryQueue CreateTemporaryQueue()
{
- NmsTemporaryQueue queue = new NmsTemporaryQueue(TemporaryQueueIdGenerator.GenerateId());
+ var destinationName = $"{Id}:{temporaryQueueIdGenerator.IncrementAndGet().ToString()}";
+ var queue = new NmsTemporaryQueue(destinationName);
InitializeTemporaryDestination(queue);
return queue;
}
public ITemporaryTopic CreateTemporaryTopic()
{
- NmsTemporaryTopic topic = new NmsTemporaryTopic(TemporaryTopicIdGenerator.GenerateId());
+ var destinationName = $"{Id}:{temporaryTopicIdGenerator.IncrementAndGet().ToString()}";
+ NmsTemporaryTopic topic = new NmsTemporaryTopic(destinationName);
InitializeTemporaryDestination(topic);
return topic;
}
@@ -575,7 +497,7 @@
private void InitializeTemporaryDestination(NmsTemporaryDestination temporaryDestination)
{
CreateResource(temporaryDestination).ConfigureAwait(false).GetAwaiter().GetResult();
- tempDestinations.TryAdd(temporaryDestination.Id, temporaryDestination);
+ tempDestinations.TryAdd(temporaryDestination, temporaryDestination);
temporaryDestination.Connection = this;
}
@@ -599,7 +521,7 @@
}
}
- tempDestinations.TryRemove(destination.Id, out _);
+ tempDestinations.TryRemove(destination, out _);
DestroyResource(destination).ConfigureAwait(false).GetAwaiter().GetResult();
}
@@ -616,14 +538,24 @@
provider.Unsubscribe(subscriptionName).ConfigureAwait(false).GetAwaiter().GetResult();
}
- public Task Rollback(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
+ public Task Rollback(NmsTransactionInfo transactionInfo, NmsTransactionInfo nextTransactionInfo)
{
return provider.Rollback(transactionInfo, nextTransactionInfo);
}
- public Task Commit(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
+ public Task Commit(NmsTransactionInfo transactionInfo, NmsTransactionInfo nextTransactionInfo)
{
return provider.Commit(transactionInfo, nextTransactionInfo);
}
+
+ private NmsSessionId GetNextSessionId()
+ {
+ return new NmsSessionId(ConnectionInfo.Id, sessionIdGenerator.IncrementAndGet());
+ }
+
+ public NmsTransactionId GetNextTransactionId()
+ {
+ return new NmsTransactionId(Id, transactionIdGenerator);
+ }
}
}
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsConnectionFactory.cs b/src/NMS.AMQP/NmsConnectionFactory.cs
index eff1e4f..7f3f1c5 100644
--- a/src/NMS.AMQP/NmsConnectionFactory.cs
+++ b/src/NMS.AMQP/NmsConnectionFactory.cs
@@ -1,255 +1,268 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-using System;
-using System.Collections.Specialized;
-using Apache.NMS.AMQP.Meta;
-using Apache.NMS.AMQP.Provider;
-using Apache.NMS.AMQP.Util;
-using Apache.NMS.Util;
-using URISupport = Apache.NMS.AMQP.Util.URISupport;
-
-namespace Apache.NMS.AMQP
-{
- public class NmsConnectionFactory : IConnectionFactory
- {
- private const string DEFAULT_REMOTE_HOST = "localhost";
- private const string DEFAULT_REMOTE_PORT = "5672";
- private Uri brokerUri;
-
- private IdGenerator clientIdGenerator;
- private IdGenerator connectionIdGenerator;
- private object syncRoot = new object();
-
- public NmsConnectionFactory(string userName, string password)
- {
- UserName = userName;
- Password = password;
- }
-
- public NmsConnectionFactory()
- {
- BrokerUri = GetDefaultRemoteAddress();
- }
-
- public NmsConnectionFactory(string userName, string password, string brokerUri) : this(userName, password)
- {
- BrokerUri = CreateUri(brokerUri);
- }
-
- public NmsConnectionFactory(string brokerUri)
- {
- BrokerUri = CreateUri(brokerUri);
- }
-
- public NmsConnectionFactory(Uri brokerUri)
- {
- BrokerUri = brokerUri;
- }
-
- private IdGenerator ClientIdGenerator
- {
- get
- {
- if (clientIdGenerator == null)
- {
- lock (syncRoot)
- {
- if (clientIdGenerator == null)
- {
- clientIdGenerator = ClientIdPrefix != null ? new IdGenerator(ClientIdPrefix) : new IdGenerator();
- }
- }
- }
-
- return connectionIdGenerator;
- }
- }
-
- private IdGenerator ConnectionIdGenerator
- {
- get
- {
- if (connectionIdGenerator == null)
- {
- lock (syncRoot)
- {
- if (connectionIdGenerator == null)
- {
- connectionIdGenerator = ConnectionIdPrefix != null ? new IdGenerator(ConnectionIdPrefix) : new IdGenerator();
- }
- }
- }
-
- return connectionIdGenerator;
- }
- }
-
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using System.Collections.Specialized;
+using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Provider;
+using Apache.NMS.AMQP.Util;
+using Apache.NMS.Util;
+using URISupport = Apache.NMS.AMQP.Util.URISupport;
+
+namespace Apache.NMS.AMQP
+{
+ public class NmsConnectionFactory : IConnectionFactory
+ {
+ private const string DEFAULT_REMOTE_HOST = "localhost";
+ private const string DEFAULT_REMOTE_PORT = "5672";
+ private Uri brokerUri;
+
+ private IdGenerator clientIdGenerator;
+ private IdGenerator connectionIdGenerator;
+ private readonly object syncRoot = new object();
+
+ public NmsConnectionFactory(string userName, string password)
+ {
+ UserName = userName;
+ Password = password;
+ }
+
+ public NmsConnectionFactory()
+ {
+ BrokerUri = GetDefaultRemoteAddress();
+ }
+
+ public NmsConnectionFactory(string userName, string password, string brokerUri) : this(userName, password)
+ {
+ BrokerUri = CreateUri(brokerUri);
+ }
+
+ public NmsConnectionFactory(string brokerUri)
+ {
+ BrokerUri = CreateUri(brokerUri);
+ }
+
+ public NmsConnectionFactory(Uri brokerUri)
+ {
+ BrokerUri = brokerUri;
+ }
+
+ private IdGenerator ClientIdGenerator
+ {
+ get
+ {
+ if (clientIdGenerator == null)
+ {
+ lock (syncRoot)
+ {
+ if (clientIdGenerator == null)
+ {
+ clientIdGenerator = ClientIdPrefix != null ? new IdGenerator(ClientIdPrefix) : new IdGenerator();
+ }
+ }
+ }
+
+ return clientIdGenerator;
+ }
+ }
+
+ private IdGenerator ConnectionIdGenerator
+ {
+ get
+ {
+ if (connectionIdGenerator == null)
+ {
+ lock (syncRoot)
+ {
+ if (connectionIdGenerator == null)
+ {
+ connectionIdGenerator = ConnectionIdPrefix != null ? new IdGenerator(ConnectionIdPrefix) : new IdGenerator();
+ }
+ }
+ }
+
+ return connectionIdGenerator;
+ }
+ }
+
/// <summary>
/// Enables local message expiry for all MessageConsumers under the connection. Default is true.
- /// </summary>
- public bool LocalMessageExpiry { get; set; } = true;
-
- /// <summary>
- /// User name value used to authenticate the connection
- /// </summary>
- public string UserName { get; set; }
-
- /// <summary>
- /// The password value used to authenticate the connection
- /// </summary>
- public string Password { get; set; }
-
- /// <summary>
- /// Timeout value that controls how long the client waits on completion of various synchronous interactions, such as opening a producer or consumer,
- /// before returning an error. Does not affect synchronous message sends. By default the client will wait indefinitely for a request to complete.
- /// </summary>
- public long RequestTimeout { get; set; } = ConnectionInfo.DEFAULT_REQUEST_TIMEOUT;
-
- /// <summary>
- /// Timeout value that controls how long the client waits on completion of a synchronous message send before returning an error.
- /// By default the client will wait indefinitely for a send to complete.
- /// </summary>
- public long SendTimeout { get; set; } = ConnectionInfo.DEFAULT_SEND_TIMEOUT;
-
- /// <summary>
- /// Optional prefix value that is used for generated Connection ID values when a new Connection is created for the NMS ConnectionFactory.
- /// This connection ID is used when logging some information from the JMS Connection object so a configurable prefix can make breadcrumbing
- /// the logs easier. The default prefix is 'ID:'.
- /// </summary>
- public string ConnectionIdPrefix { get; set; }
-
- /// <summary>
- /// Optional prefix value that is used for generated Client ID values when a new Connection is created for the JMS ConnectionFactory.
- /// The default prefix is 'ID:'.
- /// </summary>
- public string ClientIdPrefix { get; set; }
-
- /// <summary>
- /// Sets and gets the NMS clientId to use for connections created by this factory.
- ///
- /// NOTE: A clientID can only be used by one Connection at a time, so setting it here
- /// will restrict the ConnectionFactory to creating a single open Connection at a time.
- /// It is possible to set the clientID on the Connection itself immediately after
- /// creation if no value has been set via the factory that created it, which will
- /// allow the factory to create multiple open connections at a time.
- /// </summary>
- public string ClientId { get; set; }
-
- public IConnection CreateConnection()
- {
- return CreateConnection(UserName, Password);
- }
-
- public IConnection CreateConnection(string userName, string password)
- {
- try
- {
- ConnectionInfo connectionInfo = ConfigureConnectionInfo(userName, password);
- IProvider provider = ProviderFactory.Create(BrokerUri);
- return new NmsConnection(connectionInfo, provider);
- }
- catch (Exception e)
- {
- throw NMSExceptionSupport.Create(e);
- }
- }
-
- public Uri BrokerUri
- {
- get => brokerUri;
- set
- {
- if (value == null)
- throw new ArgumentNullException(nameof(value), "Invalid URI: cannot be null or empty");
-
- brokerUri = value;
-
- if (URISupport.IsCompositeUri(value))
- {
- var compositeData = NMS.Util.URISupport.ParseComposite(value);
- SetUriOptions(compositeData.Parameters);
- brokerUri = compositeData.toUri();
- }
- else
- {
- StringDictionary options = NMS.Util.URISupport.ParseQuery(brokerUri.Query);
- SetUriOptions(options);
- }
- }
- }
-
- public IRedeliveryPolicy RedeliveryPolicy { get; set; }
- public ConsumerTransformerDelegate ConsumerTransformer { get; set; }
- public ProducerTransformerDelegate ProducerTransformer { get; set; }
-
- private Uri CreateUri(string uri)
- {
- if (string.IsNullOrEmpty(uri))
- {
- throw new ArgumentException("Invalid Uri: cannot be null or empty");
- }
-
- try
- {
- return NMS.Util.URISupport.CreateCompatibleUri(uri);
- }
- catch (UriFormatException e)
- {
- throw new ArgumentException("Invalid Uri: " + uri, e);
- }
- }
-
- private ConnectionInfo ConfigureConnectionInfo(string userName, string password)
- {
- ConnectionInfo connectionInfo = new ConnectionInfo(ConnectionIdGenerator.GenerateId())
- {
- username = userName,
- password = password,
- remoteHost = BrokerUri,
- requestTimeout = RequestTimeout,
- SendTimeout = SendTimeout,
- LocalMessageExpiry = LocalMessageExpiry
- };
-
- bool userSpecifiedClientId = ClientId != null;
-
- if (userSpecifiedClientId)
- {
- connectionInfo.SetClientId(ClientId, true);
- }
- else
- {
- connectionInfo.SetClientId(ClientIdGenerator.GenerateId().ToString(), false);
- }
-
- return connectionInfo;
- }
-
- private void SetUriOptions(StringDictionary options)
- {
- StringDictionary nmsOptions = PropertyUtil.FilterProperties(options, "nms.");
- PropertyUtil.SetProperties(this, nmsOptions);
- // TODO: Check if there are any unused options, if so throw argument exception
- }
-
- private Uri GetDefaultRemoteAddress()
- {
- return new Uri("amqp://" + DEFAULT_REMOTE_HOST + ":" + DEFAULT_REMOTE_PORT);
- }
- }
+ /// </summary>
+ public bool LocalMessageExpiry { get; set; } = true;
+
+ /// <summary>
+ /// User name value used to authenticate the connection
+ /// </summary>
+ public string UserName { get; set; }
+
+ /// <summary>
+ /// The password value used to authenticate the connection
+ /// </summary>
+ public string Password { get; set; }
+
+ /// <summary>
+ /// Timeout value that controls how long the client waits on completion of various synchronous interactions, such as opening a producer or consumer,
+ /// before returning an error. Does not affect synchronous message sends. By default the client will wait indefinitely for a request to complete.
+ /// </summary>
+ public long RequestTimeout { get; set; } = NmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT;
+
+ /// <summary>
+ /// Timeout value that controls how long the client waits on completion of a synchronous message send before returning an error.
+ /// By default the client will wait indefinitely for a send to complete.
+ /// </summary>
+ public long SendTimeout { get; set; } = NmsConnectionInfo.DEFAULT_SEND_TIMEOUT;
+
+ /// <summary>
+ /// Gets and sets the close timeout used to control how long a Connection close will wait for
+ /// clean shutdown of the connection before giving up. A negative value means wait
+ /// forever.
+ ///
+ /// Care should be taken in that a very short close timeout can cause the client to
+ /// not cleanly shutdown the connection and it's resources.
+ ///
+ /// Time in milliseconds to wait for a clean connection close.
+ /// </summary>
+ public long CloseTimeout { get; set; } = NmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT;
+
+ /// <summary>
+ /// Optional prefix value that is used for generated Connection ID values when a new Connection is created for the NMS ConnectionFactory.
+ /// This connection ID is used when logging some information from the JMS Connection object so a configurable prefix can make breadcrumbing
+ /// the logs easier. The default prefix is 'ID:'.
+ /// </summary>
+ public string ConnectionIdPrefix { get; set; }
+
+ /// <summary>
+ /// Optional prefix value that is used for generated Client ID values when a new Connection is created for the JMS ConnectionFactory.
+ /// The default prefix is 'ID:'.
+ /// </summary>
+ public string ClientIdPrefix { get; set; }
+
+ /// <summary>
+ /// Sets and gets the NMS clientId to use for connections created by this factory.
+ ///
+ /// NOTE: A clientID can only be used by one Connection at a time, so setting it here
+ /// will restrict the ConnectionFactory to creating a single open Connection at a time.
+ /// It is possible to set the clientID on the Connection itself immediately after
+ /// creation if no value has been set via the factory that created it, which will
+ /// allow the factory to create multiple open connections at a time.
+ /// </summary>
+ public string ClientId { get; set; }
+
+ public IConnection CreateConnection()
+ {
+ return CreateConnection(UserName, Password);
+ }
+
+ public IConnection CreateConnection(string userName, string password)
+ {
+ try
+ {
+ NmsConnectionInfo connectionInfo = ConfigureConnectionInfo(userName, password);
+ IProvider provider = ProviderFactory.Create(BrokerUri);
+ return new NmsConnection(connectionInfo, provider);
+ }
+ catch (Exception e)
+ {
+ throw NMSExceptionSupport.Create(e);
+ }
+ }
+
+ public Uri BrokerUri
+ {
+ get => brokerUri;
+ set
+ {
+ if (value == null)
+ throw new ArgumentNullException(nameof(value), "Invalid URI: cannot be null or empty");
+
+ brokerUri = value;
+
+ if (URISupport.IsCompositeUri(value))
+ {
+ var compositeData = NMS.Util.URISupport.ParseComposite(value);
+ SetUriOptions(compositeData.Parameters);
+ brokerUri = compositeData.toUri();
+ }
+ else
+ {
+ StringDictionary options = NMS.Util.URISupport.ParseQuery(brokerUri.Query);
+ SetUriOptions(options);
+ }
+ }
+ }
+
+ public IRedeliveryPolicy RedeliveryPolicy { get; set; }
+ public ConsumerTransformerDelegate ConsumerTransformer { get; set; }
+ public ProducerTransformerDelegate ProducerTransformer { get; set; }
+
+ private Uri CreateUri(string uri)
+ {
+ if (string.IsNullOrEmpty(uri))
+ {
+ throw new ArgumentException("Invalid Uri: cannot be null or empty");
+ }
+
+ try
+ {
+ return NMS.Util.URISupport.CreateCompatibleUri(uri);
+ }
+ catch (UriFormatException e)
+ {
+ throw new ArgumentException("Invalid Uri: " + uri, e);
+ }
+ }
+
+ private NmsConnectionInfo ConfigureConnectionInfo(string userName, string password)
+ {
+ var connectionInfo = new NmsConnectionInfo(new NmsConnectionId(ConnectionIdGenerator.GenerateId()))
+ {
+ UserName = userName,
+ Password = password,
+ ConfiguredUri = BrokerUri,
+ RequestTimeout = RequestTimeout,
+ SendTimeout = SendTimeout,
+ CloseTimeout = CloseTimeout,
+ LocalMessageExpiry = LocalMessageExpiry
+ };
+
+ bool userSpecifiedClientId = ClientId != null;
+
+ if (userSpecifiedClientId)
+ {
+ connectionInfo.SetClientId(ClientId, true);
+ }
+ else
+ {
+ connectionInfo.SetClientId(ClientIdGenerator.GenerateId().ToString(), false);
+ }
+
+ return connectionInfo;
+ }
+
+ private void SetUriOptions(StringDictionary options)
+ {
+ StringDictionary nmsOptions = PropertyUtil.FilterProperties(options, "nms.");
+ PropertyUtil.SetProperties(this, nmsOptions);
+ // TODO: Check if there are any unused options, if so throw argument exception
+ }
+
+ private Uri GetDefaultRemoteAddress()
+ {
+ return new Uri("amqp://" + DEFAULT_REMOTE_HOST + ":" + DEFAULT_REMOTE_PORT);
+ }
+ }
}
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsDurableTopicSubscriber.cs b/src/NMS.AMQP/NmsDurableTopicSubscriber.cs
index a3cd119..bc446bb 100644
--- a/src/NMS.AMQP/NmsDurableTopicSubscriber.cs
+++ b/src/NMS.AMQP/NmsDurableTopicSubscriber.cs
@@ -15,17 +15,17 @@
* limitations under the License.
*/
-using Apache.NMS.AMQP.Util;
+using Apache.NMS.AMQP.Meta;
namespace Apache.NMS.AMQP
{
public class NmsDurableTopicSubscriber : NmsMessageConsumer
{
- public NmsDurableTopicSubscriber(Id consumerId, NmsSession session, IDestination destination, string selector, bool noLocal) : base(consumerId, session, destination, selector, noLocal)
+ public NmsDurableTopicSubscriber(NmsConsumerId consumerId, NmsSession session, IDestination destination, string selector, bool noLocal) : base(consumerId, session, destination, selector, noLocal)
{
}
- public NmsDurableTopicSubscriber(Id consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal) : base(consumerId, session, destination, name, selector, noLocal)
+ public NmsDurableTopicSubscriber(NmsConsumerId consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal) : base(consumerId, session, destination, name, selector, noLocal)
{
}
diff --git a/src/NMS.AMQP/NmsLocalTransactionContext.cs b/src/NMS.AMQP/NmsLocalTransactionContext.cs
index 76c2be4..caae980 100644
--- a/src/NMS.AMQP/NmsLocalTransactionContext.cs
+++ b/src/NMS.AMQP/NmsLocalTransactionContext.cs
@@ -33,9 +33,9 @@
internal sealed class NmsLocalTransactionContext : INmsTransactionContext
{
private readonly NmsConnection connection;
- private readonly HashSet<Id> participants = new HashSet<Id>();
+ private readonly HashSet<INmsResourceId> participants = new HashSet<INmsResourceId>();
private readonly NmsSession session;
- private TransactionInfo transactionInfo;
+ private NmsTransactionInfo transactionInfo;
public NmsLocalTransactionContext(NmsSession session)
{
@@ -124,7 +124,7 @@
}
}
- public bool IsActiveInThisContext(Id infoId)
+ public bool IsActiveInThisContext(INmsResourceId infoId)
{
return this.participants.Contains(infoId);
}
@@ -236,10 +236,10 @@
}
}
- private TransactionInfo GetNextTransactionInfo()
+ private NmsTransactionInfo GetNextTransactionInfo()
{
- var transactionId = this.connection.TransactionIdGenerator.GenerateId();
- return new TransactionInfo(transactionId, this.session.SessionInfo.Id);
+ NmsTransactionId transactionId = this.connection.GetNextTransactionId();
+ return new NmsTransactionInfo(this.session.SessionInfo.Id, transactionId);
}
private bool IsInDoubt()
diff --git a/src/NMS.AMQP/NmsMessageConsumer.cs b/src/NMS.AMQP/NmsMessageConsumer.cs
index b4cf301..121a93c 100644
--- a/src/NMS.AMQP/NmsMessageConsumer.cs
+++ b/src/NMS.AMQP/NmsMessageConsumer.cs
@@ -36,11 +36,11 @@
private Exception failureCause;
- public NmsMessageConsumer(Id consumerId, NmsSession session, IDestination destination, string selector, bool noLocal) : this(consumerId, session, destination, null, selector, noLocal)
+ public NmsMessageConsumer(NmsConsumerId consumerId, NmsSession session, IDestination destination, string selector, bool noLocal) : this(consumerId, session, destination, null, selector, noLocal)
{
}
- protected NmsMessageConsumer(Id consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal)
+ protected NmsMessageConsumer(NmsConsumerId consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal)
{
Session = session;
acknowledgementMode = session.AcknowledgementMode;
@@ -50,7 +50,7 @@
session.Connection.CheckConsumeFromTemporaryDestination((NmsTemporaryDestination) destination);
}
- Info = new ConsumerInfo(consumerId, Session.SessionInfo.Id)
+ Info = new NmsConsumerInfo(consumerId)
{
Destination = destination,
Selector = selector,
@@ -70,7 +70,7 @@
}
public NmsSession Session { get; }
- public ConsumerInfo Info { get; }
+ public NmsConsumerInfo Info { get; }
public IDestination Destination => Info.Destination;
protected virtual bool IsDurableSubscription => false;
diff --git a/src/NMS.AMQP/NmsMessageProducer.cs b/src/NMS.AMQP/NmsMessageProducer.cs
index c5da8e2..3e29c78 100644
--- a/src/NMS.AMQP/NmsMessageProducer.cs
+++ b/src/NMS.AMQP/NmsMessageProducer.cs
@@ -17,6 +17,7 @@
using System;
using System.Threading.Tasks;
+using Apache.NMS.AMQP.Message;
using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Provider;
using Apache.NMS.AMQP.Util;
@@ -27,6 +28,7 @@
{
private readonly NmsSession session;
private readonly AtomicBool closed = new AtomicBool();
+ private readonly AtomicLong messageSequence = new AtomicLong();
private Exception failureCause;
private MsgDeliveryMode deliveryMode = MsgDeliveryMode.Persistent;
@@ -36,21 +38,22 @@
private bool disableMessageId;
private bool disableMessageTimestamp;
- public NmsMessageProducer(Id producerId, NmsSession session, IDestination destination)
+ public NmsMessageProducer(NmsProducerId producerId, NmsSession session, IDestination destination)
{
this.session = session;
- Info = new ProducerInfo(producerId, session.SessionInfo.Id)
+ Info = new NmsProducerInfo(producerId)
{
Destination = destination
};
-
+
session.Connection.CreateResource(Info).ConfigureAwait(false).GetAwaiter().GetResult();
-
+
session.Add(this);
}
- public ProducerInfo Info { get; }
- public IdGenerator MessageIdGenerator { get; } = new CustomIdGenerator(true, "ID", new AtomicSequence());
+ public NmsProducerId ProducerId => Info.Id;
+ public NmsProducerInfo Info { get; }
+ public INmsMessageIdBuilder MessageIdBuilder { get; } = new DefaultMessageIdBuilder();
public void Dispose()
{
@@ -228,11 +231,6 @@
}
}
- public Task Init()
- {
- return session.Connection.CreateResource(Info);
- }
-
public Task OnConnectionRecovery(IProvider provider)
{
return provider.CreateResource(Info);
@@ -256,5 +254,13 @@
session.Remove(this);
}
}
+
+ /// <summary>
+ /// Returns the next logical sequence for a Message sent from this Producer.
+ /// </summary>
+ public long GetNextMessageSequence()
+ {
+ return messageSequence.IncrementAndGet();
+ }
}
}
\ No newline at end of file
diff --git a/src/NMS.AMQP/NmsNoTxTransactionContext.cs b/src/NMS.AMQP/NmsNoTxTransactionContext.cs
index 3aaafd4..a8e1607 100644
--- a/src/NMS.AMQP/NmsNoTxTransactionContext.cs
+++ b/src/NMS.AMQP/NmsNoTxTransactionContext.cs
@@ -17,6 +17,7 @@
using System.Threading.Tasks;
using Apache.NMS.AMQP.Message;
+using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Provider;
using Apache.NMS.AMQP.Util;
@@ -74,7 +75,7 @@
return Task.CompletedTask;
}
- public bool IsActiveInThisContext(Id infoId)
+ public bool IsActiveInThisContext(INmsResourceId infoId)
{
return false;
}
diff --git a/src/NMS.AMQP/NmsSession.cs b/src/NMS.AMQP/NmsSession.cs
index f8930f1..425cf51 100644
--- a/src/NMS.AMQP/NmsSession.cs
+++ b/src/NMS.AMQP/NmsSession.cs
@@ -30,31 +30,29 @@
{
public class NmsSession : ISession
{
- private readonly ConcurrentDictionary<Id, NmsMessageConsumer> consumers = new ConcurrentDictionary<Id, NmsMessageConsumer>();
- private readonly ConcurrentDictionary<Id, NmsMessageProducer> producers = new ConcurrentDictionary<Id, NmsMessageProducer>();
+ private readonly ConcurrentDictionary<NmsConsumerId, NmsMessageConsumer> consumers = new ConcurrentDictionary<NmsConsumerId, NmsMessageConsumer>();
+ private readonly ConcurrentDictionary<NmsProducerId, NmsMessageProducer> producers = new ConcurrentDictionary<NmsProducerId, NmsMessageProducer>();
- private readonly NestedIdGenerator consumerIdGenerator;
- private readonly NestedIdGenerator producerIdGenerator;
+ private readonly AtomicLong consumerIdGenerator = new AtomicLong();
+ private readonly AtomicLong producerIdGenerator = new AtomicLong();
private readonly AtomicBool closed = new AtomicBool();
private readonly AtomicBool started = new AtomicBool();
- public SessionInfo SessionInfo { get; }
+ public NmsSessionInfo SessionInfo { get; }
public NmsConnection Connection { get; }
private SessionDispatcher dispatcher;
private Exception failureCause;
private readonly AcknowledgementMode acknowledgementMode;
- public NmsSession(NmsConnection connection, Id sessionId, AcknowledgementMode acknowledgementMode)
+ public NmsSession(NmsConnection connection, NmsSessionId sessionId, AcknowledgementMode acknowledgementMode)
{
Connection = connection;
this.acknowledgementMode = acknowledgementMode;
- SessionInfo = new SessionInfo(sessionId)
+ SessionInfo = new NmsSessionInfo(sessionId)
{
AcknowledgementMode = acknowledgementMode
};
- consumerIdGenerator = new NestedIdGenerator("ID:consumer", SessionInfo.Id, true);
- producerIdGenerator = new NestedIdGenerator("ID:producer", SessionInfo.Id, true);
if (AcknowledgementMode == AcknowledgementMode.Transactional)
TransactionContext = new NmsLocalTransactionContext(this);
@@ -118,7 +116,12 @@
public IMessageProducer CreateProducer(IDestination destination)
{
- return new NmsMessageProducer(producerIdGenerator.GenerateId(), this, destination);
+ return new NmsMessageProducer(GetNextProducerId(), this, destination);
+ }
+
+ private NmsProducerId GetNextProducerId()
+ {
+ return new NmsProducerId(SessionInfo.Id, producerIdGenerator.IncrementAndGet());
}
public IMessageConsumer CreateConsumer(IDestination destination)
@@ -135,17 +138,22 @@
{
CheckClosed();
- NmsMessageConsumer messageConsumer = new NmsMessageConsumer(consumerIdGenerator.GenerateId(), this, destination, selector, noLocal);
+ NmsMessageConsumer messageConsumer = new NmsMessageConsumer(GetNextConsumerId(), this, destination, selector, noLocal);
messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
return messageConsumer;
}
+ private NmsConsumerId GetNextConsumerId()
+ {
+ return new NmsConsumerId(SessionInfo.Id, consumerIdGenerator.IncrementAndGet());
+ }
+
public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
{
CheckClosed();
- NmsMessageConsumer messageConsumer = new NmsDurableTopicSubscriber(consumerIdGenerator.GenerateId(), this, destination, name, selector, noLocal);
+ NmsMessageConsumer messageConsumer = new NmsDurableTopicSubscriber(GetNextConsumerId(), this, destination, name, selector, noLocal);
messageConsumer.Init().ConfigureAwait(false).GetAwaiter().GetResult();
return messageConsumer;
@@ -410,19 +418,33 @@
bool hasTTL = timeToLive > TimeSpan.Zero;
if (!disableMessageTimestamp)
+ {
original.NMSTimestamp = timeStamp;
+ }
else
original.NMSTimestamp = DateTime.MinValue;
-
+
+ long messageSequence = producer.GetNextMessageSequence();
+ object messageId = null;
if (!disableMessageId)
- original.NMSMessageId = producer.MessageIdGenerator.GenerateId().ToString();
- else
- original.NMSMessageId = null;
+ {
+ messageId = producer.MessageIdBuilder.CreateMessageId(producer.ProducerId.ToString(), messageSequence);
+ }
if (isNmsMessage)
outbound = (NmsMessage) original;
else
outbound = NmsMessageTransformation.TransformMessage(Connection.MessageFactory, original);
+
+ // Set the message ID
+ outbound.Facade.ProviderMessageIdObject = messageId;
+ if (!isNmsMessage)
+ {
+ // If the original was a foreign message, we still need to update it
+ // with the properly encoded Message ID String, get it from the one
+ // we transformed from now that it is set.
+ original.NMSMessageId = outbound.NMSMessageId;
+ }
if (hasTTL)
outbound.Facade.Expiration = timeStamp + timeToLive;
@@ -452,7 +474,7 @@
dispatcher?.Post(task);
}
- public NmsMessageConsumer ConsumerClosed(Id consumerId, Exception cause)
+ public NmsMessageConsumer ConsumerClosed(NmsConsumerId consumerId, Exception cause)
{
Tracer.Info($"A NMS MessageConsumer has been closed: {consumerId}");
@@ -476,7 +498,7 @@
return consumer;
}
- public NmsMessageProducer ProducerClosed(Id producerId, Exception cause)
+ public NmsMessageProducer ProducerClosed(NmsProducerId producerId, Exception cause)
{
Tracer.Info($"A NmsMessageProducer has been closed: {producerId}. Cause: {cause}");
diff --git a/src/NMS.AMQP/NmsTemporaryDestination.cs b/src/NMS.AMQP/NmsTemporaryDestination.cs
index 07d2fac..a2876d4 100644
--- a/src/NMS.AMQP/NmsTemporaryDestination.cs
+++ b/src/NMS.AMQP/NmsTemporaryDestination.cs
@@ -20,11 +20,11 @@
namespace Apache.NMS.AMQP
{
- public abstract class NmsTemporaryDestination : ResourceInfo, IDestination
+ public abstract class NmsTemporaryDestination : IDestination, INmsResource
{
- protected NmsTemporaryDestination(Id resourceId) : base(resourceId)
+ protected NmsTemporaryDestination(string address)
{
- Address = resourceId.ToString();
+ this.Address = address;
}
public string Address { get; set; }
diff --git a/src/NMS.AMQP/NmsTemporaryQueue.cs b/src/NMS.AMQP/NmsTemporaryQueue.cs
index 7d40bd0..d069a6b 100644
--- a/src/NMS.AMQP/NmsTemporaryQueue.cs
+++ b/src/NMS.AMQP/NmsTemporaryQueue.cs
@@ -15,13 +15,11 @@
* limitations under the License.
*/
-using Apache.NMS.AMQP.Util;
-
namespace Apache.NMS.AMQP
{
public class NmsTemporaryQueue : NmsTemporaryDestination, ITemporaryQueue
{
- public NmsTemporaryQueue(Id resourceId) : base(resourceId)
+ public NmsTemporaryQueue(string queueName) : base(queueName)
{
}
diff --git a/src/NMS.AMQP/NmsTemporaryTopic.cs b/src/NMS.AMQP/NmsTemporaryTopic.cs
index c4c0bdb..5a64cbf 100644
--- a/src/NMS.AMQP/NmsTemporaryTopic.cs
+++ b/src/NMS.AMQP/NmsTemporaryTopic.cs
@@ -15,13 +15,11 @@
* limitations under the License.
*/
-using Apache.NMS.AMQP.Util;
-
namespace Apache.NMS.AMQP
{
public class NmsTemporaryTopic : NmsTemporaryDestination, ITemporaryTopic
{
- public NmsTemporaryTopic(Id resourceId) : base(resourceId)
+ public NmsTemporaryTopic(string topicName) : base(topicName)
{
}
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
index bcdd5a3..4692551 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConnection.cs
@@ -38,8 +38,8 @@
public class AmqpConnection : IAmqpConnection
{
- private readonly ConcurrentDictionary<Id, AmqpSession> sessions = new ConcurrentDictionary<Id, AmqpSession>();
- private readonly ConcurrentDictionary<Id, AmqpTemporaryDestination> temporaryDestinations = new ConcurrentDictionary<Id, AmqpTemporaryDestination>();
+ private readonly ConcurrentDictionary<NmsSessionId, AmqpSession> sessions = new ConcurrentDictionary<NmsSessionId, AmqpSession>();
+ private readonly ConcurrentDictionary<NmsTemporaryDestination, AmqpTemporaryDestination> temporaryDestinations = new ConcurrentDictionary<NmsTemporaryDestination, AmqpTemporaryDestination>();
public AmqpProvider Provider { get; }
private readonly ITransportContext transport;
@@ -49,7 +49,7 @@
private AmqpConnectionSession connectionSession;
private TaskCompletionSource<bool> tsc;
- public AmqpConnection(AmqpProvider provider, ITransportContext transport, ConnectionInfo info)
+ public AmqpConnection(AmqpProvider provider, ITransportContext transport, NmsConnectionInfo info)
{
this.Provider = provider;
this.transport = transport;
@@ -62,13 +62,13 @@
public string QueuePrefix => Info.QueuePrefix;
public string TopicPrefix => Info.TopicPrefix;
public bool ObjectMessageUsesAmqpTypes { get; set; } = false;
- public ConnectionInfo Info { get; }
+ public NmsConnectionInfo Info { get; }
public INmsMessageFactory MessageFactory => messageFactory;
internal async Task Start()
{
- Address address = UriUtil.ToAddress(remoteUri, Info.username, Info.password);
+ Address address = UriUtil.ToAddress(remoteUri, Info.UserName, Info.Password);
this.tsc = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
underlyingConnection = await transport.CreateAsync(address, new AmqpHandler(this)).ConfigureAwait(false);
underlyingConnection.AddClosedCallback(OnClosed);
@@ -78,8 +78,7 @@
// Create a Session for this connection that is used for Temporary Destinations
// and perhaps later on management and advisory monitoring.
- // TODO: change the way how connection session id is obtained
- SessionInfo sessionInfo = new SessionInfo(Info.Id);
+ NmsSessionInfo sessionInfo = new NmsSessionInfo(Info, -1);
sessionInfo.AcknowledgementMode = AcknowledgementMode.AutoAcknowledge;
connectionSession = new AmqpConnectionSession(this, sessionInfo);
@@ -107,10 +106,10 @@
internal void OnLocalOpen(Open open)
{
open.ContainerId = Info.ClientId;
- open.ChannelMax = Info.channelMax;
- open.MaxFrameSize = Convert.ToUInt32(Info.maxFrameSize);
+ open.ChannelMax = Info.ChannelMax;
+ open.MaxFrameSize = (uint) Info.MaxFrameSize;
open.HostName = remoteUri.Host;
- open.IdleTimeOut = Convert.ToUInt32(Info.idleTimout);
+ open.IdleTimeOut = (uint) Info.IdleTimeOut;
open.DesiredCapabilities = new[]
{
SymbolUtil.OPEN_CAPABILITY_SOLE_CONNECTION_FOR_CONTAINER,
@@ -145,7 +144,7 @@
}
}
- public async Task CreateSession(SessionInfo sessionInfo)
+ public async Task CreateSession(NmsSessionInfo sessionInfo)
{
var amqpSession = new AmqpSession(this, sessionInfo);
await amqpSession.Start().ConfigureAwait(false);
@@ -166,7 +165,7 @@
}
}
- public AmqpSession GetSession(Id sessionId)
+ public AmqpSession GetSession(NmsSessionId sessionId)
{
if (sessions.TryGetValue(sessionId, out AmqpSession session))
{
@@ -175,24 +174,24 @@
throw new InvalidOperationException($"Amqp Session {sessionId} doesn't exist and cannot be retrieved.");
}
- public void RemoveSession(Id sessionId)
+ public void RemoveSession(NmsSessionId sessionId)
{
- sessions.TryRemove(sessionId, out AmqpSession removedSession);
+ sessions.TryRemove(sessionId, out AmqpSession _);
}
public async Task CreateTemporaryDestination(NmsTemporaryDestination destination)
{
AmqpTemporaryDestination amqpTemporaryDestination = new AmqpTemporaryDestination(connectionSession, destination);
await amqpTemporaryDestination.Attach();
- temporaryDestinations.TryAdd(destination.Id, amqpTemporaryDestination);
+ temporaryDestinations.TryAdd(destination, amqpTemporaryDestination);
}
public AmqpTemporaryDestination GetTemporaryDestination(NmsTemporaryDestination destination)
{
- return temporaryDestinations.TryGetValue(destination.Id, out AmqpTemporaryDestination amqpTemporaryDestination) ? amqpTemporaryDestination : null;
+ return temporaryDestinations.TryGetValue(destination, out AmqpTemporaryDestination amqpTemporaryDestination) ? amqpTemporaryDestination : null;
}
- public void RemoveTemporaryDestination(Id destinationId)
+ public void RemoveTemporaryDestination(NmsTemporaryDestination destinationId)
{
temporaryDestinations.TryRemove(destinationId, out _);
}
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConnectionSession.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConnectionSession.cs
index 859f43d..c89133a 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConnectionSession.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConnectionSession.cs
@@ -16,7 +16,6 @@
*/
using System;
-using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;
@@ -30,7 +29,7 @@
{
private readonly bool hasClientId;
- public AmqpConnectionSession(AmqpConnection connection, SessionInfo sessionInfo) : base(connection, sessionInfo)
+ public AmqpConnectionSession(AmqpConnection connection, NmsSessionInfo sessionInfo) : base(connection, sessionInfo)
{
this.hasClientId = connection.Info.IsExplicitClientId;
}
@@ -59,8 +58,8 @@
});
await tcs.Task;
-
- receiverLink.Close(TimeSpan.FromMilliseconds(SessionInfo.closeTimeout));
+
+ receiverLink.Close(TimeSpan.FromMilliseconds(Connection.Provider.CloseTimeout));
}
private Attach CreateAttach(string subscriptionName)
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
index 685d9a3..7ecf7e7 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpConsumer.cs
@@ -21,7 +21,6 @@
using System.Threading.Tasks;
using Amqp;
using Amqp.Framing;
-using Amqp.Transactions;
using Amqp.Types;
using Apache.NMS.AMQP.Message;
using Apache.NMS.AMQP.Meta;
@@ -39,7 +38,7 @@
public class AmqpConsumer : IAmqpConsumer
{
- private readonly ConsumerInfo info;
+ private readonly NmsConsumerInfo info;
private ReceiverLink receiverLink;
private readonly LinkedList<InboundMessageDispatch> messages;
private readonly object syncRoot = new object();
@@ -48,7 +47,7 @@
public IDestination Destination => info.Destination;
public IAmqpConnection Connection => session.Connection;
- public AmqpConsumer(AmqpSession amqpSession, ConsumerInfo info)
+ public AmqpConsumer(AmqpSession amqpSession, NmsConsumerInfo info)
{
session = amqpSession;
this.info = info;
@@ -56,7 +55,7 @@
messages = new LinkedList<InboundMessageDispatch>();
}
- public Id ConsumerId => this.info.Id;
+ public NmsConsumerId ConsumerId => this.info.Id;
public Task Attach()
{
@@ -158,7 +157,7 @@
filters.Add(SymbolUtil.ATTACH_FILTER_NO_LOCAL, AmqpNmsNoLocalType.NO_LOCAL);
}
- if (info.HasSelector)
+ if (info.HasSelector())
{
filters.Add(SymbolUtil.ATTACH_FILTER_SELECTOR, new AmqpNmsSelectorType(info.Selector));
}
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
index 3eb8e01..4ccbdae 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProducer.cs
@@ -30,10 +30,10 @@
public class AmqpProducer
{
private readonly AmqpSession session;
- private readonly ProducerInfo info;
+ private readonly NmsProducerInfo info;
private SenderLink senderLink;
- public AmqpProducer(AmqpSession session, ProducerInfo info)
+ public AmqpProducer(AmqpSession session, NmsProducerInfo info)
{
this.session = session;
this.info = info;
@@ -92,7 +92,6 @@
private Source CreateSource() => new Source
{
Address = info.Id.ToString(),
- Timeout = (uint) info.sendTimeout,
Outcomes = new[]
{
SymbolUtil.ATTACH_OUTCOME_ACCEPTED,
@@ -105,8 +104,6 @@
Target target = new Target();
target.Address = AmqpDestinationHelper.GetDestinationAddress(info.Destination, session.Connection);
- target.Timeout = (uint) info.sendTimeout;
-
// Durable is used for a durable subscription
target.Durable = (uint) TerminusDurability.NONE;
@@ -200,7 +197,8 @@
{
try
{
- senderLink.Close(TimeSpan.FromMilliseconds(info.closeTimeout));
+ var closeTimeout = session.Connection.Provider.CloseTimeout;
+ senderLink.Close(TimeSpan.FromMilliseconds(closeTimeout));
}
catch (NMSException)
{
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs b/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
index 80e2fdd..e908fcd 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpProvider.cs
@@ -29,12 +29,11 @@
public class AmqpProvider : IProvider
{
public static readonly uint DEFAULT_MAX_HANDLE = 1024;
+ private static readonly uint DEFAULT_SESSION_OUTGOING_WINDOW = 2048; // AmqpNetLite default
private readonly ITransportContext transport;
- private ConnectionInfo connectionInfo;
+ private NmsConnectionInfo connectionInfo;
private AmqpConnection connection;
- public uint MaxHandle { get; set; } = DEFAULT_MAX_HANDLE;
-
public AmqpProvider(Uri remoteUri, ITransportContext transport)
{
RemoteUri = remoteUri;
@@ -101,7 +100,12 @@
}
}
- public long SendTimeout => connectionInfo?.SendTimeout ?? ConnectionInfo.DEFAULT_SEND_TIMEOUT;
+ public long SendTimeout => connectionInfo?.SendTimeout ?? NmsConnectionInfo.DEFAULT_SEND_TIMEOUT;
+ public long CloseTimeout => connectionInfo?.CloseTimeout ?? NmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT;
+ public long RequestTimeout => connectionInfo?.RequestTimeout ?? NmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT;
+ public uint SessionOutgoingWindow { get; set; } = DEFAULT_SESSION_OUTGOING_WINDOW;
+ public uint MaxHandle { get; set; } = DEFAULT_MAX_HANDLE;
+
public Uri RemoteUri { get; }
public IProviderListener Listener { get; private set; }
@@ -109,7 +113,7 @@
{
}
- public Task Connect(ConnectionInfo connectionInfo)
+ public Task Connect(NmsConnectionInfo connectionInfo)
{
this.connectionInfo = connectionInfo;
connection = new AmqpConnection(this, transport, connectionInfo);
@@ -136,25 +140,25 @@
Listener = providerListener;
}
- public Task CreateResource(ResourceInfo resourceInfo)
+ public Task CreateResource(INmsResource resourceInfo)
{
switch (resourceInfo)
{
- case SessionInfo sessionInfo:
+ case NmsSessionInfo sessionInfo:
return connection.CreateSession(sessionInfo);
- case ConsumerInfo consumerInfo:
+ case NmsConsumerInfo consumerInfo:
{
AmqpSession session = connection.GetSession(consumerInfo.SessionId);
return session.CreateConsumer(consumerInfo);
}
- case ProducerInfo producerInfo:
+ case NmsProducerInfo producerInfo:
{
AmqpSession session = connection.GetSession(producerInfo.SessionId);
return session.CreateProducer(producerInfo);
}
case NmsTemporaryDestination temporaryDestination:
return connection.CreateTemporaryDestination(temporaryDestination);
- case TransactionInfo transactionInfo:
+ case NmsTransactionInfo transactionInfo:
var amqpSession = connection.GetSession(transactionInfo.SessionId);
return amqpSession.BeginTransaction(transactionInfo);
default:
@@ -162,17 +166,17 @@
}
}
- public Task DestroyResource(ResourceInfo resourceInfo)
+ public Task DestroyResource(INmsResource resourceInfo)
{
switch (resourceInfo)
{
- case SessionInfo sessionInfo:
+ case NmsSessionInfo sessionInfo:
{
AmqpSession session = connection.GetSession(sessionInfo.Id);
session.Close();
return Task.CompletedTask;
}
- case ConsumerInfo consumerInfo:
+ case NmsConsumerInfo consumerInfo:
{
AmqpSession session = connection.GetSession(consumerInfo.SessionId);
AmqpConsumer consumer = session.GetConsumer(consumerInfo.Id);
@@ -180,7 +184,7 @@
session.RemoveConsumer(consumerInfo.Id);
return Task.CompletedTask;
}
- case ProducerInfo producerInfo:
+ case NmsProducerInfo producerInfo:
{
AmqpSession session = connection.GetSession(producerInfo.SessionId);
AmqpProducer producer = session.GetProducer(producerInfo.Id);
@@ -194,10 +198,10 @@
if (amqpTemporaryDestination != null)
{
amqpTemporaryDestination.Close();
- connection.RemoveTemporaryDestination(temporaryDestination.Id);
+ connection.RemoveTemporaryDestination(temporaryDestination);
}
else
- Tracer.Debug($"Could not find temporary destination {temporaryDestination.Id} to delete.");
+ Tracer.Debug($"Could not find temporary destination {temporaryDestination} to delete.");
return Task.CompletedTask;
}
@@ -206,11 +210,11 @@
}
}
- public Task StartResource(ResourceInfo resourceInfo)
+ public Task StartResource(INmsResource resourceInfo)
{
switch (resourceInfo)
{
- case ConsumerInfo consumerInfo:
+ case NmsConsumerInfo consumerInfo:
AmqpSession session = connection.GetSession(consumerInfo.SessionId);
AmqpConsumer amqpConsumer = session.GetConsumer(consumerInfo.Id);
amqpConsumer.Start();
@@ -220,11 +224,11 @@
}
}
- public Task StopResource(ResourceInfo resourceInfo)
+ public Task StopResource(INmsResource resourceInfo)
{
switch (resourceInfo)
{
- case ConsumerInfo consumerInfo:
+ case NmsConsumerInfo consumerInfo:
AmqpSession session = connection.GetSession(consumerInfo.SessionId);
AmqpConsumer amqpConsumer = session.GetConsumer(consumerInfo.Id);
amqpConsumer.Stop();
@@ -234,14 +238,14 @@
}
}
- public Task Recover(Id sessionId)
+ public Task Recover(NmsSessionId sessionId)
{
AmqpSession session = connection.GetSession(sessionId);
session.Recover();
return Task.CompletedTask;
}
- public Task Acknowledge(Id sessionId, AckType ackType)
+ public Task Acknowledge(NmsSessionId sessionId, AckType ackType)
{
AmqpSession session = connection.GetSession(sessionId);
foreach (AmqpConsumer consumer in session.Consumers)
@@ -276,19 +280,19 @@
return connection.Unsubscribe(subscriptionName);
}
- public Task Rollback(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
+ public Task Rollback(NmsTransactionInfo transactionInfo, NmsTransactionInfo nextTransactionInfo)
{
var session = connection.GetSession(transactionInfo.SessionId);
return session.Rollback(transactionInfo, nextTransactionInfo);
}
- public Task Commit(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
+ public Task Commit(NmsTransactionInfo transactionInfo, NmsTransactionInfo nextTransactionInfo)
{
var session = connection.GetSession(transactionInfo.SessionId);
return session.Commit(transactionInfo, nextTransactionInfo);
}
- public void FireResourceClosed(ResourceInfo resourceInfo, Exception error)
+ public void FireResourceClosed(INmsResource resourceInfo, Exception error)
{
Listener.OnResourceClosed(resourceInfo, error);
}
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpSendTask.cs b/src/NMS.AMQP/Provider/Amqp/AmqpSendTask.cs
index f090edd..d53c0fc 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpSendTask.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpSendTask.cs
@@ -31,7 +31,7 @@
public AmqpSendTask(SenderLink link, global::Amqp.Message message, DeliveryState deliveryState, long timeoutMillis)
{
- if (timeoutMillis != ConnectionInfo.INFINITE)
+ if (timeoutMillis != NmsConnectionInfo.INFINITE)
{
this.timer = new Timer(OnTimer, this, timeoutMillis, -1);
}
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs b/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs
index 2fb8ecb..d2c5c81 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpSession.cs
@@ -30,11 +30,11 @@
{
public class AmqpSession
{
- private readonly ConcurrentDictionary<Id, AmqpConsumer> consumers = new ConcurrentDictionary<Id, AmqpConsumer>();
- private readonly ConcurrentDictionary<Id, AmqpProducer> producers = new ConcurrentDictionary<Id, AmqpProducer>();
- protected readonly SessionInfo SessionInfo;
+ private readonly ConcurrentDictionary<NmsConsumerId, AmqpConsumer> consumers = new ConcurrentDictionary<NmsConsumerId, AmqpConsumer>();
+ private readonly ConcurrentDictionary<NmsProducerId, AmqpProducer> producers = new ConcurrentDictionary<NmsProducerId, AmqpProducer>();
+ protected readonly NmsSessionInfo SessionInfo;
- public AmqpSession(AmqpConnection connection, SessionInfo sessionInfo)
+ public AmqpSession(AmqpConnection connection, NmsSessionInfo sessionInfo)
{
Connection = connection;
SessionInfo = sessionInfo;
@@ -51,7 +51,7 @@
public Session UnderlyingSession { get; private set; }
public IEnumerable<AmqpConsumer> Consumers => consumers.Values.ToArray();
- public Id SessionId => SessionInfo.Id;
+ public NmsSessionId SessionId => SessionInfo.Id;
internal bool IsTransacted => SessionInfo.IsTransacted;
@@ -61,16 +61,16 @@
{
TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
- if (SessionInfo.requestTimeout > 0)
+ var requestTimeout = Connection.Provider.RequestTimeout;
+ if (requestTimeout > 0)
{
- CancellationTokenSource ct = new CancellationTokenSource(TimeSpan.FromMilliseconds(SessionInfo.requestTimeout));
+ CancellationTokenSource ct = new CancellationTokenSource(TimeSpan.FromMilliseconds(requestTimeout));
ct.Token.Register(() => tcs.TrySetCanceled(), false);
}
UnderlyingSession = new Session(Connection.UnderlyingConnection, CreateBeginFrame(),
(session, begin) =>
{
- SessionInfo.remoteChannel = begin.RemoteChannel;
tcs.TrySetResult(true);
});
UnderlyingSession.AddClosedCallback((sender, error) =>
@@ -85,12 +85,13 @@
public void Close()
{
- TimeSpan timeout = TimeSpan.FromMilliseconds(SessionInfo.closeTimeout);
+ long closeTimeout = Connection.Provider.CloseTimeout;
+ TimeSpan timeout = TimeSpan.FromMilliseconds(closeTimeout);
UnderlyingSession.Close(timeout);
Connection.RemoveSession(SessionInfo.Id);
}
- public Task BeginTransaction(TransactionInfo transactionInfo)
+ public Task BeginTransaction(NmsTransactionInfo transactionInfo)
{
if (!SessionInfo.IsTransacted)
{
@@ -105,27 +106,27 @@
return new Begin
{
HandleMax = Connection.Provider.MaxHandle,
- IncomingWindow = SessionInfo.incomingWindow,
- OutgoingWindow = SessionInfo.outgoingWindow,
- NextOutgoingId = SessionInfo.nextOutgoingId
+ IncomingWindow = int.MaxValue, // value taken from qpid-jms
+ OutgoingWindow = Connection.Provider.SessionOutgoingWindow,
+ NextOutgoingId = 1
};
}
- public async Task CreateConsumer(ConsumerInfo consumerInfo)
+ public async Task CreateConsumer(NmsConsumerInfo consumerInfo)
{
AmqpConsumer amqpConsumer = new AmqpConsumer(this, consumerInfo);
await amqpConsumer.Attach();
consumers.TryAdd(consumerInfo.Id, amqpConsumer);
}
- public async Task CreateProducer(ProducerInfo producerInfo)
+ public async Task CreateProducer(NmsProducerInfo producerInfo)
{
var amqpProducer = new AmqpProducer(this, producerInfo);
await amqpProducer.Attach();
producers.TryAdd(producerInfo.Id, amqpProducer);
}
- public AmqpConsumer GetConsumer(Id consumerId)
+ public AmqpConsumer GetConsumer(NmsConsumerId consumerId)
{
if (consumers.TryGetValue(consumerId, out var consumer))
{
@@ -135,7 +136,7 @@
throw new Exception();
}
- public AmqpProducer GetProducer(Id producerId)
+ public AmqpProducer GetProducer(NmsProducerId producerId)
{
if (producers.TryGetValue(producerId, out var producer))
{
@@ -145,12 +146,12 @@
throw new Exception();
}
- public void RemoveConsumer(Id consumerId)
+ public void RemoveConsumer(NmsConsumerId consumerId)
{
consumers.TryRemove(consumerId, out _);
}
- public void RemoveProducer(Id producerId)
+ public void RemoveProducer(NmsProducerId producerId)
{
producers.TryRemove(producerId, out _);
}
@@ -178,7 +179,7 @@
/// <param name="transactionInfo">The TransactionInfo describing the transaction being rolled back.</param>
/// <param name="nextTransactionInfo">The TransactionInfo describing the transaction that should be started immediately.</param>
/// <exception cref="Exception">throws Exception if an error occurs while performing the operation.</exception>
- public Task Rollback(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
+ public Task Rollback(NmsTransactionInfo transactionInfo, NmsTransactionInfo nextTransactionInfo)
{
if (!SessionInfo.IsTransacted)
{
@@ -194,7 +195,7 @@
/// <param name="transactionInfo">the TransactionInfo describing the transaction being committed.</param>
/// <param name="nextTransactionInfo">the TransactionInfo describing the transaction that should be started immediately.</param>
/// <exception cref="Exception">throws Exception if an error occurs while performing the operation.</exception>
- public Task Commit(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
+ public Task Commit(NmsTransactionInfo transactionInfo, NmsTransactionInfo nextTransactionInfo)
{
if (!SessionInfo.IsTransacted)
{
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpTemporaryDestination.cs b/src/NMS.AMQP/Provider/Amqp/AmqpTemporaryDestination.cs
index c12cd09..6a76632 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpTemporaryDestination.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpTemporaryDestination.cs
@@ -49,7 +49,7 @@
RcvSettleMode = ReceiverSettleMode.First,
};
- string linkDestinationName = "apache-nms:" + ((destination.IsTopic) ? CREATOR_TOPIC : CREATOR_QUEUE) + destination.Id;
+ string linkDestinationName = "apache-nms:" + ((destination.IsTopic) ? CREATOR_TOPIC : CREATOR_QUEUE) + destination.Address;
var taskCompletionSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
senderLink = new SenderLink(session.UnderlyingSession, linkDestinationName, result, (link, attach) =>
{
@@ -68,7 +68,7 @@
senderLink.AddClosedCallback((sender, error) =>
{
- NMSException exception = ExceptionSupport.GetException(error, $"Received attach response for Temporary creator link. Link = {destination.Id}");
+ NMSException exception = ExceptionSupport.GetException(error, $"Received attach response for Temporary creator link. Link = {destination}");
taskCompletionSource.TrySetException(exception);
});
return taskCompletionSource.Task;
@@ -107,7 +107,7 @@
}
catch (Exception ex)
{
- throw ExceptionSupport.Wrap(ex, "Failed to close Link {0}", destination.Id);
+ throw ExceptionSupport.Wrap(ex, "Failed to close Link {0}", destination);
}
}
}
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpTransactionContext.cs b/src/NMS.AMQP/Provider/Amqp/AmqpTransactionContext.cs
index 6753e4e..0a77f4a 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpTransactionContext.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpTransactionContext.cs
@@ -28,11 +28,11 @@
public class AmqpTransactionContext
{
private readonly AmqpSession session;
- private readonly Dictionary<Id, AmqpConsumer> txConsumers = new Dictionary<Id, AmqpConsumer>();
+ private readonly Dictionary<NmsConsumerId, AmqpConsumer> txConsumers = new Dictionary<NmsConsumerId, AmqpConsumer>();
private TransactionalState cachedAcceptedState;
private TransactionalState cachedTransactedState;
private AmqpTransactionCoordinator coordinator;
- private Id current;
+ private NmsTransactionId current;
private byte[] txnId;
public AmqpTransactionContext(AmqpSession session)
@@ -52,7 +52,7 @@
return this.cachedAcceptedState;
}
- public async Task Rollback(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
+ public async Task Rollback(NmsTransactionInfo transactionInfo, NmsTransactionInfo nextTransactionInfo)
{
if (!Equals(transactionInfo.Id, this.current))
{
@@ -87,7 +87,7 @@
this.txConsumers.Clear();
}
- public async Task Commit(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
+ public async Task Commit(NmsTransactionInfo transactionInfo, NmsTransactionInfo nextTransactionInfo)
{
if (!Equals(transactionInfo.Id, this.current))
{
@@ -113,7 +113,7 @@
this.txConsumers.Clear();
}
- public async Task Begin(TransactionInfo transactionInfo)
+ public async Task Begin(NmsTransactionInfo transactionInfo)
{
if (this.current != null)
throw new NMSException("Begin called while a TX is still Active.");
diff --git a/src/NMS.AMQP/Provider/Amqp/AmqpTransactionCoordinator.cs b/src/NMS.AMQP/Provider/Amqp/AmqpTransactionCoordinator.cs
index d26e8fb..3fab595 100644
--- a/src/NMS.AMQP/Provider/Amqp/AmqpTransactionCoordinator.cs
+++ b/src/NMS.AMQP/Provider/Amqp/AmqpTransactionCoordinator.cs
@@ -51,7 +51,7 @@
public async Task<byte[]> DeclareAsync()
{
- var outcome = await this.SendAsync(DeclareMessage, null, this.session.Connection.Info.requestTimeout).ConfigureAwait(false);
+ var outcome = await this.SendAsync(DeclareMessage, null, this.session.Connection.Provider.RequestTimeout).ConfigureAwait(false);
if (outcome.Descriptor.Code == MessageSupport.DECLARED_INSTANCE.Descriptor.Code)
{
return ((Declared) outcome).TxnId;
@@ -71,7 +71,7 @@
public async Task DischargeAsync(byte[] txnId, bool fail)
{
var message = new global::Amqp.Message(new Discharge { TxnId = txnId, Fail = fail });
- var outcome = await this.SendAsync(message, null, this.session.Connection.Info.requestTimeout).ConfigureAwait(false);
+ var outcome = await this.SendAsync(message, null, this.session.Connection.Provider.RequestTimeout).ConfigureAwait(false);
if (outcome.Descriptor.Code == MessageSupport.ACCEPTED_INSTANCE.Descriptor.Code)
{
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsBytesMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsBytesMessageFacade.cs
index e61ddfb..4185271 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsBytesMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsBytesMessageFacade.cs
@@ -163,7 +163,7 @@
Message.BodySection = EMPTY_DATA;
}
- public override bool HasBody()
+ public virtual bool HasBody()
{
if (byteOut != null)
return byteOut.BaseStream.Length > 0;
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMapMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMapMessageFacade.cs
index ae09084..7a588de 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMapMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMapMessageFacade.cs
@@ -67,7 +67,7 @@
}
}
- public override bool HasBody()
+ public virtual bool HasBody()
{
return Map.Count > 0;
}
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs
index 183f215..a6f53a7 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsMessageFacade.cs
@@ -39,11 +39,6 @@
private DateTime? syntheticExpiration;
public global::Amqp.Message Message { get; private set; }
- public virtual bool HasBody()
- {
- return Message.BodySection != null;
- }
-
public int RedeliveryCount
{
get => Convert.ToInt32(Header.DeliveryCount);
@@ -88,6 +83,23 @@
}
}
+ public object ProviderMessageIdObject
+ {
+ get => Message.Properties?.GetMessageId();
+ set
+ {
+ if (Message.Properties == null)
+ {
+ if (value == null)
+ {
+ return;
+ }
+ LazyCreateProperties();
+ }
+ Message.Properties?.SetMessageId(value);
+ }
+ }
+
public string NMSCorrelationID
{
get => Message.Properties == null ? null : AmqpMessageIdHelper.ToCorrelationIdString(Message.Properties.GetCorrelationId());
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsStreamMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsStreamMessageFacade.cs
index 18fc051..a7f63c9 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsStreamMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsStreamMessageFacade.cs
@@ -138,7 +138,7 @@
return emptyList;
}
- public override bool HasBody() => !IsEmpty;
+ public virtual bool HasBody() => !IsEmpty;
public override void ClearBody()
{
diff --git a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsTextMessageFacade.cs b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsTextMessageFacade.cs
index dd3cf81..371efcd 100644
--- a/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsTextMessageFacade.cs
+++ b/src/NMS.AMQP/Provider/Amqp/Message/AmqpNmsTextMessageFacade.cs
@@ -71,7 +71,7 @@
SetTextBody(null);
}
- public override bool HasBody()
+ public virtual bool HasBody()
{
try
{
diff --git a/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs b/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
index 89ce95f..c567687 100644
--- a/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
+++ b/src/NMS.AMQP/Provider/Failover/FailoverProvider.cs
@@ -52,7 +52,7 @@
private long sendTimeout;
private Uri connectedUri;
- private ConnectionInfo connectionInfo;
+ private NmsConnectionInfo connectionInfo;
private IProvider provider;
private IProviderListener listener;
@@ -86,11 +86,11 @@
}
}
- public Task Connect(ConnectionInfo info)
+ public Task Connect(NmsConnectionInfo info)
{
CheckClosed();
- requestTimeout = info.requestTimeout;
+ requestTimeout = info.RequestTimeout;
sendTimeout = info.SendTimeout;
connectionInfo = info;
@@ -273,7 +273,7 @@
listener = providerListener;
}
- public Task CreateResource(ResourceInfo resourceInfo)
+ public Task CreateResource(INmsResource resourceInfo)
{
CheckClosed();
@@ -288,7 +288,7 @@
return request.Task;
}
- public Task DestroyResource(ResourceInfo resourceInfo)
+ public Task DestroyResource(INmsResource resourceInfo)
{
CheckClosed();
@@ -306,7 +306,7 @@
return request.Task;
}
- public Task StartResource(ResourceInfo resourceInfo)
+ public Task StartResource(INmsResource resourceInfo)
{
CheckClosed();
@@ -321,7 +321,7 @@
return request.Task;
}
- public Task StopResource(ResourceInfo resourceInfo)
+ public Task StopResource(INmsResource resourceInfo)
{
CheckClosed();
@@ -336,7 +336,7 @@
return request.Task;
}
- public Task Recover(Id sessionId)
+ public Task Recover(NmsSessionId sessionId)
{
CheckClosed();
@@ -352,7 +352,7 @@
return request.Task;
}
- public Task Acknowledge(Id sessionId, AckType ackType)
+ public Task Acknowledge(NmsSessionId sessionId, AckType ackType)
{
CheckClosed();
@@ -418,7 +418,7 @@
return request.Task;
}
- public Task Rollback(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
+ public Task Rollback(NmsTransactionInfo transactionInfo, NmsTransactionInfo nextTransactionInfo)
{
CheckClosed();
@@ -434,7 +434,7 @@
return request.Task;
}
- public Task Commit(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
+ public Task Commit(NmsTransactionInfo transactionInfo, NmsTransactionInfo nextTransactionInfo)
{
CheckClosed();
@@ -486,7 +486,7 @@
{
}
- public void OnResourceClosed(ResourceInfo resourceInfo, Exception error)
+ public void OnResourceClosed(INmsResource resourceInfo, Exception error)
{
if (closed)
return;
diff --git a/src/NMS.AMQP/Provider/Failover/FailoverRequest.cs b/src/NMS.AMQP/Provider/Failover/FailoverRequest.cs
index b6a5684..6c2ef6f 100644
--- a/src/NMS.AMQP/Provider/Failover/FailoverRequest.cs
+++ b/src/NMS.AMQP/Provider/Failover/FailoverRequest.cs
@@ -86,7 +86,7 @@
public void ScheduleTimeout()
{
- if (cancellationTokenSource == null && requestTimeout != ConnectionInfo.INFINITE)
+ if (cancellationTokenSource == null && requestTimeout != NmsConnectionInfo.INFINITE)
{
TimeSpan timeout = TimeSpan.FromMilliseconds(requestTimeout) - (DateTime.UtcNow - requestStarted);
if (timeout > TimeSpan.Zero)
diff --git a/src/NMS.AMQP/Provider/IProvider.cs b/src/NMS.AMQP/Provider/IProvider.cs
index 39acaaa..afe3402 100644
--- a/src/NMS.AMQP/Provider/IProvider.cs
+++ b/src/NMS.AMQP/Provider/IProvider.cs
@@ -28,20 +28,20 @@
long SendTimeout { get; }
Uri RemoteUri { get; }
void Start();
- Task Connect(ConnectionInfo connectionInfo);
+ Task Connect(NmsConnectionInfo connectionInfo);
void Close();
void SetProviderListener(IProviderListener providerListener);
- Task CreateResource(ResourceInfo resourceInfo);
- Task DestroyResource(ResourceInfo resourceInfo);
- Task StartResource(ResourceInfo resourceInfo);
- Task StopResource(ResourceInfo resourceInfo);
- Task Recover(Id sessionId);
- Task Acknowledge(Id sessionId, AckType ackType);
+ Task CreateResource(INmsResource resourceInfo);
+ Task DestroyResource(INmsResource resourceInfo);
+ Task StartResource(INmsResource resourceInfo);
+ Task StopResource(INmsResource resourceInfo);
+ Task Recover(NmsSessionId sessionId);
+ Task Acknowledge(NmsSessionId sessionId, AckType ackType);
Task Acknowledge(InboundMessageDispatch envelope, AckType ackType);
INmsMessageFactory MessageFactory { get; }
Task Send(OutboundMessageDispatch envelope);
Task Unsubscribe(string name);
- Task Rollback(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo);
- Task Commit(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo);
+ Task Rollback(NmsTransactionInfo transactionInfo, NmsTransactionInfo nextTransactionInfo);
+ Task Commit(NmsTransactionInfo transactionInfo, NmsTransactionInfo nextTransactionInfo);
}
}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Provider/IProviderListener.cs b/src/NMS.AMQP/Provider/IProviderListener.cs
index 3d66622..3ebc976 100644
--- a/src/NMS.AMQP/Provider/IProviderListener.cs
+++ b/src/NMS.AMQP/Provider/IProviderListener.cs
@@ -58,7 +58,7 @@
/// <param name="remoteUri">The Uri of the Broker that the client has now connected to.</param>
void OnConnectionRestored(Uri remoteUri);
- void OnResourceClosed(ResourceInfo resourceInfo, Exception error);
+ void OnResourceClosed(INmsResource resource, Exception error);
/// <summary>
/// Called from a fault tolerant Provider instance to signal that the underlying
diff --git a/src/NMS.AMQP/Util/AmqpDestinationHelper.cs b/src/NMS.AMQP/Util/AmqpDestinationHelper.cs
index 63c2218..117da2e 100644
--- a/src/NMS.AMQP/Util/AmqpDestinationHelper.cs
+++ b/src/NMS.AMQP/Util/AmqpDestinationHelper.cs
@@ -159,11 +159,11 @@
case MessageSupport.JMS_DEST_TYPE_TOPIC:
return new NmsTopic(address);
case MessageSupport.JMS_DEST_TYPE_TEMP_QUEUE:
- NmsTemporaryQueue temporaryQueue = new NmsTemporaryQueue(new CustomIdGenerator(true, address).GenerateId());
+ NmsTemporaryQueue temporaryQueue = new NmsTemporaryQueue(address);
temporaryQueue.Address = address;
return temporaryQueue;
case MessageSupport.JMS_DEST_TYPE_TEMP_TOPIC:
- NmsTemporaryTopic temporaryTopic = new NmsTemporaryTopic(new CustomIdGenerator(true, address).GenerateId());
+ NmsTemporaryTopic temporaryTopic = new NmsTemporaryTopic(address);
temporaryTopic.Address = address;
return temporaryTopic;
}
@@ -182,14 +182,14 @@
if (consumerDestination.IsQueue)
{
if (consumerDestination.IsTemporary)
- return new NmsTemporaryQueue(new CustomIdGenerator(true, address).GenerateId()) { Address = address };
+ return new NmsTemporaryQueue(address);
else
return new NmsQueue(address);
}
else if (consumerDestination.IsTopic)
{
if (consumerDestination.IsTemporary)
- return new NmsTemporaryTopic(new CustomIdGenerator(true, address).GenerateId()) { Address = address };
+ return new NmsTemporaryTopic(address);
else
return new NmsTopic(address);
}
diff --git a/src/NMS.AMQP/Util/AtomicBool.cs b/src/NMS.AMQP/Util/AtomicBool.cs
index 8fef0d4..e138728 100644
--- a/src/NMS.AMQP/Util/AtomicBool.cs
+++ b/src/NMS.AMQP/Util/AtomicBool.cs
@@ -19,7 +19,7 @@
namespace Apache.NMS.AMQP.Util
{
- public class AtomicBool
+ internal class AtomicBool
{
private int value;
diff --git a/src/NMS.AMQP/Meta/ResourceInfo.cs b/src/NMS.AMQP/Util/AtomicLong.cs
similarity index 65%
copy from src/NMS.AMQP/Meta/ResourceInfo.cs
copy to src/NMS.AMQP/Util/AtomicLong.cs
index e89b9f6..d43fae2 100644
--- a/src/NMS.AMQP/Meta/ResourceInfo.cs
+++ b/src/NMS.AMQP/Util/AtomicLong.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,20 +15,27 @@
* limitations under the License.
*/
-using Apache.NMS.AMQP.Util;
+using System.Threading;
-namespace Apache.NMS.AMQP.Meta
+namespace Apache.NMS.AMQP.Util
{
- public abstract class ResourceInfo
+ internal class AtomicLong
{
+ private long value;
- private readonly Id resourceId;
-
- protected ResourceInfo(Id resourceId)
+ public AtomicLong(long initialValue = 0)
{
- this.resourceId = resourceId;
+ this.value = initialValue;
}
- public virtual Id Id { get { return resourceId; } }
+ public long IncrementAndGet()
+ {
+ return Interlocked.Increment(ref value);
+ }
+
+ public static implicit operator long(AtomicLong atomicLong)
+ {
+ return atomicLong.value;
+ }
}
}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Util/AtomicSequence.cs b/src/NMS.AMQP/Util/AtomicSequence.cs
deleted file mode 100644
index 20b1c1b..0000000
--- a/src/NMS.AMQP/Util/AtomicSequence.cs
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-using System;
-using System.Collections.Generic;
-using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS.Util;
-
-namespace Apache.NMS.AMQP.Util
-{
- /// <summary>
- /// Simple utility class used mainly for Id generation.
- /// </summary>
- public class AtomicSequence : Atomic<ulong>
- {
- public AtomicSequence() : base()
- {
- }
-
- public AtomicSequence(ulong defaultValue) : base(defaultValue)
- {
- }
-
- public ulong getAndIncrement()
- {
- ulong val = 0;
- lock (this)
- {
- val = atomicValue;
- atomicValue++;
- }
- return val;
- }
-
- public override string ToString()
- {
- return Value.ToString();
- }
- }
-}
diff --git a/src/NMS.AMQP/Util/IdGenerator.cs b/src/NMS.AMQP/Util/IdGenerator.cs
index f3d1c7a..6a259a2 100644
--- a/src/NMS.AMQP/Util/IdGenerator.cs
+++ b/src/NMS.AMQP/Util/IdGenerator.cs
@@ -14,660 +14,55 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
using System;
-using System.Net;
-using System.Security.Permissions;
using System.Collections.Generic;
using System.Linq;
-using System.Text;
-using System.Threading.Tasks;
-using Apache.NMS.Util;
-using Apache.NMS;
+using System.Net;
+using System.Threading;
namespace Apache.NMS.AMQP.Util
{
- #region Id Class
-
- public class Id : IComparable
+ internal class IdGenerator
{
- public static readonly Id EMPTY = new Id();
- protected const int DEFAULT_MAX_CAPACITY = 1;
- protected static readonly int[] HashTable = new int[] { 2,3,5,7,11,13,17,19,23,29,31,37,41,43,47,53,59,61,67,71,73,79,83,89,97};
- protected static readonly int HashTableSize = HashTable.Length;
- protected delegate ComponentId InstanceFactory(object o);
- protected static readonly Dictionary<Type, InstanceFactory> buildMap;
+ public const string DEFAULT_PREFIX = "ID:";
- #region Class Initializer
-
- static Id()
- {
- buildMap = new Dictionary<Type, InstanceFactory>();
- buildMap.Add(typeof(String), (o) => { return new ComponentId<string>(o as string); });
- buildMap.Add(typeof(UInt16), (o) => { return new ComponentId<UInt16>(Convert.ToUInt16(o)); });
- buildMap.Add(typeof(UInt32), (o) => { return new ComponentId<UInt32>(Convert.ToUInt32(o)); });
- buildMap.Add(typeof(UInt64), (o) => { return new ComponentId<UInt64>(Convert.ToUInt64(o)); });
- buildMap.Add(typeof(Int16), (o) => { return new ComponentId<UInt16>(Convert.ToUInt16(o)); });
- buildMap.Add(typeof(Int32), (o) => { return new ComponentId<UInt32>(Convert.ToUInt32(o)); });
- buildMap.Add(typeof(Int64), (o) => { return new ComponentId<UInt64>(Convert.ToUInt64(o)); });
- buildMap.Add(typeof(AtomicSequence), (o) =>
- {
- ulong val = 0;
- AtomicSequence seq = o as AtomicSequence;
- if (o != null && seq != null)
- {
- val = seq.getAndIncrement();
- }
- return new ComponentId<UInt64>(val);
- });
- buildMap.Add(typeof(Guid), (o) =>
- {
- Guid id;
- if (o == null)
- {
- id = Guid.Empty;
- }
- else
- {
- id = (Guid)o;
- }
- return new ComponentId<Guid>(id);
- });
- buildMap.Add(typeof(Id), (o) => { return new ComponentId<Id>(o as Id); });
- }
-
- #endregion
-
- ComponentId[] components;
- private bool isReadOnly = false;
- private int length;
- private int maxCapacity;
- private int current;
- private string componentDelimeter;
- private int hashcode = 0;
-
- #region Constructors
-
- public Id(string delimeter, int size, int maxSize)
- {
- maxCapacity = Math.Max(maxSize, DEFAULT_MAX_CAPACITY);
- length = size;
- components = new ComponentId[length];
- current = 0;
- componentDelimeter = delimeter;
- }
-
- public Id(int size, int maxSize) : this(IdGenerator.ID_COMPONENT_DELIMETER, size, maxSize) { }
-
- public Id(int size) : this(IdGenerator.ID_COMPONENT_DELIMETER, size, size) { }
-
- public Id(params object[] args) : this(args.Length)
- {
- int added = this.AddAll(args);
- if(added > maxCapacity)
- {
- Tracer.ErrorFormat("Id Format error.");
- }
- Generate();
- }
-
- #endregion
-
- #region Public Properties
-
- public int Size
- {
- get { return length; }
- }
-
- #endregion
-
- #region Public Methods
-
- public void Add(object component)
- {
- if (isReadOnly)
- {
- throw new NMSException("Invalid Operation when generating Component Id. Can not change id once generated.");
- }
- if (current >= maxCapacity)
- {
- throw new NMSException("Invalid Operation when generating Component Id. Can not add component. Adding Compoenent at full capacity " + maxCapacity);
- }
- if (current >= length)
- {
- grow(length * 2);
- }
- Type type = component.GetType();
- InstanceFactory instf = null;
- buildMap.TryGetValue(type, out instf);
- if (instf == null)
- {
- throw new NMSException(string.Format("Invalid Id component type {0} for component {1}", type.ToString(), component.ToString()));
- }
- components[current] = instf(component);
- current++;
-
- }
-
- public object[] GetComponents(int startIndex=0)
- {
- return GetComponents(startIndex, length);
- }
-
- public object[] GetComponents(int startIndex, int endIndex)
- {
- int eIndex = Math.Max(0,Math.Min(endIndex, length));
- int sIndex = Math.Max(0, startIndex);
- int resultLen = eIndex - sIndex;
- if (resultLen<0)
- {
- return null;
- }
- object[] comps = new object[resultLen];
- int index = 0;
- for(int i=sIndex; i< eIndex; i++)
- {
- comps[index] = components[i].IdValue;
- index++;
- }
- return comps;
- }
-
- public object GetComponent(int index)
- {
- if (isReadOnly)
- {
- if (index >= 0 && index < length)
- {
- return components[index].IdValue;
- }
- }
- return null;
- }
-
- public object GetFirstComponent(Type type)
- {
- if (isReadOnly)
- {
- for (int i = 0; i < length; i++)
- {
- ComponentId cid = components[i];
- if (cid.ValueType.Equals(type))
- {
- return cid.IdValue;
- }
- }
- }
- return null;
- }
-
- public object GetLastComponent(Type type)
- {
- if (isReadOnly)
- {
- for (int i = length; i > 0; i--)
- {
- ComponentId cid = components[i - 1];
- if (cid.ValueType.Equals(type))
- {
- return cid.IdValue;
- }
- }
- }
- return null;
- }
-
- public void Generate()
- {
- if (!isReadOnly)
- {
- isReadOnly = true;
- this.GetHashCode();
- }
- }
-
- #endregion
-
- #region Object Override Methods
-
- public override bool Equals(object obj)
- {
- if(GetHashCode() == obj.GetHashCode())
- {
- return true;
- }
- else
- {
- return CompareTo(obj) == 0;
- }
- }
-
- public override int GetHashCode()
- {
- if(hashcode == 0 && isReadOnly && length > 0)
- {
- int hashIndex = 0;
- ComponentId cid = components[0];
- hashcode = HashTable[hashIndex] * cid.GetHashCode();
- for (int i=1; i<length; i++)
- {
- cid = components[i];
- hashIndex = i % HashTableSize;
- hashcode = hashcode ^ HashTable[hashIndex] * cid.GetHashCode();
- }
- }
- return hashcode;
- }
-
- public override string ToString()
- {
- if (maxCapacity == 0)
- {
- return "0";
- }
- if (isReadOnly)
- {
- if (length == 0)
- {
- return EMPTY.ToString();
- }
- StringBuilder sb = new StringBuilder();
- ComponentId cid = this.components[0];
- sb.Append(cid.ToString());
- for (int i = 1; i < length; i++)
- {
- cid = this.components[i];
- sb.Append(componentDelimeter);
- sb.Append(cid.ToString());
-
- }
- return sb.ToString();
- }
- else
- {
- return base.ToString();
- }
- }
-
- #endregion
-
- #region IComparable Methods
-
- public int CompareTo(object obj)
- {
-
- if(obj!=null && obj is Id)
- {
- return CompareTo(obj as Id);
- }
- else if(obj == null)
- {
- return 1;
- }
- else
- {
- return -1;
- }
-
- }
-
- public int CompareTo(Id that)
- {
- if(this.length > that.length)
- {
- return 1;
- }
- else if (this.length < that.length)
- {
- return -1;
- }
- else
- {
- int compare = 0;
- for(int i=0; i<length; i++)
- {
- ComponentId thisCid = this.components[i];
- ComponentId thatCid = that.components[i];
- compare = thisCid.CompareTo(thatCid);
- if ( compare > 0)
- {
- return 1;
- }
- else if ( compare < 0 )
- {
- return -1;
- }
-
- }
- return compare;
- }
- }
-
- #endregion
-
- #region Protected Methods
-
- protected void grow(int newCapacity)
- {
- int size = Math.Min(newCapacity, maxCapacity);
- ComponentId[] buffer = new ComponentId[size];
- Array.Copy(this.components, buffer, this.length);
- length = size;
- this.components = buffer;
- }
-
- protected int AddAll(params object[] args)
- {
-#if NET46
- if(Tracer.IsDebugEnabled)
- {
- Tracer.DebugFormat("Adding Id components: {0} MaxCapacity: {1}", string.Join(",",
- args.Select(x => x.ToString()).ToArray()), maxCapacity);
-
- }
-#else
-#endif
- int added = 0;
- foreach (object o in args)
- {
- Type type = o.GetType();
-
- if (type.IsArray && type.Equals(typeof(object[])))
- {
- object[] moreArgs = o as object[];
- int addlen = (moreArgs).Length;
-
- maxCapacity = maxCapacity + addlen - 1;
- added += this.AddAll(moreArgs);
-
- }
- else
- {
- this.Add(o);
- added++;
- }
- }
- return added;
- }
-
-#endregion
-
-#region Inner ComponentId Classes
-
- protected abstract class ComponentId : IComparable
- {
- protected object value;
-
- protected ComponentId(object idvalue)
- {
- value = idvalue;
- }
-
- public object IdValue { get { return value; } }
-
- public abstract Type ValueType { get; }
-
- public virtual int CompareTo(object obj)
- {
- if(obj == null)
- {
- return 1;
- }
- else if(obj is ComponentId)
- {
- return CompareTo(obj as ComponentId);
- }
- else if (obj is IComparable)
- {
- return -1 * (obj as IComparable).CompareTo(this.IdValue);
- }
- else
- {
- return -1;
- }
- }
-
- public virtual int CompareTo(ComponentId that)
- {
- if (this.ValueType.Equals(that.ValueType) || this.ValueType.IsEquivalentTo(that.ValueType))
- {
- if (this.IdValue.Equals(that.IdValue))
- {
- return 0;
- }
- else
- {
- return this.GetHashCode() - that.GetHashCode();
- }
- }
- else if (this.IdValue is IComparable)
- {
- return (this.IdValue as IComparable).CompareTo(that.IdValue);
- }
- else if (that.IdValue is IComparable)
- {
- return -1 * (that.IdValue as IComparable).CompareTo(this.IdValue);
- }
- else
- {
- return this.ValueType.GetHashCode() - that.ValueType.GetHashCode();
- }
-
- }
-
- public override bool Equals(object obj)
- {
- return CompareTo(obj) == 0;
- }
-
-
- public override int GetHashCode()
- {
- return this.IdValue.GetHashCode();
- }
-
- public override string ToString()
- {
- return value.ToString();
- }
- }
-
- protected class ComponentId<T> : ComponentId
- {
- public ComponentId(T val) : base(val)
- {
- }
-
- public T Value
- {
- get { return (T)this.value; }
- }
-
- public override Type ValueType
- {
- get
- {
- if (value != null)
- {
- return value.GetType();
- }
- return default(T).GetType();
- }
- }
- }
-
-#endregion
-
- }
-
-#endregion
-
-#region IdGenerator Class
-
- public class IdGenerator
- {
- public const string ID_COMPONENT_DELIMETER = ":";
- protected static readonly string DEFAULT_PREFIX = "ID";
- protected static string hostname = null;
-
- protected readonly string prefix;
- protected readonly AtomicSequence sequence = new AtomicSequence(1);
-
-#region Class Initializer
+ private readonly string prefix;
+ private long sequence;
+ private static string hostName;
static IdGenerator()
{
-#if NETSTANDARD2_0
- hostname = Dns.GetHostName();
-#else
- DnsPermission permissions = null;
try
{
- permissions = new DnsPermission(PermissionState.Unrestricted);
+ hostName = Dns.GetHostName();
}
catch (Exception e)
{
- Tracer.InfoFormat("{0}", e.StackTrace);
- }
- if (permissions != null)
- {
- hostname = Dns.GetHostName();
- }
-#endif
-
- }
-
-#endregion
-
-#region Constructors
-
- public IdGenerator(string prefix)
- {
- this.prefix = RemoveEnd(prefix, ID_COMPONENT_DELIMETER)
- + ((hostname == null)
- ?
- ""
- :
- ID_COMPONENT_DELIMETER + hostname)
- ;
- }
-
- public IdGenerator() : this(DEFAULT_PREFIX)
- {
- }
-
-#endregion
-
- public virtual Id GenerateId()
- {
- Id id = new Id(this.prefix, Guid.NewGuid(), sequence);
- return id;
- }
-
- public virtual string generateID()
- {
- Id id = GenerateId();
- return id.ToString();
- //return string.Format("{0}{1}" + ID_COMPONENT_DELIMETER + "{2}", this.prefix, Guid.NewGuid().ToString(), sequence.getAndIncrement());
- }
-
- protected static string RemoveEnd(string s, string end)
- {
- string result = s;
-
- if (s != null && end != null && s.EndsWith(end))
- {
- int sLen = s.Length;
- int endLen = end.Length;
- int newLen = sLen - endLen;
- if (endLen > 0 && newLen > 0)
- {
- StringBuilder sb = new StringBuilder(s, 0, newLen, newLen);
- result = sb.ToString();
- }
- }
- return result;
- }
- }
-
-#endregion
-
-#region Derivative IdGenerator Classes
-
- class NestedIdGenerator : IdGenerator
- {
- protected Id parentId;
- protected bool removeParentPrefix;
-
- public NestedIdGenerator(string prefix, Id pId, bool remove) : base(prefix)
- {
- parentId = pId;
- removeParentPrefix = remove;
- }
-
- public NestedIdGenerator(string prefix, Id pId) : this(prefix, pId, false) { }
-
- public NestedIdGenerator(Id pId):this(DEFAULT_PREFIX, pId) { }
-
- public override Id GenerateId()
- {
- Id id;
- if (removeParentPrefix)
- {
- int componentIndex = (parentId.Size == 1) ? 0 : 1;
- id = new Id(prefix, parentId.GetComponents(componentIndex), sequence);
- }
- else
- {
- id = new Id(prefix, parentId, sequence);
+ Tracer.Error($"Could not generate host name prefix from DNS lookup: {e}");
}
- return id;
+ hostName = SanitizeHostName(hostName);
}
- public override string generateID()
+ public IdGenerator(string prefix = DEFAULT_PREFIX)
{
- return GenerateId().ToString();
- }
- }
-
- class CustomIdGenerator : IdGenerator
- {
- protected object[] parts;
- protected bool isOnlyParts;
-
-
- public CustomIdGenerator(string prefix, params object[] args) : base(prefix)
- {
- parts = args;
+ this.prefix = prefix + (hostName != null ? hostName + ":" : "");
}
- public CustomIdGenerator(bool onlyParts, params object[] args) : this(DEFAULT_PREFIX, args)
- {
- isOnlyParts = onlyParts;
- }
+ public string GenerateId() => $"{prefix}{Guid.NewGuid().ToString()}:{Interlocked.Increment(ref sequence).ToString()}";
- public CustomIdGenerator(Id pId) : this(DEFAULT_PREFIX, pId) { }
-
- public override Id GenerateId()
+ public static string SanitizeHostName(string hostName)
{
- Id id;
- if (isOnlyParts)
+ var sanitizedHostname = string.Concat(GetASCIICharacters(hostName));
+ if (sanitizedHostname.Length != hostName.Length)
{
- id = new Id(parts);
+ Tracer.Info($"Sanitized hostname from: {hostName} to: {sanitizedHostname}");
}
- else
- {
- id = new Id(prefix, parts, sequence);
- }
- return id;
+
+ return sanitizedHostname;
}
- public override string generateID()
- {
- return GenerateId().ToString();
- }
+ private static IEnumerable<char> GetASCIICharacters(string hostName) => hostName.Where(ch => ch < 127);
}
-
-#endregion
-}
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Interop-Test/Transactions/NmsTransactedConsumerTest.cs b/test/Apache-NMS-AMQP-Interop-Test/Transactions/NmsTransactedConsumerTest.cs
index 907faf6..37dbd29 100644
--- a/test/Apache-NMS-AMQP-Interop-Test/Transactions/NmsTransactedConsumerTest.cs
+++ b/test/Apache-NMS-AMQP-Interop-Test/Transactions/NmsTransactedConsumerTest.cs
@@ -460,7 +460,7 @@
consumer.Close();
}
- [Test, Timeout(60_000), Repeat(5)]
+ [Test, Timeout(60_000)]
public void TestConsumerMessagesInOrder()
{
int messageCount = 4;
diff --git a/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs b/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
index a86bb1e..49a241a 100644
--- a/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
+++ b/test/Apache-NMS-AMQP-Test/ConnectionFactoryTest.cs
@@ -103,6 +103,7 @@
"&nms.clientIDPrefix=clientId" +
"&nms.requestTimeout=1000" +
"&nms.sendTimeout=1000" +
+ "&nms.closeTimeout=2000" +
"&nms.localMessageExpiry=false";
NmsConnectionFactory factory = new NmsConnectionFactory(new Uri(configuredUri));
@@ -114,6 +115,7 @@
Assert.AreEqual("clientId", factory.ClientIdPrefix);
Assert.AreEqual(1000, factory.RequestTimeout);
Assert.AreEqual(1000, factory.SendTimeout);
+ Assert.AreEqual(2000, factory.CloseTimeout);
Assert.IsFalse(factory.LocalMessageExpiry);
}
diff --git a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
index ffe6a62..2bd93ff 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/FailoverIntegrationTest.cs
@@ -796,7 +796,7 @@
connection.AddConnectionListener(connectionListener.Object);
testPeer.ExpectBegin();
- testPeer.ExpectBegin(nextOutgoingId: 2);
+ testPeer.ExpectBegin();
ISession session1 = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
ISession session2 = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
diff --git a/test/Apache-NMS-AMQP-Test/Integration/TransactionsIntegrationTest.cs b/test/Apache-NMS-AMQP-Test/Integration/TransactionsIntegrationTest.cs
index 9242c58..203155d 100644
--- a/test/Apache-NMS-AMQP-Test/Integration/TransactionsIntegrationTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Integration/TransactionsIntegrationTest.cs
@@ -1098,7 +1098,7 @@
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
- IConnection connection = EstablishConnection(testPeer);
+ IConnection connection = EstablishConnection(testPeer, "nms.closeTimeout=100");
connection.Start();
testPeer.ExpectBegin();
@@ -1125,7 +1125,7 @@
{
using (TestAmqpPeer testPeer = new TestAmqpPeer())
{
- IConnection connection = EstablishConnection(testPeer);
+ IConnection connection = EstablishConnection(testPeer, "nms.closeTimeout=100");
connection.Start();
testPeer.ExpectBegin();
diff --git a/test/Apache-NMS-AMQP-Test/Message/DefaultMessageIdBuilderTest.cs b/test/Apache-NMS-AMQP-Test/Message/DefaultMessageIdBuilderTest.cs
new file mode 100644
index 0000000..3b90f5b
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Message/DefaultMessageIdBuilderTest.cs
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using Apache.NMS.AMQP.Message;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Message
+{
+ [TestFixture]
+ public class DefaultMessageIdBuilderTest
+ {
+ [Test]
+ public void TestCreateMessageId()
+ {
+ // Arrange
+ var messageIdBuilder = new DefaultMessageIdBuilder();
+
+ // Act
+ object messageId = messageIdBuilder.CreateMessageId("ID:c29c38a8-cad1-4328-af6c-dcc940cdfc2d:1:1:1", 1);
+
+ // Assert
+ Assert.AreEqual("ID:c29c38a8-cad1-4328-af6c-dcc940cdfc2d:1:1:1-1", messageId.ToString());
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestMessageFacade.cs b/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestMessageFacade.cs
index cc3a697..69ff292 100644
--- a/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestMessageFacade.cs
+++ b/test/Apache-NMS-AMQP-Test/Message/Facade/NmsTestMessageFacade.cs
@@ -20,7 +20,6 @@
using Apache.NMS.AMQP;
using Apache.NMS.AMQP.Message;
using Apache.NMS.AMQP.Message.Facade;
-using Apache.NMS.AMQP.Provider.Amqp;
using Apache.NMS.Util;
namespace NMS.AMQP.Test.Message.Facade
@@ -28,12 +27,6 @@
[Serializable]
public class NmsTestMessageFacade : INmsMessageFacade
{
- private bool hasBody;
-
- public void Initialize(AmqpConsumer consumer)
- {
- }
-
public virtual NmsMessage AsMessage()
{
return new NmsMessage(this);
@@ -47,22 +40,17 @@
public int RedeliveryCount { get; set; }
- public bool HasBody()
- {
- return hasBody;
- }
-
public void OnSend(TimeSpan producerTtl)
{
}
- public void OnDispatch()
+ public string NMSMessageId
{
-
+ get => ProviderMessageIdObject as string;
+ set => ProviderMessageIdObject = value;
}
- public string NMSMessageId { get; set; }
public IPrimitiveMap Properties { get; } = new PrimitiveMap();
public string NMSCorrelationID { get; set; }
public IDestination NMSDestination { get; set; }
@@ -77,6 +65,7 @@
public DateTime? Expiration { get; set; }
public sbyte? JmsMsgType { get; }
public bool IsPersistent { get; set; }
+ public object ProviderMessageIdObject { get; set; }
public INmsMessageFacade Copy()
{
diff --git a/test/Apache-NMS-AMQP-Test/Meta/NmsConnectionIdTest.cs b/test/Apache-NMS-AMQP-Test/Meta/NmsConnectionIdTest.cs
new file mode 100644
index 0000000..6fd076c
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Meta/NmsConnectionIdTest.cs
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Util;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Meta
+{
+ [TestFixture]
+ public class NmsConnectionIdTest : NmsResourceIdTestBase
+ {
+ [Test]
+ public void TestCreateFromStringThrowsWhenNull()
+ {
+ Assert.Catch<ArgumentException>(() => new NmsConnectionId(null));
+ }
+
+ [Test]
+ public void TestCreateFromStringThrowsWhenEmpty()
+ {
+ Assert.Catch<ArgumentException>(() => new NmsConnectionId(string.Empty));
+ }
+
+ [Test]
+ public void TestNmsConnectionIdFromNmsString()
+ {
+ var id = new IdGenerator().GenerateId();
+ var connectionId = new NmsConnectionId(id);
+
+ Assert.AreEqual(id, connectionId.ToString());
+ }
+
+ [Test]
+ public void TestEquals()
+ {
+ var id1 = new IdGenerator().GenerateId();
+ var id2 = new IdGenerator().GenerateId();
+ var connectionId1 = new NmsConnectionId(id1);
+ var connectionId2 = new NmsConnectionId(id2);
+ var connectionId3 = new NmsConnectionId(id1);
+
+ Assert.AreNotEqual(connectionId1, connectionId2);
+ Assert.AreEqual(connectionId1, connectionId3);
+ }
+
+ [Test]
+ public void TestHashCode()
+ {
+ var id1 = new IdGenerator().GenerateId();
+ var id2 = new IdGenerator().GenerateId();
+ var connectionId1 = new NmsConnectionId(id1);
+ var connectionId2 = new NmsConnectionId(id2);
+ var connectionId3 = new NmsConnectionId(id1);
+
+ Assert.AreNotEqual(connectionId1.GetHashCode(), connectionId2.GetHashCode());
+ Assert.AreEqual(connectionId1.GetHashCode(), connectionId3.GetHashCode());
+ }
+
+ [Test]
+ public void TestToString()
+ {
+ var id = new IdGenerator().GenerateId();
+ var connectionId = new NmsConnectionId(id);
+
+ Assert.AreEqual(id, connectionId.ToString());
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Meta/NmsConnectionInfoTest.cs b/test/Apache-NMS-AMQP-Test/Meta/NmsConnectionInfoTest.cs
new file mode 100644
index 0000000..7618e47
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Meta/NmsConnectionInfoTest.cs
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Util;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Meta
+{
+ [TestFixture]
+ public class NmsConnectionInfoTest
+ {
+ private NmsConnectionId firstId;
+ private NmsConnectionId secondId;
+
+ [SetUp]
+ public void SetUp()
+ {
+ var generator = new IdGenerator();
+ firstId = new NmsConnectionId(generator.GenerateId());
+ secondId = new NmsConnectionId(generator.GenerateId());
+ }
+
+ [Test]
+ public void TestExceptionWhenCreatedWithNullConnectionId()
+ {
+ Assert.Catch<ArgumentNullException>(() => new NmsConnectionInfo(null));
+ }
+
+ [Test]
+ public void TestCreate()
+ {
+ var info = new NmsConnectionInfo(firstId);
+ Assert.AreSame(firstId, info.Id);
+ Assert.NotNull(info.ToString());
+ }
+
+ [Test]
+ public void TestHashCode()
+ {
+ var first = new NmsConnectionInfo(firstId);
+ var second = new NmsConnectionInfo(secondId);
+
+ Assert.AreEqual(first.GetHashCode(), first.GetHashCode());
+ Assert.AreEqual(second.GetHashCode(), second.GetHashCode());
+ Assert.AreNotEqual(first.GetHashCode(), second.GetHashCode());
+ }
+
+ [Test]
+ public void TestEquals()
+ {
+ var first = new NmsConnectionInfo(firstId);
+ var second = new NmsConnectionInfo(secondId);
+
+ Assert.AreEqual(first, first);
+ Assert.AreEqual(second, second);
+
+ Assert.IsFalse(first.Equals(second));
+ Assert.IsFalse(second.Equals(first));
+ }
+
+ [Test]
+ public void TestIsExplicitClientId()
+ {
+ var info = new NmsConnectionInfo(firstId);
+ Assert.IsFalse(info.IsExplicitClientId);
+ info.SetClientId("something", true);
+ Assert.IsTrue(info.IsExplicitClientId);
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Meta/NmsConsumerIdTest.cs b/test/Apache-NMS-AMQP-Test/Meta/NmsConsumerIdTest.cs
new file mode 100644
index 0000000..d0a4bff
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Meta/NmsConsumerIdTest.cs
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.AMQP.Meta;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Meta
+{
+ [TestFixture]
+ public class NmsConsumerIdTest : NmsResourceIdTestBase
+ {
+ [Test]
+ public void TestCreateFromSessionIdThrowsWhenNull()
+ {
+ Assert.Catch<ArgumentException>(() => new NmsConsumerId(null, 0));
+ }
+
+ [Test]
+ public void TestNmsSessionIdFromNmsConnectionId()
+ {
+ var connectionId = CreateNmsConnectionId();
+ var sessionId = CreateSessionId(connectionId, 1);
+
+ var consumerId = new NmsConsumerId(sessionId, 1);
+
+ Assert.AreEqual(connectionId, consumerId.ConnectionId);
+ Assert.AreEqual(1, consumerId.Value);
+ }
+
+ [Test]
+ public void TestEquals()
+ {
+ var connectionId = CreateNmsConnectionId();
+ var sessionId1 = new NmsSessionId(connectionId, 1);
+ var sessionId2 = new NmsSessionId(connectionId, 2);
+
+ Assert.AreNotEqual(new NmsConsumerId(sessionId1, 1), new NmsConsumerId(sessionId1, 2));
+ Assert.AreNotEqual(new NmsConsumerId(sessionId1, 1), new NmsConsumerId(sessionId2, 1));
+ Assert.AreEqual(new NmsConsumerId(sessionId1, 1), new NmsConsumerId(sessionId1, 1));
+ }
+
+ [Test]
+ public void TestHashCode()
+ {
+ var connectionId = CreateNmsConnectionId();
+ var sessionId1 = new NmsSessionId(connectionId, 1);
+ var sessionId2 = new NmsSessionId(connectionId, 2);
+
+ Assert.AreNotEqual(new NmsConsumerId(sessionId1, 1).GetHashCode(), new NmsConsumerId(sessionId1, 2).GetHashCode());
+ Assert.AreNotEqual(new NmsConsumerId(sessionId1, 1).GetHashCode(), new NmsConsumerId(sessionId2, 1).GetHashCode());
+ Assert.AreEqual(new NmsConsumerId(sessionId1, 1).GetHashCode(), new NmsConsumerId(sessionId1, 1).GetHashCode());
+ }
+
+ [Test]
+ public void TestToString()
+ {
+ var connectionId = CreateNmsConnectionId();
+ var sessionId = new NmsSessionId(connectionId, 1);
+ var consumerId = new NmsConsumerId(sessionId, 1);
+
+ Assert.AreEqual($"{connectionId}:1:1", consumerId.ToString());
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Meta/NmsConsumerInfoTest.cs b/test/Apache-NMS-AMQP-Test/Meta/NmsConsumerInfoTest.cs
new file mode 100644
index 0000000..a07d392
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Meta/NmsConsumerInfoTest.cs
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Util;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Meta
+{
+ [TestFixture]
+ public class NmsConsumerInfoTest
+ {
+ private NmsConsumerId firstId;
+ private NmsConsumerId secondId;
+
+ private NmsSessionId firstSessionId;
+ private NmsSessionId secondSessionId;
+
+ [SetUp]
+ public void SetUp()
+ {
+ var generator = new IdGenerator();
+
+ var nmsConnectionId = new NmsConnectionId(generator.GenerateId());
+
+ firstSessionId = new NmsSessionId(nmsConnectionId, 1);
+ secondSessionId = new NmsSessionId(nmsConnectionId, 2);
+
+ firstId = new NmsConsumerId(firstSessionId, 1);
+ secondId = new NmsConsumerId(secondSessionId, 2);
+ }
+
+ [Test]
+ public void TestExceptionWhenCreatedWithNullConsumerId()
+ {
+ Assert.Catch<ArgumentException>(() => new NmsConsumerInfo(null));
+ }
+
+ [Test]
+ public void TestCreateFromConsumerId()
+ {
+ var consumerInfo = new NmsConsumerInfo(firstId);
+ Assert.AreSame(firstId, consumerInfo.Id);
+ Assert.AreSame(firstId.SessionId, consumerInfo.SessionId);
+ Assert.IsFalse(string.IsNullOrEmpty(consumerInfo.ToString()));
+ }
+
+ [Test]
+ public void TestHashCode()
+ {
+ var first = new NmsConsumerInfo(firstId);
+ var second = new NmsConsumerInfo(secondId);
+
+ Assert.AreEqual(first.GetHashCode(), first.GetHashCode());
+ Assert.AreEqual(second.GetHashCode(), second.GetHashCode());
+
+ Assert.AreNotEqual(first.GetHashCode(), second.GetHashCode());
+ }
+
+ [Test]
+ public void TestEqualsCode()
+ {
+ var first = new NmsConsumerInfo(firstId);
+ var second = new NmsConsumerInfo(secondId);
+
+ Assert.AreEqual(first, first);
+ Assert.AreEqual(second, second);
+
+ Assert.AreNotEqual(first, second);
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Meta/NmsProducerIdTest.cs b/test/Apache-NMS-AMQP-Test/Meta/NmsProducerIdTest.cs
new file mode 100644
index 0000000..0e2c8ab
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Meta/NmsProducerIdTest.cs
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.AMQP.Meta;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Meta
+{
+ [TestFixture]
+ public class NmsProducerIdTest : NmsResourceIdTestBase
+ {
+ [Test]
+ public void TestCreateFromSessionIdThrowsWhenNull()
+ {
+ Assert.Catch<ArgumentException>(() => new NmsProducerId(null, 0));
+ }
+
+ [Test]
+ public void TestNmsProducerIdFromNmsSessionId()
+ {
+ var connectionId = CreateNmsConnectionId();
+ var sessionId = CreateSessionId(connectionId);
+ var producerId = new NmsProducerId(sessionId, 1);
+
+ Assert.AreEqual(1, producerId.Value);
+ Assert.AreEqual(producerId.SessionId, sessionId);
+ Assert.AreEqual(producerId.ConnectionId, connectionId);
+ }
+
+ [Test]
+ public void TestEquals()
+ {
+ var connectionId = CreateNmsConnectionId();
+ var sessionId1 = new NmsSessionId(connectionId, 1);
+ var sessionId2 = new NmsSessionId(connectionId, 2);
+
+ Assert.AreNotEqual(new NmsProducerId(sessionId1, 1), new NmsProducerId(sessionId1, 2));
+ Assert.AreNotEqual(new NmsProducerId(sessionId1, 1), new NmsProducerId(sessionId2, 1));
+ Assert.AreEqual(new NmsProducerId(sessionId1, 1), new NmsProducerId(sessionId1, 1));
+ }
+
+ [Test]
+ public void TestHashCode()
+ {
+ var connectionId = CreateNmsConnectionId();
+ var sessionId1 = new NmsSessionId(connectionId, 1);
+ var sessionId2 = new NmsSessionId(connectionId, 2);
+
+ Assert.AreNotEqual(new NmsProducerId(sessionId1, 1).GetHashCode(), new NmsProducerId(sessionId1, 2).GetHashCode());
+ Assert.AreNotEqual(new NmsProducerId(sessionId1, 1).GetHashCode(), new NmsProducerId(sessionId2, 1).GetHashCode());
+ Assert.AreEqual(new NmsProducerId(sessionId1, 1).GetHashCode(), new NmsProducerId(sessionId1, 1).GetHashCode());
+ }
+
+ [Test]
+ public void TestToString()
+ {
+ var connectionId = CreateNmsConnectionId();
+ var sessionId = new NmsSessionId(connectionId, 1);
+ var producerId = new NmsProducerId(sessionId, 1);
+
+ Assert.AreEqual($"{connectionId}:1:1", producerId.ToString());
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Meta/NmsProducerInfoTest.cs b/test/Apache-NMS-AMQP-Test/Meta/NmsProducerInfoTest.cs
new file mode 100644
index 0000000..84d8df5
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Meta/NmsProducerInfoTest.cs
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Util;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Meta
+{
+ public class NmsProducerInfoTest
+ {
+ private NmsProducerId firstId;
+ private NmsProducerId secondId;
+
+ private NmsSessionId firstSessionId;
+ private NmsSessionId secondSessionId;
+
+ [SetUp]
+ public void SetUp()
+ {
+ var generator = new IdGenerator();
+
+ var nmsConnectionId = new NmsConnectionId(generator.GenerateId());
+
+ firstSessionId = new NmsSessionId(nmsConnectionId, 1);
+ secondSessionId = new NmsSessionId(nmsConnectionId, 2);
+
+ firstId = new NmsProducerId(firstSessionId, 1);
+ secondId = new NmsProducerId(secondSessionId, 2);
+ }
+
+ [Test]
+ public void TestExceptionWhenCreatedWithNullProducerId()
+ {
+ Assert.Catch<ArgumentException>(() => new NmsProducerInfo(null));
+ }
+
+ [Test]
+ public void TestCreateFromProducerId()
+ {
+ var producerInfo = new NmsProducerInfo(firstId);
+ Assert.AreSame(firstId, producerInfo.Id);
+ Assert.AreSame(firstId.SessionId, producerInfo.SessionId);
+
+ Assert.IsFalse(string.IsNullOrEmpty(producerInfo.ToString()));
+ }
+
+ [Test]
+ public void TestHashCode()
+ {
+ var first = new NmsProducerInfo(firstId);
+ var second = new NmsProducerInfo(secondId);
+
+ Assert.AreEqual(first.GetHashCode(), first.GetHashCode());
+ Assert.AreEqual(second.GetHashCode(), second.GetHashCode());
+
+ Assert.AreNotEqual(first.GetHashCode(), second.GetHashCode());
+ }
+
+ [Test]
+ public void TestEqualsCode()
+ {
+ var first = new NmsProducerInfo(firstId);
+ var second = new NmsProducerInfo(secondId);
+
+ Assert.AreEqual(first, first);
+ Assert.AreEqual(second, second);
+
+ Assert.AreNotEqual(first, second);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/NMS.AMQP/Meta/ResourceInfo.cs b/test/Apache-NMS-AMQP-Test/Meta/NmsResourceIdTestBase.cs
similarity index 64%
copy from src/NMS.AMQP/Meta/ResourceInfo.cs
copy to test/Apache-NMS-AMQP-Test/Meta/NmsResourceIdTestBase.cs
index e89b9f6..326914b 100644
--- a/src/NMS.AMQP/Meta/ResourceInfo.cs
+++ b/test/Apache-NMS-AMQP-Test/Meta/NmsResourceIdTestBase.cs
@@ -15,20 +15,22 @@
* limitations under the License.
*/
+using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Util;
-namespace Apache.NMS.AMQP.Meta
+namespace NMS.AMQP.Test.Meta
{
- public abstract class ResourceInfo
+ public abstract class NmsResourceIdTestBase
{
-
- private readonly Id resourceId;
-
- protected ResourceInfo(Id resourceId)
+ protected static NmsConnectionId CreateNmsConnectionId()
{
- this.resourceId = resourceId;
+ var generator = new IdGenerator();
+ return new NmsConnectionId(generator.GenerateId());
}
-
- public virtual Id Id { get { return resourceId; } }
+
+ protected static NmsSessionId CreateSessionId(NmsConnectionId connectionId, long value = 1)
+ {
+ return new NmsSessionId(connectionId, value);
+ }
}
}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Meta/NmsSessionIdTest.cs b/test/Apache-NMS-AMQP-Test/Meta/NmsSessionIdTest.cs
new file mode 100644
index 0000000..1e4c7b8
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Meta/NmsSessionIdTest.cs
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Meta
+{
+ [TestFixture]
+ public class NmsSessionIdTest : NmsResourceIdTestBase
+ {
+ [Test]
+ public void TestCreateFromConnectionIdThrowsWhenNull()
+ {
+ Assert.Catch<ArgumentException>(() => CreateSessionId(null));
+ }
+
+ [Test]
+ public void TestNmsSessionIdFromNmsConnectionId()
+ {
+ var connectionId = CreateNmsConnectionId();
+ var sessionId = CreateSessionId(connectionId, 1);
+
+ Assert.AreEqual(connectionId, sessionId.ConnectionId);
+ Assert.AreEqual(1, sessionId.Value);
+ }
+
+ [Test]
+ public void TestEquals()
+ {
+ var connectionId1 = CreateNmsConnectionId();
+ var connectionId2 = CreateNmsConnectionId();
+
+ Assert.AreNotEqual(CreateSessionId(connectionId1, 1), CreateSessionId(connectionId1, 2));
+ Assert.AreNotEqual(CreateSessionId(connectionId1, 1), CreateSessionId(connectionId2, 2));
+ Assert.AreEqual(CreateSessionId(connectionId1, 1), CreateSessionId(connectionId1, 1));
+ }
+
+ [Test]
+ public void TestHashCode()
+ {
+ var connectionId1 = CreateNmsConnectionId();
+ var connectionId2 = CreateNmsConnectionId();
+
+ Assert.AreNotEqual(CreateSessionId(connectionId1, 1).GetHashCode(), CreateSessionId(connectionId1, 2).GetHashCode());
+ Assert.AreNotEqual(CreateSessionId(connectionId1, 1).GetHashCode(), CreateSessionId(connectionId2, 2).GetHashCode());
+ Assert.AreEqual(CreateSessionId(connectionId1, 1).GetHashCode(), CreateSessionId(connectionId1, 1).GetHashCode());
+ }
+
+ [Test]
+ public void TestToString()
+ {
+ var connectionId = CreateNmsConnectionId();
+ var sessionId = CreateSessionId(connectionId, 1);
+
+ Assert.AreEqual($"{connectionId}:1", sessionId.ToString());
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Meta/NmsSessionInfoTest.cs b/test/Apache-NMS-AMQP-Test/Meta/NmsSessionInfoTest.cs
new file mode 100644
index 0000000..e1efc89
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Meta/NmsSessionInfoTest.cs
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS;
+using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Util;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Meta
+{
+ public class NmsSessionInfoTest
+ {
+ private NmsSessionId firstId;
+ private NmsSessionId secondId;
+
+ private NmsConnectionInfo connectionInfo;
+
+ [SetUp]
+ public void SetUp() {
+ var generator = new IdGenerator();
+
+ var connectionId = new NmsConnectionId(generator.GenerateId());
+
+ firstId = new NmsSessionId(connectionId, 1);
+ secondId = new NmsSessionId(connectionId, 2);
+
+ connectionInfo = new NmsConnectionInfo(connectionId);
+ }
+
+ [Test]
+ public void TestExceptionWhenCreatedWithNullConnectionId()
+ {
+ Assert.Catch<ArgumentException>(() => new NmsSessionInfo(null, 1));
+ }
+
+ [Test]
+ public void TestExceptionWhenCreatedWithNullSessionId()
+ {
+ Assert.Catch<ArgumentException>(() => new NmsSessionInfo(null));
+ }
+
+ [Test]
+ public void TestCreateFromSessionId()
+ {
+ var sessionInfo = new NmsSessionInfo(firstId);
+ Assert.AreSame(firstId, sessionInfo.Id);
+ Assert.IsFalse(string.IsNullOrEmpty(sessionInfo.ToString()));
+ }
+
+ [Test]
+ public void TestCreateFromConnectionInfo()
+ {
+ var sessionInfo = new NmsSessionInfo(connectionInfo, 1);
+ Assert.AreEqual(connectionInfo.Id, sessionInfo.Id.ConnectionId);
+ }
+
+ [Test]
+ public void TestIsTransacted()
+ {
+ var sessionInfo = new NmsSessionInfo(firstId);
+ sessionInfo.AcknowledgementMode = AcknowledgementMode.AutoAcknowledge;
+ Assert.IsFalse(sessionInfo.IsTransacted);
+ sessionInfo.AcknowledgementMode = AcknowledgementMode.Transactional;
+ Assert.IsTrue(sessionInfo.IsTransacted);
+ }
+
+ [Test]
+ public void TestHashCode()
+ {
+ var first = new NmsSessionInfo(firstId);
+ var second = new NmsSessionInfo(secondId);
+
+ Assert.AreEqual(first.GetHashCode(), first.GetHashCode());
+ Assert.AreEqual(second.GetHashCode(), second.GetHashCode());
+
+ Assert.AreNotEqual(first.GetHashCode(), second.GetHashCode());
+ }
+
+ [Test]
+ public void TestEqualsCode()
+ {
+ var first = new NmsSessionInfo(firstId);
+ var second = new NmsSessionInfo(secondId);
+
+ Assert.AreEqual(first, first);
+ Assert.AreEqual(second, second);
+
+ Assert.AreNotEqual(first, second);
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Meta/NmsTransactionIdTest.cs b/test/Apache-NMS-AMQP-Test/Meta/NmsTransactionIdTest.cs
new file mode 100644
index 0000000..adb4550
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Meta/NmsTransactionIdTest.cs
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Util;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Meta
+{
+ [TestFixture]
+ public class NmsTransactionIdTest
+ {
+ private NmsConnectionId firstId;
+ private NmsConnectionId secondId;
+
+ [SetUp]
+ public void SetUp()
+ {
+ var generator = new IdGenerator();
+
+ firstId = new NmsConnectionId(generator.GenerateId());
+ secondId = new NmsConnectionId(generator.GenerateId());
+ }
+
+ [Test]
+ public void TestNullIdThrowsException()
+ {
+ Assert.Catch<ArgumentException>(() => new NmsTransactionId(null, 0));
+ }
+
+ [Test]
+ public void TestConstructor()
+ {
+ var id = new NmsTransactionId(firstId, 1);
+ Assert.AreEqual(firstId, id.ConnectionId);
+ Assert.AreEqual(1, id.Value);
+ }
+
+ [Test]
+ public void TestToString()
+ {
+ var id = new NmsTransactionId(firstId, 1);
+ var txKey = id.ToString();
+ Assert.IsTrue(txKey.StartsWith("TX:"));
+ }
+
+ [Test]
+ public void TestEquals()
+ {
+ Assert.AreNotEqual(new NmsTransactionId(firstId, 1), new NmsTransactionId(firstId, 2));
+ Assert.AreNotEqual(new NmsTransactionId(firstId, 1), new NmsTransactionId(secondId, 1));
+ Assert.AreEqual(new NmsTransactionId(firstId, 1), new NmsTransactionId(firstId, 1));
+ }
+
+ [Test]
+ public void TestHashCode()
+ {
+ Assert.AreNotEqual(new NmsTransactionId(firstId, 1).GetHashCode(), new NmsTransactionId(firstId, 2).GetHashCode());
+ Assert.AreNotEqual(new NmsTransactionId(firstId, 1).GetHashCode(), new NmsTransactionId(secondId, 1).GetHashCode());
+ Assert.AreEqual(new NmsTransactionId(firstId, 1).GetHashCode(), new NmsTransactionId(firstId, 1).GetHashCode());
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Meta/NmsTransactionInfoTest.cs b/test/Apache-NMS-AMQP-Test/Meta/NmsTransactionInfoTest.cs
new file mode 100644
index 0000000..f890810
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Meta/NmsTransactionInfoTest.cs
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+using Apache.NMS.AMQP.Meta;
+using Apache.NMS.AMQP.Util;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Meta
+{
+ [TestFixture]
+ public class NmsTransactionInfoTest
+ {
+ private NmsConnectionId firstId;
+ private NmsConnectionId secondId;
+ private NmsSessionId firstSessionId;
+ private NmsSessionId secondSessionId;
+ private NmsTransactionId firstTxId;
+ private NmsTransactionId secondTxId;
+
+ [SetUp]
+ public void SetUp()
+ {
+ var generator = new IdGenerator();
+
+ firstId = new NmsConnectionId(generator.GenerateId());
+ secondId = new NmsConnectionId(generator.GenerateId());
+
+ firstSessionId = new NmsSessionId(firstId, 1);
+ secondSessionId = new NmsSessionId(secondId, 2);
+
+ firstTxId = new NmsTransactionId(firstId, 1);
+ secondTxId = new NmsTransactionId(secondId, 2);
+ }
+
+ [Test]
+ public void TestThrowsWhenSessionIdIsNull()
+ {
+ Assert.Catch<ArgumentException>(() => new NmsTransactionInfo(null, firstTxId));
+ }
+
+ [Test]
+ public void TestThrowsWhenTransactionIdIsNull()
+ {
+ Assert.Catch<ArgumentException>(() => new NmsTransactionInfo(firstSessionId, null));
+ }
+
+ [Test]
+ public void TestCreateTransactionInfo()
+ {
+ var info = new NmsTransactionInfo(firstSessionId, firstTxId);
+ Assert.AreSame(firstSessionId, info.SessionId);
+ Assert.AreSame(firstTxId, info.Id);
+ Assert.IsFalse(string.IsNullOrEmpty(info.ToString()));
+ }
+
+ [Test]
+ public void TestHashCode()
+ {
+ var first = new NmsTransactionInfo(firstSessionId, firstTxId);
+ var second = new NmsTransactionInfo(secondSessionId, secondTxId);
+
+ Assert.AreEqual(first.GetHashCode(), first.GetHashCode());
+ Assert.AreEqual(second.GetHashCode(), second.GetHashCode());
+
+ Assert.AreNotEqual(first.GetHashCode(), second.GetHashCode());
+ }
+
+ [Test]
+ public void TestEqualsCode()
+ {
+ var first = new NmsTransactionInfo(firstSessionId, firstTxId);
+ var second = new NmsTransactionInfo(secondSessionId, secondTxId);
+
+ Assert.AreEqual(first, first);
+ Assert.AreEqual(second, second);
+
+ Assert.AreNotEqual(first, second);
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/NmsConnectionTest.cs b/test/Apache-NMS-AMQP-Test/NmsConnectionTest.cs
index 6f20c14..e4523e0 100644
--- a/test/Apache-NMS-AMQP-Test/NmsConnectionTest.cs
+++ b/test/Apache-NMS-AMQP-Test/NmsConnectionTest.cs
@@ -29,14 +29,14 @@
public class NmsConnectionTest
{
private NmsConnection connection;
- private ConnectionInfo connectionInfo;
+ private NmsConnectionInfo connectionInfo;
private MockProvider provider;
[SetUp]
public void SetUp()
{
provider = (MockProvider)new MockProviderFactory().CreateProvider(new Uri("mock://localhost"));
- connectionInfo = new ConnectionInfo(new Id("ID:TEST:1"));
+ connectionInfo = new NmsConnectionInfo(new NmsConnectionId("ID:TEST:1"));
}
[TearDown]
diff --git a/test/Apache-NMS-AMQP-Test/NmsMessageProducerTest.cs b/test/Apache-NMS-AMQP-Test/NmsMessageProducerTest.cs
index 60b2f45..f0f139b 100644
--- a/test/Apache-NMS-AMQP-Test/NmsMessageProducerTest.cs
+++ b/test/Apache-NMS-AMQP-Test/NmsMessageProducerTest.cs
@@ -29,7 +29,7 @@
public class NmsMessageProducerTest
{
private readonly MockRemotePeer remotePeer = new MockRemotePeer();
- private ConnectionInfo connectionInfo;
+ private NmsConnectionInfo connectionInfo;
private NmsConnection connection;
private ISession session;
@@ -37,7 +37,7 @@
public void SetUp()
{
remotePeer.Start();
- connectionInfo = new ConnectionInfo(new Id("ID:TEST:1"));
+ connectionInfo = new NmsConnectionInfo(new NmsConnectionId("ID:TEST:1"));
connection = CreateConnectionToMockProvider();
session = connection.CreateSession(AcknowledgementMode.AutoAcknowledge);
}
diff --git a/test/Apache-NMS-AMQP-Test/NmsTemporaryQueueTest.cs b/test/Apache-NMS-AMQP-Test/NmsTemporaryQueueTest.cs
index c146133..171dd85 100644
--- a/test/Apache-NMS-AMQP-Test/NmsTemporaryQueueTest.cs
+++ b/test/Apache-NMS-AMQP-Test/NmsTemporaryQueueTest.cs
@@ -16,6 +16,7 @@
*/
using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Util;
using NUnit.Framework;
@@ -27,8 +28,8 @@
[Test]
public void TestTwoTemporaryQueuesWithTheSameAddressesAreEqual()
{
- NmsTemporaryQueue temporaryQueue1 = new NmsTemporaryQueue(new Id("test")) { Address = "myTemporaryQueue" };
- NmsTemporaryQueue temporaryQueue2 = new NmsTemporaryQueue(new Id("test")) { Address = "myTemporaryQueue" };
+ NmsTemporaryQueue temporaryQueue1 = new NmsTemporaryQueue("myTemporaryQueue");
+ NmsTemporaryQueue temporaryQueue2 = new NmsTemporaryQueue("myTemporaryQueue");
Assert.AreEqual(temporaryQueue1, temporaryQueue2);
Assert.AreNotSame(temporaryQueue1, temporaryQueue2);
@@ -38,8 +39,9 @@
[Test]
public void TestTwoTemporaryQueuesWithDifferentAddressesAreNotEqual()
{
- NmsTemporaryQueue temporaryQueue1 = new NmsTemporaryQueue(new Id("test")) { Address = "myTemporaryQueue" };
- NmsTemporaryQueue temporaryQueue2 = new NmsTemporaryQueue(new Id("test")) { Address = "myTemporaryQueue2" };
+ var connectionId = new NmsConnectionId("1");
+ NmsTemporaryQueue temporaryQueue1 = new NmsTemporaryQueue("myTemporaryQueue");
+ NmsTemporaryQueue temporaryQueue2 = new NmsTemporaryQueue("myTemporaryQueue2");
Assert.AreNotEqual(temporaryQueue1, temporaryQueue2);
Assert.AreNotEqual(temporaryQueue1.GetHashCode(), temporaryQueue2.GetHashCode());
diff --git a/test/Apache-NMS-AMQP-Test/NmsTemporaryTopicTest.cs b/test/Apache-NMS-AMQP-Test/NmsTemporaryTopicTest.cs
index ad686ad..37997e9 100644
--- a/test/Apache-NMS-AMQP-Test/NmsTemporaryTopicTest.cs
+++ b/test/Apache-NMS-AMQP-Test/NmsTemporaryTopicTest.cs
@@ -16,6 +16,7 @@
*/
using Apache.NMS.AMQP;
+using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Util;
using NUnit.Framework;
@@ -27,19 +28,19 @@
[Test]
public void TestTwoTemporaryTopicsWithTheSameAddressesAreEqual()
{
- NmsTemporaryTopic nmsTopic1 = new NmsTemporaryTopic(new Id("test")) { Address = "myTopic" };
- NmsTemporaryTopic nmsTopic2 = new NmsTemporaryTopic(new Id("test")) { Address = "myTopic" };
+ var nmsTopic1 = new NmsTemporaryTopic("myTopic");
+ var nmsTopic2 = new NmsTemporaryTopic("myTopic");
Assert.AreEqual(nmsTopic1, nmsTopic2);
Assert.AreNotSame(nmsTopic1, nmsTopic2);
Assert.AreEqual(nmsTopic1.GetHashCode(), nmsTopic2.GetHashCode());
}
-
+
[Test]
public void TestTwoTemporaryTopicsWithDifferentAddressesAreNotEqual()
{
- NmsTemporaryTopic nmsTopic1 = new NmsTemporaryTopic(new Id("test")) { Address = "myTopic" };
- NmsTemporaryTopic nmsTopic2 = new NmsTemporaryTopic(new Id("test")) { Address = "myTopic1" };
+ var nmsTopic1 = new NmsTemporaryTopic("myTopic");
+ var nmsTopic2 = new NmsTemporaryTopic("myTopic2");
Assert.AreNotEqual(nmsTopic1, nmsTopic2);
Assert.AreNotEqual(nmsTopic1.GetHashCode(), nmsTopic2.GetHashCode());
diff --git a/test/Apache-NMS-AMQP-Test/PriorityMessageQueueTest.cs b/test/Apache-NMS-AMQP-Test/PriorityMessageQueueTest.cs
index 0c81006..eaabe0c 100644
--- a/test/Apache-NMS-AMQP-Test/PriorityMessageQueueTest.cs
+++ b/test/Apache-NMS-AMQP-Test/PriorityMessageQueueTest.cs
@@ -231,7 +231,7 @@
{
NmsTestMessageFacade facade = new NmsTestMessageFacade();
facade.NMSPriority = priority;
- facade.NMSMessageId = messageId.generateID();
+ facade.NMSMessageId = messageId.GenerateId();
return new NmsMessage(facade);
}
}
diff --git a/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpCodecTest.cs b/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpCodecTest.cs
index 5677366..28cfb21 100644
--- a/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpCodecTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpCodecTest.cs
@@ -573,9 +573,9 @@
case MessageSupport.JMS_DEST_TYPE_TOPIC:
return new NmsTopic("test");
case MessageSupport.JMS_DEST_TYPE_TEMP_QUEUE:
- return new NmsTemporaryQueue(new Id("test"));
+ return new NmsTemporaryQueue("test");
case MessageSupport.JMS_DEST_TYPE_TEMP_TOPIC:
- return new NmsTemporaryTopic(new Id("test"));
+ return new NmsTemporaryTopic("test");
default:
return null;
}
diff --git a/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpNmsMessageFacadeTest.cs b/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpNmsMessageFacadeTest.cs
index 658d352..dea0098 100644
--- a/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpNmsMessageFacadeTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Provider/Amqp/AmqpNmsMessageFacadeTest.cs
@@ -25,7 +25,6 @@
using Apache.NMS.AMQP.Provider.Amqp;
using Apache.NMS.AMQP.Provider.Amqp.Message;
using Apache.NMS.AMQP.Util;
-using Moq;
using NUnit.Framework;
namespace NMS.AMQP.Test.Provider.Amqp
@@ -953,6 +952,47 @@
Assert.AreEqual(expected, result, "Incorrect messageId value received");
}
+ [Test]
+ public void TestGetProviderMessageIdObjectOnNewMessage()
+ {
+ var amqpMessageFacade = CreateNewMessageFacade();
+ Assert.IsNull(amqpMessageFacade.ProviderMessageIdObject);
+ }
+
+ [Test]
+ public void TestSetGetProviderMessageIdObjectOnNewMessageWithString()
+ {
+ var testMessageId = "ID:myStringMessageId";
+ var amqpMessageFacade = CreateNewMessageFacade();
+ amqpMessageFacade.ProviderMessageIdObject = testMessageId;
+
+ Assert.AreEqual(testMessageId, amqpMessageFacade.ProviderMessageIdObject);
+ Assert.AreEqual(testMessageId, amqpMessageFacade.NMSMessageId);
+ }
+
+ [Test]
+ public void TestSetProviderMessageIdObjectNullClearsProperty()
+ {
+ var testMessageId = "ID:myStringMessageId";
+ var amqpMessageFacade = CreateNewMessageFacade();
+ amqpMessageFacade.ProviderMessageIdObject = testMessageId;
+ Assert.AreEqual(testMessageId, amqpMessageFacade.ProviderMessageIdObject);
+
+ amqpMessageFacade.ProviderMessageIdObject = null;
+ Assert.IsNull(amqpMessageFacade.ProviderMessageIdObject);
+ }
+
+ [Test]
+ public void TestSetProviderMessageIdObjectNullDoesNotCreateProperties()
+ {
+ var amqpMessageFacade = CreateNewMessageFacade();
+ Assert.IsNull(amqpMessageFacade.Message.Properties);
+
+ amqpMessageFacade.ProviderMessageIdObject = null;
+ Assert.IsNull(amqpMessageFacade.Message.Properties);
+ Assert.IsNull(amqpMessageFacade.ProviderMessageIdObject);
+ }
+
// --- creation-time field ---
[Test]
@@ -1320,7 +1360,7 @@
source.SetMessageAnnotation("test-annotation", "value");
var queue = new NmsQueue("Test-Queue");
- var temporaryQueue = new NmsTemporaryQueue(new Id("Test-Temp-Queue")) { Address = "Test-Temp-Queue" };
+ var temporaryQueue = new NmsTemporaryQueue("Test-Temp-Queue");
source.NMSDestination = queue;
source.NMSReplyTo = temporaryQueue;
source.ContentType = "Test-ContentType";
diff --git a/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderTest.cs b/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderTest.cs
index 7b4c8be..1d7410e 100644
--- a/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderTest.cs
+++ b/test/Apache-NMS-AMQP-Test/Provider/FailoverProviderTest.cs
@@ -36,9 +36,8 @@
[TestFixture]
public class FailoverProviderTest
{
- private readonly IdGenerator connectionIdGenerator = new IdGenerator();
private List<Uri> uris;
- private ConnectionInfo connectionInfo;
+ private NmsConnectionInfo connectionInfo;
private FailoverProvider provider;
private MockRemotePeer mockPeer;
@@ -233,8 +232,8 @@
connection.CreateSession(AcknowledgementMode.AutoAcknowledge).Close();
connection.Close();
- Assert.AreEqual(1, mockPeer.ContextStats.GetCreateResourceCalls<SessionInfo>());
- Assert.AreEqual(1, mockPeer.ContextStats.GetDestroyResourceCalls<SessionInfo>());
+ Assert.AreEqual(1, mockPeer.ContextStats.GetCreateResourceCalls<NmsSessionInfo>());
+ Assert.AreEqual(1, mockPeer.ContextStats.GetDestroyResourceCalls<NmsSessionInfo>());
}
[Test, Timeout(3000)]
@@ -250,9 +249,9 @@
session.CreateConsumer(topic).Close();
connection.Close();
- Assert.AreEqual(1, mockPeer.ContextStats.GetCreateResourceCalls<ConsumerInfo>());
- Assert.AreEqual(1, mockPeer.ContextStats.GetStartResourceCalls<ConsumerInfo>());
- Assert.AreEqual(1, mockPeer.ContextStats.GetDestroyResourceCalls<ConsumerInfo>());
+ Assert.AreEqual(1, mockPeer.ContextStats.GetCreateResourceCalls<NmsConsumerInfo>());
+ Assert.AreEqual(1, mockPeer.ContextStats.GetStartResourceCalls<NmsConsumerInfo>());
+ Assert.AreEqual(1, mockPeer.ContextStats.GetDestroyResourceCalls<NmsConsumerInfo>());
}
[Test, Timeout(300000)]
@@ -268,8 +267,8 @@
session.CreateProducer(topic).Close();
connection.Close();
- Assert.AreEqual(1, mockPeer.ContextStats.GetCreateResourceCalls<ProducerInfo>());
- Assert.AreEqual(1, mockPeer.ContextStats.GetDestroyResourceCalls<ProducerInfo>());
+ Assert.AreEqual(1, mockPeer.ContextStats.GetCreateResourceCalls<NmsProducerInfo>());
+ Assert.AreEqual(1, mockPeer.ContextStats.GetDestroyResourceCalls<NmsProducerInfo>());
}
[Test, Timeout(300000)]
@@ -287,9 +286,9 @@
Assert.AreEqual(1, mockPeer.ContextStats.RecoverCalls);
}
- private ConnectionInfo CreateConnectionInfo()
+ private NmsConnectionInfo CreateConnectionInfo()
{
- return new ConnectionInfo(connectionIdGenerator.GenerateId());
+ return new NmsConnectionInfo(new NmsConnectionId("test"));
}
}
}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProvider.cs b/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProvider.cs
index 57d4f6c..9baccff 100644
--- a/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProvider.cs
+++ b/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProvider.cs
@@ -51,7 +51,7 @@
throw new Exception();
}
- public async Task Connect(ConnectionInfo connectionInfo)
+ public async Task Connect(NmsConnectionInfo connectionInfo)
{
Stats.RecordConnectAttempt();
@@ -77,35 +77,35 @@
listener = providerListener;
}
- public Task CreateResource(ResourceInfo resourceInfo)
+ public Task CreateResource(INmsResource resourceInfo)
{
Stats.RecordCreateResourceCall(resourceInfo.GetType());
return Task.CompletedTask;
}
- public Task DestroyResource(ResourceInfo resourceInfo)
+ public Task DestroyResource(INmsResource resourceInfo)
{
Stats.RecordDestroyResourceCall(resourceInfo.GetType());
return Task.CompletedTask;
}
- public Task StartResource(ResourceInfo resourceInfo)
+ public Task StartResource(INmsResource resourceInfo)
{
return Task.CompletedTask;
}
- public Task StopResource(ResourceInfo resourceInfo)
+ public Task StopResource(INmsResource resourceInfo)
{
throw new NotImplementedException();
}
- public Task Recover(Id sessionId)
+ public Task Recover(NmsSessionId sessionId)
{
Stats.RecordRecoverCalls();
return Task.CompletedTask;
}
- public Task Acknowledge(Id sessionId, AckType ackType)
+ public Task Acknowledge(NmsSessionId sessionId, AckType ackType)
{
throw new NotImplementedException();
}
@@ -126,17 +126,17 @@
throw new NotImplementedException();
}
- public Task Rollback(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
+ public Task Rollback(NmsTransactionInfo transactionInfo, NmsTransactionInfo nextTransactionInfo)
{
throw new NotImplementedException();
}
- public Task Commit(TransactionInfo transactionInfo, TransactionInfo nextTransactionInfo)
+ public Task Commit(NmsTransactionInfo transactionInfo, NmsTransactionInfo nextTransactionInfo)
{
throw new NotImplementedException();
}
- public InboundMessageDispatch ReceiveMessage(ConsumerInfo consumerInfo, TimeSpan timeout)
+ public InboundMessageDispatch ReceiveMessage(NmsConsumerInfo consumerInfo, TimeSpan timeout)
{
throw new NotImplementedException();
}
diff --git a/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProviderStats.cs b/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProviderStats.cs
index 5533781..7090468 100644
--- a/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProviderStats.cs
+++ b/test/Apache-NMS-AMQP-Test/Provider/Mock/MockProviderStats.cs
@@ -39,11 +39,11 @@
public int CloseAttempts { get; private set; }
public int RecoverCalls { get; set; }
- public int GetCreateResourceCalls<T>() where T:ResourceInfo => createResourceCalls[typeof(T)];
+ public int GetCreateResourceCalls<T>() where T : INmsResource => createResourceCalls[typeof(T)];
- public int GetDestroyResourceCalls<T>() where T : ResourceInfo => destroyResourceCalls[typeof(T)];
+ public int GetDestroyResourceCalls<T>() where T : INmsResource => destroyResourceCalls[typeof(T)];
- public int GetStartResourceCalls<T>() where T : ResourceInfo => destroyResourceCalls[typeof(T)];
+ public int GetStartResourceCalls<T>() where T : INmsResource => destroyResourceCalls[typeof(T)];
public void RecordProviderCreated()
{
diff --git a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
index 6c2f147..0e8f74f 100644
--- a/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
+++ b/test/Apache-NMS-AMQP-Test/TestAmqp/TestAmqpPeer.cs
@@ -28,6 +28,7 @@
using Amqp.Sasl;
using Amqp.Transactions;
using Amqp.Types;
+using Apache.NMS.AMQP;
using Apache.NMS.AMQP.Util;
using NLog;
using NMS.AMQP.Test.TestAmqp.BasicTypes;
diff --git a/test/Apache-NMS-AMQP-Test/AtomicBoolTest.cs b/test/Apache-NMS-AMQP-Test/Utils/AtomicBoolTest.cs
similarity index 100%
rename from test/Apache-NMS-AMQP-Test/AtomicBoolTest.cs
rename to test/Apache-NMS-AMQP-Test/Utils/AtomicBoolTest.cs
diff --git a/src/NMS.AMQP/Meta/ResourceInfo.cs b/test/Apache-NMS-AMQP-Test/Utils/AtomicLongTest.cs
similarity index 69%
copy from src/NMS.AMQP/Meta/ResourceInfo.cs
copy to test/Apache-NMS-AMQP-Test/Utils/AtomicLongTest.cs
index e89b9f6..40cb70f 100644
--- a/src/NMS.AMQP/Meta/ResourceInfo.cs
+++ b/test/Apache-NMS-AMQP-Test/Utils/AtomicLongTest.cs
@@ -1,4 +1,4 @@
-/*
+/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -15,20 +15,23 @@
* limitations under the License.
*/
+using System.Threading.Tasks;
using Apache.NMS.AMQP.Util;
+using NUnit.Framework;
-namespace Apache.NMS.AMQP.Meta
+namespace NMS.AMQP.Test
{
- public abstract class ResourceInfo
+ [TestFixture]
+ public class AtomicLongTest
{
-
- private readonly Id resourceId;
-
- protected ResourceInfo(Id resourceId)
+ [Test]
+ public void TestIncrementAndGetIsThreadSafe()
{
- this.resourceId = resourceId;
- }
+ var atomicLong = new AtomicLong(1);
- public virtual Id Id { get { return resourceId; } }
+ Parallel.For(1, 10_000, l => atomicLong.IncrementAndGet());
+
+ Assert.AreEqual(10_000, (long) atomicLong);
+ }
}
}
\ No newline at end of file
diff --git a/test/Apache-NMS-AMQP-Test/Utils/IdGeneratorTest.cs b/test/Apache-NMS-AMQP-Test/Utils/IdGeneratorTest.cs
new file mode 100644
index 0000000..8f5364f
--- /dev/null
+++ b/test/Apache-NMS-AMQP-Test/Utils/IdGeneratorTest.cs
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System.Linq;
+using Apache.NMS.AMQP.Util;
+using NUnit.Framework;
+
+namespace NMS.AMQP.Test.Utils
+{
+ [TestFixture]
+ public class IdGeneratorTest
+ {
+ [Test]
+ public void TestSanitizeHostName()
+ {
+ Assert.AreEqual("somehost.lan", IdGenerator.SanitizeHostName("somehost.lan"));
+
+ // include a UTF-8 char in the text \u0E08 is a Thai elephant
+ Assert.AreEqual("otherhost.lan", IdGenerator.SanitizeHostName("other\u0E08host.lan"));
+ }
+
+ [Test]
+ public void TestDefaultPrefix()
+ {
+ var generator = new IdGenerator();
+ string generated = generator.GenerateId();
+ Assert.True(generated.StartsWith(IdGenerator.DEFAULT_PREFIX));
+ Assert.IsFalse(generated.Substring(IdGenerator.DEFAULT_PREFIX.Length).StartsWith(":"));
+ }
+
+ [Test]
+ public void TestNonDefaultPrefix()
+ {
+ var idGenerator = new IdGenerator("TEST-");
+ var generated = idGenerator.GenerateId();
+ Assert.IsFalse(generated.StartsWith(IdGenerator.DEFAULT_PREFIX));
+ Assert.IsFalse(generated.Substring("TEST-".Length).StartsWith(":"));
+ }
+
+ [Test]
+ public void TestIdIndexIncrements()
+ {
+ var generator = new IdGenerator();
+ var sequences = Enumerable.Repeat(0, 5)
+ .Select(x => generator.GenerateId())
+ .Select(x => x.Split(':').Last())
+ .Select(int.Parse);
+
+ CollectionAssert.AreEqual(new[] { 1, 2, 3, 4, 5 }, sequences);
+ }
+ }
+}
\ No newline at end of file