blob: 0a071e12ec7fad91bd22d44d5cccb163c29915b3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections;
using System.Threading;
using Apache.NMS.Util;
using Org.Apache.Qpid.Messaging;
namespace Apache.NMS.Amqp
{
/// <summary>
/// Amqp provider of ISession
/// </summary>
public class Session : ISession, IStartable, IStoppable
{
/// <summary>
/// Private object used for synchronization, instead of public "this"
/// </summary>
private readonly object myLock = new object();
private readonly IDictionary consumers = Hashtable.Synchronized(new Hashtable());
private readonly IDictionary producers = Hashtable.Synchronized(new Hashtable());
private Connection connection;
private AcknowledgementMode acknowledgementMode;
private IMessageConverter messageConverter;
private readonly int id;
private int consumerCounter;
private int producerCounter;
private long nextDeliveryId;
private long lastDeliveredSequenceId;
private readonly object sessionLock = new object();
private readonly Atomic<bool> started = new Atomic<bool>(false);
protected bool disposed = false;
protected bool closed = false;
protected bool closing = false;
private TimeSpan disposeStopTimeout = TimeSpan.FromMilliseconds(30000);
private TimeSpan closeStopTimeout = TimeSpan.FromMilliseconds(Timeout.Infinite);
private TimeSpan requestTimeout;
private Org.Apache.Qpid.Messaging.Session qpidSession = null; // Don't create until Start()
public Session(Connection connection, int sessionId, AcknowledgementMode acknowledgementMode)
{
this.connection = connection;
this.acknowledgementMode = acknowledgementMode;
MessageConverter = connection.MessageConverter;
id = sessionId;
if (this.acknowledgementMode == AcknowledgementMode.Transactional)
{
// TODO: transactions
throw new NotSupportedException("Transactions are not supported by Qpid/Amqp");
}
if (connection.IsStarted)
{
this.Start();
}
connection.AddSession(this);
}
#region IStartable Methods
/// <summary>
/// Create new unmanaged session and start senders and receivers
/// Associated connection must be open.
/// </summary>
public void Start()
{
// Don't try creating session if connection not yet up
if (!connection.IsStarted)
{
throw new ConnectionClosedException();
}
if (started.CompareAndSet(false, true))
{
try
{
// Create qpid session
if (qpidSession == null)
{
qpidSession = connection.CreateQpidSession();
}
// Start producers and consumers
lock (producers.SyncRoot)
{
foreach (MessageProducer producer in producers.Values)
{
producer.Start();
}
}
lock (consumers.SyncRoot)
{
foreach (MessageConsumer consumer in consumers.Values)
{
consumer.Start();
}
}
}
catch (Org.Apache.Qpid.Messaging.QpidException e)
{
throw new SessionClosedException( "Failed to create session : " + e.Message );
}
}
}
public bool IsStarted
{
get { return started.Value; }
}
#endregion
#region IStoppable Methods
public void Stop()
{
if (started.CompareAndSet(true, false))
{
try
{
lock (producers.SyncRoot)
{
foreach (MessageProducer producer in producers.Values)
{
producer.Stop();
}
}
lock (consumers.SyncRoot)
{
foreach (MessageConsumer consumer in consumers.Values)
{
consumer.Stop();
}
}
qpidSession.Dispose();
qpidSession = null;
}
catch (Org.Apache.Qpid.Messaging.QpidException e)
{
throw new NMSException("Failed to close session with Id " + SessionId.ToString() + " : " + e.Message);
}
}
}
#endregion
#region IDisposable Methods
public void Dispose()
{
Dispose(true);
}
#endregion
protected void Dispose(bool disposing)
{
if (this.disposed)
{
return;
}
try
{
// Force a Stop when we are Disposing vs a Normal Close.
Close();
}
catch
{
// Ignore network errors.
}
this.disposed = true;
}
public virtual void Close()
{
if (!this.closed)
{
try
{
Tracer.InfoFormat("Closing The Session with Id {0}", SessionId);
DoClose();
Tracer.InfoFormat("Closed The Session with Id {0}", SessionId);
}
catch (Exception ex)
{
Tracer.ErrorFormat("Error closing Session with id {0} : {1}", SessionId, ex);
}
}
}
internal void DoClose()
{
Shutdown();
}
internal void Shutdown()
{
//Tracer.InfoFormat("Executing Shutdown on Session with Id {0}", this.info.SessionId);
if (this.closed)
{
return;
}
lock (myLock)
{
if (this.closed || this.closing)
{
return;
}
try
{
this.closing = true;
// Stop all message deliveries from this Session
lock (consumers.SyncRoot)
{
foreach (MessageConsumer consumer in consumers.Values)
{
consumer.Close();
}
}
consumers.Clear();
lock (producers.SyncRoot)
{
foreach (MessageProducer producer in producers.Values)
{
producer.Close();
}
}
producers.Clear();
Connection.RemoveSession(this);
}
catch (Exception ex)
{
Tracer.ErrorFormat("Error closing Session with Id {0} : {1}", SessionId, ex);
}
finally
{
this.closed = true;
this.closing = false;
}
}
}
public IMessageProducer CreateProducer()
{
return CreateProducer(null);
}
public IMessageProducer CreateProducer(IDestination destination)
{
if (destination == null)
{
throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
}
MessageProducer producer = null;
try
{
Queue queue = new Queue(destination.ToString());
producer = DoCreateMessageProducer(queue);
this.AddProducer(producer);
}
catch (Exception)
{
if (producer != null)
{
this.RemoveProducer(producer.ProducerId);
producer.Close();
}
throw;
}
return producer;
}
internal virtual MessageProducer DoCreateMessageProducer(Destination destination)
{
return new MessageProducer(this, GetNextProducerId(), destination);
}
public IMessageConsumer CreateConsumer(IDestination destination)
{
return CreateConsumer(destination, null, false);
}
public IMessageConsumer CreateConsumer(IDestination destination, string selector)
{
return CreateConsumer(destination, selector, false);
}
public IMessageConsumer CreateConsumer(IDestination destination, string selector, bool noLocal)
{
if (destination == null)
{
throw new InvalidDestinationException("Cannot create a Consumer with a Null destination");
}
MessageConsumer consumer = null;
try
{
Queue queue = new Queue(destination.ToString());
consumer = DoCreateMessageConsumer(GetNextConsumerId(), queue, acknowledgementMode);
consumer.ConsumerTransformer = this.ConsumerTransformer;
this.AddConsumer(consumer);
if (this.Connection.IsStarted)
{
consumer.Start();
}
}
catch (Exception)
{
if (consumer != null)
{
this.RemoveConsumer(consumer);
consumer.Close();
}
throw;
}
return consumer;
}
public IMessageConsumer CreateDurableConsumer(ITopic destination, string name, string selector, bool noLocal)
{
throw new NotSupportedException("TODO: Durable Consumer");
}
internal virtual MessageConsumer DoCreateMessageConsumer(int id, Destination destination, AcknowledgementMode mode)
{
return new MessageConsumer(this, id, destination, mode);
}
public void DeleteDurableConsumer(string name)
{
throw new NotSupportedException("TODO: Durable Consumer");
}
public IQueueBrowser CreateBrowser(IQueue queue)
{
throw new NotImplementedException();
}
public IQueueBrowser CreateBrowser(IQueue queue, string selector)
{
throw new NotImplementedException();
}
public IQueue GetQueue(string name)
{
return new Queue(name);
}
public ITopic GetTopic(string name)
{
return new Topic(name);
}
public ITemporaryQueue CreateTemporaryQueue()
{
throw new NotSupportedException("TODO: Temp queue");
}
public ITemporaryTopic CreateTemporaryTopic()
{
throw new NotSupportedException("TODO: Temp topic");
}
/// <summary>
/// Delete a destination (Queue, Topic, Temp Queue, Temp Topic).
/// </summary>
public void DeleteDestination(IDestination destination)
{
// TODO: Implement if possible. If not possible, then change exception to NotSupportedException().
throw new NotImplementedException();
}
public IMessage CreateMessage()
{
BaseMessage answer = new BaseMessage();
return answer;
}
public ITextMessage CreateTextMessage()
{
TextMessage answer = new TextMessage();
return answer;
}
public ITextMessage CreateTextMessage(string text)
{
TextMessage answer = new TextMessage(text);
return answer;
}
public IMapMessage CreateMapMessage()
{
return new MapMessage();
}
public IBytesMessage CreateBytesMessage()
{
return new BytesMessage();
}
public IBytesMessage CreateBytesMessage(byte[] body)
{
BytesMessage answer = new BytesMessage();
answer.Content = body;
return answer;
}
public IStreamMessage CreateStreamMessage()
{
return new StreamMessage();
}
public IObjectMessage CreateObjectMessage(Object body)
{
ObjectMessage answer = new ObjectMessage();
answer.Body = body;
return answer;
}
public void Commit()
{
throw new NotSupportedException("Transactions not supported by Qpid/Amqp");
}
public void Rollback()
{
throw new NotSupportedException("Transactions not supported by Qpid/Amqp");
}
public void Recover()
{
throw new NotSupportedException("Transactions not supported by Qpid/Amqp");
}
// Properties
public Connection Connection
{
get { return connection; }
}
/// <summary>
/// The default timeout for network requests.
/// </summary>
public TimeSpan RequestTimeout
{
get { return NMSConstants.defaultRequestTimeout; }
set { }
}
public IMessageConverter MessageConverter
{
get { return messageConverter; }
set { messageConverter = value; }
}
public bool Transacted
{
get { return acknowledgementMode == AcknowledgementMode.Transactional; }
}
public AcknowledgementMode AcknowledgementMode
{
get { throw new NotImplementedException(); }
}
private ConsumerTransformerDelegate consumerTransformer;
public ConsumerTransformerDelegate ConsumerTransformer
{
get { return this.consumerTransformer; }
set { this.consumerTransformer = value; }
}
private ProducerTransformerDelegate producerTransformer;
public ProducerTransformerDelegate ProducerTransformer
{
get { return this.producerTransformer; }
set { this.producerTransformer = value; }
}
public void AddConsumer(MessageConsumer consumer)
{
if (!this.closing)
{
// Registered with Connection before we register at the broker.
consumers[consumer.ConsumerId] = consumer;
}
}
public void RemoveConsumer(MessageConsumer consumer)
{
if (!this.closing)
{
consumers.Remove(consumer.ConsumerId);
}
}
public void AddProducer(MessageProducer producer)
{
if (!this.closing)
{
this.producers[producer.ProducerId] = producer;
}
}
public void RemoveProducer(int objectId)
{
if (!this.closing)
{
producers.Remove(objectId);
}
}
public int GetNextConsumerId()
{
return Interlocked.Increment(ref consumerCounter);
}
public int GetNextProducerId()
{
return Interlocked.Increment(ref producerCounter);
}
public int SessionId
{
get { return id; }
}
public Org.Apache.Qpid.Messaging.Receiver CreateQpidReceiver(string address)
{
if (!IsStarted)
{
throw new SessionClosedException();
}
return qpidSession.CreateReceiver(address);
}
public Org.Apache.Qpid.Messaging.Sender CreateQpidSender(string address)
{
if (!IsStarted)
{
throw new SessionClosedException();
}
return qpidSession.CreateSender(address);
}
#region Transaction State Events
public event SessionTxEventDelegate TransactionStartedListener;
public event SessionTxEventDelegate TransactionCommittedListener;
public event SessionTxEventDelegate TransactionRolledBackListener;
#endregion
}
}