blob: 3db9c00c744d46a0f5e53ab0cbf08ef00cf15186 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Threading;
namespace Apache.NMS.ActiveMQ.Threads
{
/// <summary>
/// This class provides a wrapper around the ThreadPool mechanism in .NET
/// to allow for serial execution of jobs in the ThreadPool and provide
/// a means of shutting down the execution of jobs in a deterministic
/// way.
/// </summary>
public class ThreadPoolExecutor
{
private Queue<Future> workQueue = new Queue<Future>();
private Mutex syncRoot = new Mutex();
private bool running = false;
private bool closing = false;
private bool closed = false;
private ManualResetEvent executionComplete = new ManualResetEvent(true);
private Thread workThread = null;
/// <summary>
/// Represents an asynchronous task that is executed on the ThreadPool
/// at some point in the future.
/// </summary>
internal class Future
{
private readonly WaitCallback callback;
private readonly object callbackArg;
public Future(WaitCallback callback, object arg)
{
this.callback = callback;
this.callbackArg = arg;
}
public void Run()
{
if(this.callback == null)
{
throw new Exception("Future executed with null WaitCallback");
}
try
{
this.callback(callbackArg);
}
catch
{
}
}
}
public void QueueUserWorkItem(WaitCallback worker)
{
this.QueueUserWorkItem(worker, null);
}
public void QueueUserWorkItem(WaitCallback worker, object arg)
{
if(worker == null)
{
throw new ArgumentNullException("Invalid WaitCallback passed");
}
if(!this.closed)
{
lock(syncRoot)
{
if(!this.closed || !this.closing)
{
this.workQueue.Enqueue(new Future(worker, arg));
if(!this.running)
{
this.executionComplete.Reset();
this.running = true;
ThreadPool.QueueUserWorkItem(new WaitCallback(QueueProcessor), null);
}
}
}
}
}
/// <summary>
/// Returns true if this ThreadPoolExecutor has been shut down but has not
/// finished running all the tasks that have been Queue. When a ThreadPoolExecutor
/// is shut down it will not accept any new tasks but it will complete all tasks
/// that have been previously queued.
/// </summary>
public bool IsShutdown
{
get { return this.closing; }
}
/// <summary>
/// Returns true if this ThreadPoolExecutor has been shut down and has also
/// completed processing of all outstanding tasks in its task Queue.
/// </summary>
public bool IsTerminated
{
get { return this.closed; }
}
public void Shutdown()
{
if(!this.closed)
{
lock(this.syncRoot)
{
if(!this.closed)
{
this.closing = true;
// Must be no tasks in Queue and none can be accepted
// now that we've flipped the closing toggle so safe to
// mark this ThreadPoolExecutor as closed.
if (!this.running)
{
this.closed = true;
this.executionComplete.Set();
}
}
}
}
}
public bool AwaitTermination(TimeSpan timeout)
{
if(!this.closed)
{
syncRoot.WaitOne();
if(!this.closed)
{
// If called from the worker thread we can't check this as it
// will deadlock us, just return whatever the closed state is.
if(this.running && Thread.CurrentThread != this.workThread)
{
syncRoot.ReleaseMutex();
this.closed = this.executionComplete.WaitOne(timeout, false);
syncRoot.WaitOne();
}
}
syncRoot.ReleaseMutex();
}
return this.closed;
}
private void QueueProcessor(object unused)
{
Future theTask = null;
lock(syncRoot)
{
this.workThread = Thread.CurrentThread;
if(this.workQueue.Count == 0)
{
this.running = false;
this.executionComplete.Set();
return;
}
theTask = this.workQueue.Dequeue();
}
try
{
theTask.Run();
}
finally
{
this.workThread = null;
if(this.workQueue.Count == 0)
{
lock(syncRoot)
{
this.running = false;
this.executionComplete.Set();
}
}
else
{
ThreadPool.QueueUserWorkItem(new WaitCallback(QueueProcessor), null);
}
}
}
}
}