https://issues.apache.org/jira/browse/AMQNET-454
Apply patch https://issues.apache.org/jira/secure/attachment/12619423/Apache.NMS.AMQP-qpid-object-lifecycle-02.patch
diff --git a/src/main/csharp/Connection.cs b/src/main/csharp/Connection.cs
index 68c3a50..d27ecd2 100644
--- a/src/main/csharp/Connection.cs
+++ b/src/main/csharp/Connection.cs
@@ -50,7 +50,7 @@
private int sessionCounter = 0;
private readonly IList sessions = ArrayList.Synchronized(new ArrayList());
- Org.Apache.Qpid.Messaging.Connection qpidConnection = null; // Don't create until Start()
+ private Org.Apache.Qpid.Messaging.Connection qpidConnection = null; // Don't create until Start()
/// <summary>
/// Creates new connection
@@ -81,7 +81,7 @@
{
foreach (Session session in sessions)
{
- //session.Start();
+ session.Start();
}
}
}
@@ -336,7 +336,44 @@
public void Close()
{
- Dispose();
+ if (!this.closed.Value)
+ {
+ this.Stop();
+ }
+
+ lock (connectedLock)
+ {
+ if (this.closed.Value)
+ {
+ return;
+ }
+
+ try
+ {
+ Tracer.InfoFormat("Connection[]: Closing Connection Now.");
+ this.closing.Value = true;
+
+ lock (sessions.SyncRoot)
+ {
+ foreach (Session session in sessions)
+ {
+ session.Shutdown();
+ }
+ }
+ sessions.Clear();
+
+ }
+ catch (Exception ex)
+ {
+ Tracer.ErrorFormat("Connection[]: Error during connection close: {0}", ex);
+ }
+ finally
+ {
+ this.closed.Value = true;
+ this.connected.Value = false;
+ this.closing.Value = false;
+ }
+ }
}
public void PurgeTempDestinations()
@@ -359,5 +396,15 @@
{
return Interlocked.Increment(ref sessionCounter);
}
+
+ public Org.Apache.Qpid.Messaging.Session CreateQpidSession()
+ {
+ // TODO: Session name; transactional session
+ if (!connected.Value)
+ {
+ throw new ConnectionClosedException();
+ }
+ return qpidConnection.CreateSession();
+ }
}
}
diff --git a/src/main/csharp/MessageConsumer.cs b/src/main/csharp/MessageConsumer.cs
index cd09930..b070a41 100644
--- a/src/main/csharp/MessageConsumer.cs
+++ b/src/main/csharp/MessageConsumer.cs
@@ -1,6 +1,3 @@
-using System;
-using Org.Apache.Qpid.Messaging;
-using System.Threading;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
@@ -17,7 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
+using System;
+using System.Threading;
using Apache.NMS.Util;
+using Org.Apache.Qpid.Messaging;
namespace Apache.NMS.Amqp
{
@@ -26,6 +27,11 @@
/// </summary>
public class MessageConsumer : IMessageConsumer
{
+ /// <summary>
+ /// Private object used for synchronization, instead of public "this"
+ /// </summary>
+ private readonly object myLock = new object();
+
protected TimeSpan zeroTimeout = new TimeSpan(0);
private readonly Session session;
@@ -38,6 +44,8 @@
private AutoResetEvent pause = new AutoResetEvent(false);
private Atomic<bool> asyncDelivery = new Atomic<bool>(false);
+ private readonly Atomic<bool> started = new Atomic<bool>(false);
+ private Org.Apache.Qpid.Messaging.Receiver qpidReceiver = null;
private ConsumerTransformerDelegate consumerTransformer;
public ConsumerTransformerDelegate ConsumerTransformer
@@ -54,6 +62,28 @@
this.acknowledgementMode = acknowledgementMode;
}
+ public void Start()
+ {
+ // Don't try creating session if connection not yet up
+ if (!session.IsStarted)
+ {
+ throw new SessionClosedException();
+ }
+
+ if (started.CompareAndSet(false, true))
+ {
+ try
+ {
+ // Create qpid sender
+ qpidReceiver = session.CreateQpidReceiver("");
+ }
+ catch (Org.Apache.Qpid.Messaging.QpidException e)
+ {
+ throw new NMSException("Failed to create Qpid Receiver : " + e.Message);
+ }
+ }
+ }
+
public event MessageListener Listener
{
add
diff --git a/src/main/csharp/MessageProducer.cs b/src/main/csharp/MessageProducer.cs
index 4e2af67..6f14616 100644
--- a/src/main/csharp/MessageProducer.cs
+++ b/src/main/csharp/MessageProducer.cs
@@ -14,7 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
using System;
+using System.Threading;
+using Apache.NMS.Util;
using Org.Apache.Qpid.Messaging;
namespace Apache.NMS.Amqp
@@ -24,8 +27,13 @@
/// </summary>
public class MessageProducer : IMessageProducer
{
+ /// <summary>
+ /// Private object used for synchronization, instead of public "this"
+ /// </summary>
+ private readonly object myLock = new object();
private readonly Session session;
+ private readonly int id;
private Destination destination;
//private long messageCounter;
@@ -34,10 +42,12 @@
private MsgPriority priority;
private bool disableMessageID;
private bool disableMessageTimestamp;
- private readonly int id;
//private IMessageConverter messageConverter;
+ private readonly Atomic<bool> started = new Atomic<bool>(false);
+ private Org.Apache.Qpid.Messaging.Sender qpidSender = null;
+
private ProducerTransformerDelegate producerTransformer;
public ProducerTransformerDelegate ProducerTransformer
{
@@ -52,6 +62,28 @@
this.destination = destination;
}
+ public void Start()
+ {
+ // Don't try creating session if connection not yet up
+ if (!session.IsStarted)
+ {
+ throw new SessionClosedException();
+ }
+
+ if (started.CompareAndSet(false, true))
+ {
+ try
+ {
+ // Create qpid sender
+ qpidSender = session.CreateQpidSender("");
+ }
+ catch (Org.Apache.Qpid.Messaging.QpidException e)
+ {
+ throw new NMSException("Failed to create Qpid Sender : " + e.Message);
+ }
+ }
+ }
+
public void Send(IMessage message)
{
Send(Destination, message);
diff --git a/src/main/csharp/Session.cs b/src/main/csharp/Session.cs
index 06d464d..19b6832 100644
--- a/src/main/csharp/Session.cs
+++ b/src/main/csharp/Session.cs
@@ -17,6 +17,7 @@
using System;
using System.Collections;
using System.Threading;
+using Apache.NMS.Util;
using Org.Apache.Qpid.Messaging;
namespace Apache.NMS.Amqp
@@ -43,6 +44,8 @@
private int producerCounter;
private long nextDeliveryId;
private long lastDeliveredSequenceId;
+ private readonly object sessionLock = new object();
+ private readonly Atomic<bool> started = new Atomic<bool>(false);
protected bool disposed = false;
protected bool closed = false;
protected bool closing = false;
@@ -50,6 +53,8 @@
private TimeSpan closeStopTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
private TimeSpan requestTimeout;
+ private Org.Apache.Qpid.Messaging.Session qpidSession = null; // Don't create until Start()
+
public Session(Connection connection, int sessionId, AcknowledgementMode acknowledgementMode)
{
this.connection = connection;
@@ -61,6 +66,58 @@
// TODO: transactions
throw new NotSupportedException("Transactions are not supported by Qpid/Amqp");
}
+ if (connection.IsStarted)
+ {
+ this.Start();
+ }
+ connection.AddSession(this);
+ }
+
+ /// <summary>
+ /// Create new unmanaged session and start senders and receivers
+ /// Associated connection must be open.
+ /// </summary>
+ public void Start()
+ {
+ // Don't try creating session if connection not yet up
+ if (!connection.IsStarted)
+ {
+ throw new ConnectionClosedException();
+ }
+
+ if (started.CompareAndSet(false, true))
+ {
+ try
+ {
+ // Create qpid session
+ qpidSession = connection.CreateQpidSession();
+
+ // Start producers and consumers
+ lock (producers.SyncRoot)
+ {
+ foreach (MessageProducer producer in producers.Values)
+ {
+ producer.Start();
+ }
+ }
+ lock (consumers.SyncRoot)
+ {
+ foreach (MessageConsumer consumer in consumers.Values)
+ {
+ consumer.Start();
+ }
+ }
+ }
+ catch (Org.Apache.Qpid.Messaging.QpidException e)
+ {
+ throw new SessionClosedException( "Failed to create session : " + e.Message );
+ }
+ }
+ }
+
+ public bool IsStarted
+ {
+ get { return started.Value; }
}
public void Dispose()
@@ -136,7 +193,7 @@
{
foreach (MessageConsumer consumer in consumers.Values)
{
- consumer.Shutdown();
+ consumer.Close();
}
}
consumers.Clear();
@@ -145,7 +202,7 @@
{
foreach (MessageProducer producer in producers.Values)
{
- producer.Shutdown();
+ producer.Close();
}
}
producers.Clear();
@@ -463,7 +520,26 @@
{
get { return id; }
}
-
+
+
+ public Org.Apache.Qpid.Messaging.Receiver CreateQpidReceiver(string address)
+ {
+ if (!IsStarted)
+ {
+ throw new SessionClosedException();
+ }
+ return qpidSession.CreateReceiver(address);
+ }
+
+ public Org.Apache.Qpid.Messaging.Sender CreateQpidSender(string address)
+ {
+ if (!IsStarted)
+ {
+ throw new SessionClosedException();
+ }
+ return qpidSession.CreateSender(address);
+ }
+
#region Transaction State Events
public event SessionTxEventDelegate TransactionStartedListener;
diff --git a/src/main/csharp/SessionClosedException.cs b/src/main/csharp/SessionClosedException.cs
new file mode 100644
index 0000000..c6f5394
--- /dev/null
+++ b/src/main/csharp/SessionClosedException.cs
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+using System;
+
+namespace Apache.NMS.Amqp
+{
+ /// <summary>
+ /// Exception thrown when a session is used that it already closed
+ /// </summary>
+ [Serializable]
+ public class SessionClosedException : NMSException
+ {
+ public SessionClosedException()
+ : base("The session is already closed!")
+ {
+ }
+
+ public SessionClosedException(string message)
+ : base(message)
+ {
+ }
+
+ public SessionClosedException(string message, string errorCode)
+ : base(message, errorCode)
+ {
+ }
+
+ public SessionClosedException(string message, Exception innerException)
+ : base(message, innerException)
+ {
+ }
+
+ public SessionClosedException(string message, string errorCode, Exception innerException)
+ : base(message, errorCode, innerException)
+ {
+ }
+
+ #region ISerializable interface implementation
+
+ /// <summary>
+ /// Initializes a new instance of the SessionClosedException class with serialized data.
+ /// Throws System.ArgumentNullException if the info parameter is null.
+ /// Throws System.Runtime.Serialization.SerializationException if the class name is null or System.Exception.HResult is zero (0).
+ /// </summary>
+ /// <param name="info">The SerializationInfo that holds the serialized object data about the exception being thrown.</param>
+ /// <param name="context">The StreamingContext that contains contextual information about the source or destination.</param>
+ protected SessionClosedException(System.Runtime.Serialization.SerializationInfo info, System.Runtime.Serialization.StreamingContext context)
+ : base(info, context)
+ {
+ }
+
+ #endregion
+ }
+}
\ No newline at end of file