// blob: 544885c6592a8892c2c0c46903d801f599e789a5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections;
using Apache.NMS.ActiveMQ.Commands;
using Apache.NMS.ActiveMQ.Util;
using Apache.NMS.ActiveMQ.Threads;
namespace Apache.NMS.ActiveMQ
{
    /// <summary>
    /// Buffers <see cref="MessageDispatch"/> commands for a <see cref="Session"/>
    /// in a dispatch channel and delivers them to the session's consumers from a
    /// task driven by a <see cref="TaskRunner"/>.
    /// </summary>
    public class SessionExecutor : Threads.Task
    {
        // Channel holding dispatches that have not yet been handed to a consumer.
        private readonly MessageDispatchChannel messageQueue = null;

        // Lazily created in Wakeup() and detached in Stop(); every read-modify-write
        // of this field happens under lock(messageQueue.SyncRoot).
        private TaskRunner taskRunner = null;

        private readonly Session session = null;

        // Consumer lookup table keyed by consumer id (see Dispatch); guarded by its SyncRoot.
        private readonly IDictionary consumers = null;

        /// <summary>
        /// Creates an executor for the given session.
        /// </summary>
        /// <param name="session">Session whose connection settings select the channel type
        /// and which is reported on dispatch failures.</param>
        /// <param name="consumers">Dictionary of the session's consumers; looked up by
        /// <c>MessageDispatch.ConsumerId</c> and enumerated by <see cref="Iterate"/>.</param>
        public SessionExecutor(Session session, IDictionary consumers)
        {
            this.session = session;
            this.consumers = consumers;

            // A priority-aware channel is used only when the session has a connection
            // and that connection reports MessagePrioritySupported; otherwise plain FIFO.
            if(this.session.Connection != null && this.session.Connection.MessagePrioritySupported)
            {
                this.messageQueue = new SimplePriorityMessageDispatchChannel();
            }
            else
            {
                this.messageQueue = new FifoMessageDispatchChannel();
            }
        }

        /// <summary>
        /// Finalizer: best-effort stop, close and clear of the executor.
        /// </summary>
        ~SessionExecutor()
        {
            try
            {
                Stop();
                Close();
                Clear();
            }
            catch
            {
                // Deliberately swallowed: an exception must never escape a finalizer.
            }
        }

        /// <summary>
        /// Appends a dispatch to the channel and wakes the task runner.
        /// </summary>
        /// <param name="dispatch">The dispatch to enqueue.</param>
        public void Execute(MessageDispatch dispatch)
        {
            // Add the data to the queue.
            this.messageQueue.Enqueue(dispatch);
            this.Wakeup();
        }

        /// <summary>
        /// Enqueues a dispatch via the channel's <c>EnqueueFirst</c> and wakes the task runner.
        /// </summary>
        /// <param name="dispatch">The dispatch to enqueue.</param>
        public void ExecuteFirst(MessageDispatch dispatch)
        {
            // Add the data to the queue.
            this.messageQueue.EnqueueFirst(dispatch);
            this.Wakeup();
        }

        /// <summary>
        /// Ensures a task runner exists for this executor (creating one from the
        /// default task runner factory on first use) and signals it.
        /// </summary>
        public void Wakeup()
        {
            TaskRunner runner;

            // Create-or-fetch is done atomically under the channel lock so that a
            // concurrent Stop() cannot null the field between the check and the read.
            lock(messageQueue.SyncRoot)
            {
                if(this.taskRunner == null)
                {
                    this.taskRunner = DefaultThreadPools.DefaultTaskRunnerFactory.CreateTaskRunner(this);
                }

                runner = this.taskRunner;
            }

            // Signal outside the lock so the runner is never invoked while holding it.
            runner.Wakeup();
        }

        /// <summary>
        /// Starts the channel if it is not already running, and wakes the task
        /// runner when messages are already waiting.
        /// </summary>
        public void Start()
        {
            if(!messageQueue.Running)
            {
                messageQueue.Start();

                if(HasUncomsumedMessages)
                {
                    this.Wakeup();
                }
            }
        }

        /// <summary>
        /// Stops the channel if it is running and shuts down the current task runner, if any.
        /// </summary>
        public void Stop()
        {
            if(messageQueue.Running)
            {
                messageQueue.Stop();

                TaskRunner runner = DetachTaskRunner();
                if(runner != null)
                {
                    runner.Shutdown();
                }
            }
        }

        /// <summary>
        /// Stops the channel if it is running and shuts down the current task runner,
        /// if any, using <c>ShutdownWithAbort</c> with the supplied timeout.
        /// </summary>
        /// <param name="timeout">Timeout passed to <c>TaskRunner.ShutdownWithAbort</c>.</param>
        public void Stop(TimeSpan timeout)
        {
            if(messageQueue.Running)
            {
                messageQueue.Stop();

                TaskRunner runner = DetachTaskRunner();
                if(runner != null)
                {
                    runner.ShutdownWithAbort(timeout);
                }
            }
        }

        // Atomically takes ownership of the current task runner (may be null) and
        // clears the field, using the same lock Wakeup() uses to create/read it.
        // The caller shuts the runner down outside the lock.
        private TaskRunner DetachTaskRunner()
        {
            lock(messageQueue.SyncRoot)
            {
                TaskRunner runner = this.taskRunner;
                this.taskRunner = null;
                return runner;
            }
        }

        /// <summary>
        /// Closes the underlying dispatch channel.
        /// </summary>
        public void Close()
        {
            this.messageQueue.Close();
        }

        /// <summary>
        /// Delivers a dispatch to the consumer registered under its consumer id.
        /// Dispatches for unknown consumers are ignored; exceptions are logged at
        /// debug level and not rethrown.
        /// </summary>
        /// <param name="dispatch">The dispatch to deliver.</param>
        public void Dispatch(MessageDispatch dispatch)
        {
            try
            {
                MessageConsumer consumer = null;

                // Only the lookup is done under the consumers lock; the actual
                // delivery below runs without holding it.
                lock(this.consumers.SyncRoot)
                {
                    if(this.consumers.Contains(dispatch.ConsumerId))
                    {
                        consumer = this.consumers[dispatch.ConsumerId] as MessageConsumer;
                    }
                }

                // If the consumer is not available, just ignore the message.
                // Otherwise, dispatch the message to the consumer.
                if(consumer != null)
                {
                    consumer.Dispatch(dispatch);
                }
            }
            catch(Exception ex)
            {
                Tracer.DebugFormat("Caught Exception While Dispatching: {0}", ex.Message );
            }
        }

        /// <summary>
        /// Performs one unit of dispatch work.
        /// </summary>
        /// <returns>
        /// <c>true</c> if a consumer's own <c>Iterate()</c> returned true, if a message
        /// was dispatched and the channel is still non-empty, or if an exception was
        /// caught; <c>false</c> if nothing was dequeued or the channel is empty after
        /// the dispatch. (Presumably the task runner uses this as a "call me again"
        /// signal — confirm against the TaskRunner implementation.)
        /// </returns>
        public bool Iterate()
        {
            try
            {
                lock(this.consumers.SyncRoot)
                {
                    // Deliver any messages queued on the consumer to their listeners.
                    foreach( MessageConsumer consumer in this.consumers.Values )
                    {
                        if(consumer.Iterate())
                        {
                            return true;
                        }
                    }
                }

                // No messages left queued on the listeners.. so now dispatch messages
                // queued on the session
                MessageDispatch message = messageQueue.DequeueNoWait();

                if(message != null)
                {
                    this.Dispatch(message);
                    return !messageQueue.Empty;
                }

                return false;
            }
            catch(Exception ex)
            {
                Tracer.DebugFormat("Caught Exception While Dispatching: {0}", ex.Message );

                // The constructor allows for a session without a connection, so guard
                // here too; otherwise a NullReferenceException would escape this handler.
                if(this.session.Connection != null)
                {
                    this.session.Connection.OnSessionException(this.session, ex);
                }
            }

            return true;
        }

        /// <summary>
        /// Clears the dispatch channel. Currently identical in effect to <see cref="Clear"/>.
        /// </summary>
        public void ClearMessagesInProgress()
        {
            this.messageQueue.Clear();
        }

        /// <summary>
        /// Clears the dispatch channel.
        /// </summary>
        public void Clear()
        {
            this.messageQueue.Clear();
        }

        /// <summary>
        /// Returns the result of the channel's <c>RemoveAll()</c>. Note that reading
        /// this property invokes <c>RemoveAll()</c> each time, so it is presumably
        /// destructive — verify against the channel implementation.
        /// </summary>
        public MessageDispatch[] UnconsumedMessages
        {
            get{ return messageQueue.RemoveAll(); }
        }

        /// <summary>
        /// True when the channel is not closed, is running, and is not empty.
        /// (The misspelling in the name is part of the public interface.)
        /// </summary>
        public bool HasUncomsumedMessages
        {
            get{ return !messageQueue.Closed && messageQueue.Running && !messageQueue.Empty; }
        }

        /// <summary>
        /// True when the dispatch channel is running.
        /// </summary>
        public bool Running
        {
            get{ return this.messageQueue.Running; }
        }

        /// <summary>
        /// True when the dispatch channel holds no dispatches.
        /// </summary>
        public bool Empty
        {
            get{ return this.messageQueue.Empty; }
        }
    }
}