/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| using System; |
| using System.Collections; |
| using Apache.NMS.Stomp.Commands; |
| using Apache.NMS.Stomp.Util; |
| using Apache.NMS.Stomp.Threads; |
| |
namespace Apache.NMS.Stomp
{
    /// <summary>
    /// Queues <see cref="MessageDispatch"/> instances on behalf of a <see cref="Session"/>
    /// and hands them to the session's consumers. The work is driven by a
    /// <see cref="TaskRunner"/> that is created lazily the first time
    /// <see cref="Wakeup"/> is called and that calls back into <see cref="Iterate"/>.
    /// </summary>
    public class SessionExecutor : Threads.Task
    {
        // Session-level queue of dispatches that have not yet been handed to a consumer.
        private readonly MessageDispatchChannel messageQueue = new MessageDispatchChannel();

        // Owning session; used to report exceptions raised while iterating.
        private readonly Session session;

        // Consumer lookup keyed by consumer id. It is supplied by the caller, and every
        // access made by this class happens while holding consumers.SyncRoot.
        private readonly IDictionary consumers;

        // Created lazily by Wakeup() and released by Stop(). Every read or write of this
        // field is made while holding messageQueue.SyncRoot.
        private TaskRunner taskRunner = null;

        /// <summary>
        /// Creates an executor bound to the given session and consumer table.
        /// </summary>
        /// <param name="session">Session that is notified of dispatch failures.</param>
        /// <param name="consumers">
        /// Table of consumers keyed by consumer id; the values are expected to be
        /// <see cref="MessageConsumer"/> instances.
        /// </param>
        public SessionExecutor(Session session, IDictionary consumers)
        {
            this.session = session;
            this.consumers = consumers;
        }

        /// <summary>
        /// Last-chance cleanup: stops the executor, closes the queue and clears it.
        /// </summary>
        ~SessionExecutor()
        {
            try
            {
                Stop();
                Close();
                Clear();
            }
            catch
            {
                // Deliberately swallowed: an exception escaping a finalizer would
                // terminate the process, and nothing useful can be done with it here.
            }
        }

        /// <summary>
        /// Appends a dispatch to the session queue and wakes the task runner.
        /// </summary>
        /// <param name="dispatch">The dispatch to enqueue.</param>
        public void Execute(MessageDispatch dispatch)
        {
            this.messageQueue.Enqueue(dispatch);
            this.Wakeup();
        }

        /// <summary>
        /// Places a dispatch at the front of the session queue and wakes the task runner.
        /// </summary>
        /// <param name="dispatch">The dispatch to enqueue ahead of the others.</param>
        public void ExecuteFirst(MessageDispatch dispatch)
        {
            this.messageQueue.EnqueueFirst(dispatch);
            this.Wakeup();
        }

        /// <summary>
        /// Wakes the task runner, creating a <see cref="DedicatedTaskRunner"/> for this
        /// executor first if none exists yet.
        /// </summary>
        public void Wakeup()
        {
            TaskRunner runner;

            lock(messageQueue.SyncRoot)
            {
                if(this.taskRunner == null)
                {
                    this.taskRunner = new DedicatedTaskRunner(this);
                }

                runner = this.taskRunner;
            }

            // Signal outside the lock so the queue's monitor is not held during the call.
            runner.Wakeup();
        }

        /// <summary>
        /// Starts the session queue if it is not already running and, when messages
        /// are already waiting, wakes the task runner so they get processed.
        /// </summary>
        public void Start()
        {
            if(messageQueue.Running)
            {
                return;
            }

            messageQueue.Start();

            if(HasUncomsumedMessages)
            {
                this.Wakeup();
            }
        }

        /// <summary>
        /// Stops the session queue and shuts down the task runner, if one was created.
        /// Does nothing when the queue is not running.
        /// </summary>
        public void Stop()
        {
            if(!messageQueue.Running)
            {
                return;
            }

            messageQueue.Stop();

            TaskRunner runner;

            // Detach the runner under the same lock Wakeup() uses for this field, so
            // that concurrent Stop() calls cannot both obtain (and shut down) the
            // same runner instance.
            lock(messageQueue.SyncRoot)
            {
                runner = this.taskRunner;
                this.taskRunner = null;
            }

            // Shut down outside the lock so the queue's monitor is not held meanwhile.
            if(runner != null)
            {
                runner.Shutdown();
            }
        }

        /// <summary>
        /// Closes the session queue.
        /// </summary>
        public void Close()
        {
            this.messageQueue.Close();
        }

        /// <summary>
        /// Looks up the consumer named by <c>dispatch.ConsumerId</c> and passes the
        /// dispatch to it. The dispatch is ignored when no matching consumer is
        /// registered. Exceptions are traced at debug level and not rethrown.
        /// </summary>
        /// <param name="dispatch">The dispatch to deliver.</param>
        public void Dispatch(MessageDispatch dispatch)
        {
            try
            {
                lock(this.consumers.SyncRoot)
                {
                    MessageConsumer consumer = null;

                    if(this.consumers.Contains(dispatch.ConsumerId))
                    {
                        consumer = this.consumers[dispatch.ConsumerId] as MessageConsumer;
                    }

                    // If the consumer is not available, just ignore the message.
                    // Otherwise, dispatch the message to the consumer.
                    if(consumer != null)
                    {
                        consumer.Dispatch(dispatch);
                    }
                }
            }
            catch(Exception ex)
            {
                Tracer.DebugFormat("Caught Exception While Dispatching: {0}", ex.Message );
            }
        }

        /// <summary>
        /// Performs one unit of work. Consumers are asked to iterate first; only when
        /// none of them reports work is a single dispatch taken from the session queue
        /// and delivered.
        /// </summary>
        /// <returns>
        /// <c>true</c> when a consumer's <c>Iterate()</c> returned true, when a dispatch
        /// was delivered and the session queue is still not empty, or when an exception
        /// was caught and reported to the connection; <c>false</c> otherwise.
        /// </returns>
        public bool Iterate()
        {
            try
            {
                lock(this.consumers.SyncRoot)
                {
                    // Deliver any messages queued on the consumers to their listeners.
                    foreach( MessageConsumer consumer in this.consumers.Values )
                    {
                        if(consumer.Iterate())
                        {
                            return true;
                        }
                    }
                }

                // Nothing was pending on the consumers, so take the next dispatch
                // queued on the session itself.
                MessageDispatch message = messageQueue.DequeueNoWait();

                if(message == null)
                {
                    return false;
                }

                this.Dispatch(message);
                return !messageQueue.Empty;
            }
            catch(Exception ex)
            {
                Tracer.DebugFormat("Caught Exception While Dispatching: {0}", ex.Message );
                this.session.Connection.OnSessionException(this.session, ex);
            }

            return true;
        }

        /// <summary>
        /// Clears the session queue. Equivalent to <see cref="Clear"/>.
        /// </summary>
        public void ClearMessagesInProgress()
        {
            this.Clear();
        }

        /// <summary>
        /// Clears the session queue.
        /// </summary>
        public void Clear()
        {
            this.messageQueue.Clear();
        }

        /// <summary>
        /// Returns the result of calling <c>RemoveAll()</c> on the session queue.
        /// Note that the getter invokes <c>RemoveAll()</c> on every read.
        /// </summary>
        public MessageDispatch[] UnconsumedMessages
        {
            get { return messageQueue.RemoveAll(); }
        }

        /// <summary>
        /// True when the session queue is not closed, is running, and is not empty.
        /// The misspelled name is kept because it is part of the public interface.
        /// </summary>
        public bool HasUncomsumedMessages
        {
            get { return !messageQueue.Closed && messageQueue.Running && !messageQueue.Empty; }
        }

        /// <summary>
        /// True while the session queue is running.
        /// </summary>
        public bool Running
        {
            get { return this.messageQueue.Running; }
        }

        /// <summary>
        /// True when the session queue holds no dispatches.
        /// </summary>
        public bool Empty
        {
            get { return this.messageQueue.Empty; }
        }
    }
}