blob: 66b6d3ce69793001ce00a838c425025798ec476b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <decaf/util/concurrent/ThreadPool.h>
#include <decaf/util/concurrent/Concurrent.h>
#include <decaf/lang/exceptions/IllegalArgumentException.h>
#include <decaf/util/Config.h>
#ifdef min
#undef min
#endif
#include <algorithm>
#include <iostream>
using namespace std;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
using namespace decaf::util;
using namespace decaf::util::concurrent;
////////////////////////////////////////////////////////////////////////////////
LOGDECAF_INITIALIZE(logger, ThreadPool, "com.activemq.concurrent.ThreadPool")
LOGDECAF_INITIALIZE(marker, ThreadPool, "com.activemq.concurrent.ThreadPool.Marker")
////////////////////////////////////////////////////////////////////////////////
ThreadPool ThreadPool::instance;
////////////////////////////////////////////////////////////////////////////////
ThreadPool::ThreadPool()
{
maxThreads = DEFAULT_MAX_POOL_SIZE;
blockSize = DEFAULT_MAX_BLOCK_SIZE;
freeThreads = 0;
shutdown = false;
}
////////////////////////////////////////////////////////////////////////////////
ThreadPool::~ThreadPool()
{
try
{
std::vector<PooledThread*>::iterator itr = pool.begin();
// Stop all the threads
for(; itr != pool.end(); ++itr)
{
(*itr)->stop();
}
// Set the shutdown flag so that the DeQueue methods all quit
// when we interrupt them.
shutdown = true;
synchronized(&queue)
{
// Signal the Queue so that all waiters are notified
queue.notifyAll();
}
// Wait for everyone to die
for(itr = pool.begin(); itr != pool.end(); ++itr)
{
(*itr)->join();
// Destroy the threads
delete *itr;
}
pool.clear();
}
DECAF_CATCH_RETHROW( lang::Exception )
DECAF_CATCHALL_THROW( lang::Exception )
}
////////////////////////////////////////////////////////////////////////////////
void ThreadPool::queueTask( ThreadPool::Task task )
throw ( lang::Exception )
{
try
{
if( !task.first || !task.second )
{
throw exceptions::IllegalArgumentException( __FILE__, __LINE__,
"ThreadPool::QueueTask - Invalid args for Task");
}
//LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - syncing on queue");
synchronized(&queue)
{
//LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - sync'd, synching pool");
// If there's nobody open to do work, then create some more
// threads to handle the work.
if(freeThreads == 0)
{
AllocateThreads(blockSize);
}
//LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - pushing task");
// queue the new work.
queue.push(task);
//LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - calling notify");
// Inform waiters that we put some work on the queue.
queue.notify();
}
}
DECAF_CATCH_RETHROW( lang::Exception )
DECAF_CATCHALL_THROW( lang::Exception )
}
////////////////////////////////////////////////////////////////////////////////
ThreadPool::Task ThreadPool::deQueueTask()
throw ( lang::Exception )
{
try
{
//LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - syncing on queue");
synchronized(&queue)
{
/*LOGCMS_DEBUG(logger,
"ThreadPool::DeQueueTask - sync'd checking queue empty");*/
// Wait for work, wait in a while loop since another thread could
// be waiting for a lock and get the work before we get woken up
// from our wait.
while(queue.empty() && !shutdown)
{
//LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - Q empty, waiting");
queue.wait();
//LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - done waiting");
}
// Don't give more work if we are closing down
if(shutdown)
{
return Task();
}
// check size again.
if(queue.empty())
{
throw lang::Exception( __FILE__, __LINE__,
"ThreadPool::DeQueueUserWorkItem - Empty Taskn, not in shutdown.");
}
//LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - popping task");
// not empty so get the new work to do
return queue.pop();
}
return Task();
}
DECAF_CATCH_RETHROW( lang::Exception )
DECAF_CATCHALL_THROW( lang::Exception )
}
////////////////////////////////////////////////////////////////////////////////
void ThreadPool::reserve( std::size_t size )
{
try
{
synchronized(&poolLock)
{
if(size < pool.size() || pool.size() == maxThreads)
{
return;
}
// How many do we reserve
std::size_t allocCount = size - pool.size();
// Allocate the new Threads
AllocateThreads(allocCount);
}
}
DECAF_CATCH_RETHROW( lang::Exception )
DECAF_CATCHALL_THROW( lang::Exception )
}
////////////////////////////////////////////////////////////////////////////////
void ThreadPool::setMaxThreads( std::size_t maxThreads )
{
try
{
synchronized(&poolLock)
{
if(maxThreads == 0)
{
// Caller tried to do something stupid, ignore them.
return;
}
this->maxThreads = maxThreads;
}
}
DECAF_CATCH_RETHROW( lang::Exception )
DECAF_CATCHALL_THROW( lang::Exception )
}
////////////////////////////////////////////////////////////////////////////////
void ThreadPool::setBlockSize( std::size_t blockSize )
{
try
{
if(blockSize <= 0)
{
// User tried something dumb, protect them from themselves
return;
}
synchronized(&poolLock)
{
this->blockSize = blockSize;
}
}
DECAF_CATCH_RETHROW( lang::Exception )
DECAF_CATCHALL_THROW( lang::Exception )
}
////////////////////////////////////////////////////////////////////////////////
void ThreadPool::AllocateThreads( std::size_t count )
{
try
{
if(pool.size() >= maxThreads)
{
return;
}
synchronized(&poolLock)
{
// Take the min of alloc size of maxThreads since we don't
// want anybody sneaking eaxtra threads in, greedy bastards.
count = std::min(count, maxThreads - pool.size());
// Each time we create a thread we increment the free Threads
// counter, but before we call start so that the Thread doesn't
// get ahead of us.
for(std::size_t i = 0; i < count; ++i)
{
pool.push_back(new PooledThread(this));
pool.back()->setPooledThreadListener(this);
freeThreads++;
pool.back()->start();
}
}
}
DECAF_CATCH_RETHROW( lang::Exception )
DECAF_CATCHALL_THROW( lang::Exception )
}
////////////////////////////////////////////////////////////////////////////////
void ThreadPool::onTaskStarted( PooledThread* thread DECAF_UNUSED )
{
try
{
synchronized(&poolLock)
{
freeThreads--;
// Now that this callback has decremented the free threads coutner
// let check if there is any outstanding work to be done and no
// threads to handle it. This could happen if the QueueTask
// method was called successively without any of the PooledThreads
// having a chance to wake up and service the queue. This would
// cause the number of Task to exceed the number of free threads
// once the Threads got a chance to wake up and service the queue
if( freeThreads == 0 && !queue.empty() )
{
// Allocate a new block of threads
AllocateThreads( blockSize );
}
}
//LOGCMS_DEBUG(logger, "ThreadPool::onTaskStarted:");
}
DECAF_CATCH_RETHROW( lang::Exception )
DECAF_CATCHALL_THROW( lang::Exception )
}
////////////////////////////////////////////////////////////////////////////////
void ThreadPool::onTaskCompleted( PooledThread* thread DECAF_UNUSED)
{
try
{
synchronized(&poolLock)
{
freeThreads++;
}
//LOGCMS_DEBUG(logger, "ThreadPool::onTaskCompleted: ");
}
DECAF_CATCH_RETHROW( lang::Exception )
DECAF_CATCHALL_THROW( lang::Exception )
}
////////////////////////////////////////////////////////////////////////////////
void ThreadPool::onTaskException(
PooledThread* thread,
lang::Exception& ex DECAF_UNUSED )
{
//LOGCMS_DEBUG(logger, "ThreadPool::onTaskException: ");
try
{
synchronized(&poolLock)
{
// Delete the thread that had the exception and start a new
// one to take its place.
freeThreads--;
std::vector<PooledThread*>::iterator itr =
std::find(pool.begin(), pool.end(), thread);
if(itr != pool.end())
{
pool.erase(itr);
}
// Bye-Bye Thread Object
delete thread;
// Now allocate a replacement
AllocateThreads(1);
}
}
DECAF_CATCH_RETHROW( lang::Exception )
DECAF_CATCHALL_THROW( lang::Exception )
}