// blob: 2deb79989bef3c8f386500e93fae2f06216cffaf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
namespace Apache.NMS.ActiveMQ.Threads
{
/// <summary>
/// A TaskRunner that executes its Task on the .NET ThreadPool in bounded
/// slices of at most <c>maxIterationsPerRun</c> calls to Task.Iterate(),
/// re-queueing itself on the pool whenever more work remains.
/// </summary>
class PooledTaskRunner : TaskRunner
{
    private readonly int maxIterationsPerRun;
    private readonly Task task;

    // Guards queued, _shutdown, iterating and runningThread. It is also the
    // monitor that shutdown callers wait on and that RunTask() pulses.
    private readonly Object runable = new Object();

    private bool queued;
    private bool _shutdown;
    private bool iterating;

    // The pool thread currently inside the iteration loop, or null when no
    // iteration is in progress. Written only under the runable lock, together
    // with the iterating flag, so the two can never disagree.
    private volatile System.Threading.Thread runningThread;

    /// <summary>
    /// ThreadPool entry point: performs one run of the task.
    /// </summary>
    /// <param name="o">
    /// The runner to execute. When it is not a PooledTaskRunner (including
    /// null) the current instance is run instead of failing with a
    /// NullReferenceException on the pool thread.
    /// </param>
    public void Run(Object o)
    {
        PooledTaskRunner p = o as PooledTaskRunner;
        if(p == null)
        {
            p = this;
        }

        // Thread ownership is recorded inside RunTask() under the lock.
        // Setting and clearing it here allowed the previous pool thread to
        // null the field after the next run had already stored its thread.
        p.RunTask();
    }

    /// <summary>
    /// Creates the runner and immediately queues its first run on the ThreadPool.
    /// </summary>
    /// <param name="task">The task whose Iterate() method is driven by this runner.</param>
    /// <param name="maxIterationsPerRun">
    /// Upper bound on Iterate() calls per pool work item. NOTE: with a value
    /// below 1 the loop body never executes, so Iterate() is never called while
    /// the runner keeps re-queueing itself; callers should pass a positive count.
    /// </param>
    /// <exception cref="ArgumentNullException">task is null.</exception>
    public PooledTaskRunner(Task task, int maxIterationsPerRun)
    {
        if(task == null)
        {
            // Fail in the caller rather than later on a pool thread.
            throw new ArgumentNullException("task");
        }

        this.maxIterationsPerRun = maxIterationsPerRun;
        this.task = task;
        this._shutdown = false;
        this.iterating = false;
        this.queued = true;
        ThreadPool.QueueUserWorkItem(new WaitCallback(Run), this);
    }

    /// <summary>
    /// We Expect MANY wakeup calls on the same TaskRunner.
    /// </summary>
    public void Wakeup()
    {
        lock(runable)
        {
            // When we get in here, we make some assumptions of state:
            // queued=false, iterating=false: wakeup() has not been called and
            //                                therefore task is not executing.
            // queued=true,  iterating=false: wakeup() was called but task
            //                                execution has not started yet.
            // queued=false, iterating=true : wakeup() was called, which caused
            //                                task execution to start.
            // queued=true,  iterating=true : wakeup() called after task
            //                                execution was started.
            if(queued || _shutdown)
            {
                return;
            }

            queued = true;

            // The RunTask() method will do this for me once we are done
            // iterating.
            if(!iterating)
            {
                ThreadPool.QueueUserWorkItem(new WaitCallback(Run), this);
            }
        }
    }

    /// <summary>
    /// Shuts down the task, waiting up to <paramref name="timeout"/> for an
    /// in-progress run to finish. Returns as soon as the run completes.
    /// </summary>
    /// <param name="timeout">
    /// Maximum time to wait; a value of -1 millisecond waits indefinitely.
    /// </param>
    public void Shutdown(TimeSpan timeout)
    {
        lock(runable)
        {
            _shutdown = true;

            // The check on the thread is done because a call to Iterate() can
            // result in Shutdown() being called, which would otherwise wait
            // for its own iteration to finish.
            if(runningThread != System.Threading.Thread.CurrentThread)
            {
                WaitForIterationToFinish(timeout);
            }
        }
    }

    /// <summary>
    /// Shuts down the task like <see cref="Shutdown(TimeSpan)"/>, and aborts the
    /// worker thread if the run is still in progress after the wait.
    /// </summary>
    /// <param name="timeout">
    /// Maximum time to wait before aborting; -1 millisecond waits indefinitely.
    /// </param>
    public void ShutdownWithAbort(TimeSpan timeout)
    {
        lock(runable)
        {
            _shutdown = true;

            if(runningThread != System.Threading.Thread.CurrentThread)
            {
                WaitForIterationToFinish(timeout);

                if(iterating)
                {
                    // NOTE: Thread.Abort is unsupported on .NET Core / .NET 5+.
                    System.Threading.Thread worker = runningThread;
                    if(worker != null)
                    {
                        worker.Abort();
                    }
                }
            }
        }
    }

    /// <summary>
    /// Shuts down the task, waiting indefinitely for an in-progress run to finish.
    /// </summary>
    public void Shutdown()
    {
        // TimeSpan(long) takes ticks, so new TimeSpan(Timeout.Infinite) is
        // -1 tick, which truncates to a zero millisecond wait. Build the
        // -1 millisecond value that the wait APIs treat as infinite.
        Shutdown(TimeSpan.FromMilliseconds(Timeout.Infinite));
    }

    /// <summary>
    /// Performs one run: up to maxIterationsPerRun calls to Iterate(), then
    /// re-queues this runner if work remains and shutdown was not requested.
    /// </summary>
    internal void RunTask()
    {
        lock(runable)
        {
            queued = false;
            if(_shutdown)
            {
                iterating = false;
                return;
            }
            iterating = true;
            runningThread = System.Threading.Thread.CurrentThread;
        }

        // Don't synchronize while we are iterating so that
        // multiple wakeup() calls can be executed concurrently.
        bool done = false;
        try
        {
            for(int i = 0; i < maxIterationsPerRun; i++)
            {
                if(!task.Iterate())
                {
                    done = true;
                    break;
                }
            }
        }
        finally
        {
            lock(runable)
            {
                iterating = false;
                runningThread = null;

                // Release any thread blocked in Shutdown()/ShutdownWithAbort().
                System.Threading.Monitor.PulseAll(runable);

                if(_shutdown)
                {
                    queued = false;
                }
                else
                {
                    // If we could not iterate all the items
                    // then we need to re-queue.
                    if(!done)
                    {
                        queued = true;
                    }

                    if(queued)
                    {
                        ThreadPool.QueueUserWorkItem(new WaitCallback(Run), this);
                    }
                }
            }
        }
    }

    /// <summary>
    /// Waits for the in-progress run, if any, to finish. The caller must hold
    /// the runable lock. Monitor.Wait releases that lock while blocked, so
    /// RunTask() can enter its finally block, clear the iterating flag and
    /// pulse; sleeping with the lock held would stall the worker for the whole
    /// timeout.
    /// </summary>
    /// <param name="timeout">Maximum time to wait; -1 millisecond waits indefinitely.</param>
    private void WaitForIterationToFinish(TimeSpan timeout)
    {
        if(iterating)
        {
            System.Threading.Monitor.Wait(runable, timeout);
        }
    }
}
}