/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#include <decaf/util/concurrent/ThreadPool.h>
#include <decaf/util/concurrent/Concurrent.h>
#include <decaf/lang/exceptions/IllegalArgumentException.h>
#include <decaf/util/Config.h>

#ifdef min
#undef min
#endif

#include <algorithm>
#include <iostream>

using namespace std;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
using namespace decaf::util;
using namespace decaf::util::concurrent;

////////////////////////////////////////////////////////////////////////////////
LOGDECAF_INITIALIZE(logger, ThreadPool, "com.activemq.concurrent.ThreadPool")
LOGDECAF_INITIALIZE(marker, ThreadPool, "com.activemq.concurrent.ThreadPool.Marker")

////////////////////////////////////////////////////////////////////////////////
ThreadPool ThreadPool::instance;

////////////////////////////////////////////////////////////////////////////////
ThreadPool::ThreadPool()
{
    maxThreads  = DEFAULT_MAX_POOL_SIZE;
    blockSize   = DEFAULT_MAX_BLOCK_SIZE;
    freeThreads = 0;

    shutdown = false;
}

////////////////////////////////////////////////////////////////////////////////
ThreadPool::~ThreadPool()
{
    try
    {
        std::vector<PooledThread*>::iterator itr = pool.begin();

        // Stop all the threads
        for(; itr != pool.end(); ++itr)
        {
            (*itr)->stop();
        }

        // Set the shutdown flag so that the DeQueue methods all quit
        // when we interrupt them.
        shutdown = true;

        synchronized(&queue)
        {
            // Signal the Queue so that all waiters are notified
            queue.notifyAll();
        }

        // Wait for everyone to die
        for(itr = pool.begin(); itr != pool.end(); ++itr)
        {
            (*itr)->join();

            // Destroy the threads
            delete *itr;
        }

        pool.clear();
    }
    DECAF_CATCH_RETHROW( lang::Exception )
    DECAF_CATCHALL_THROW( lang::Exception )
}

////////////////////////////////////////////////////////////////////////////////
void ThreadPool::queueTask( ThreadPool::Task task )
   throw ( lang::Exception )
{
    try
    {
        if( !task.first || !task.second )
        {
            throw exceptions::IllegalArgumentException( __FILE__, __LINE__,
                "ThreadPool::QueueTask - Invalid args for Task");
        }

        //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - syncing on queue");

        synchronized(&queue)
        {
            //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - sync'd, synching pool");

            // If there's nobody open to do work, then create some more
            // threads to handle the work.
            if(freeThreads == 0)
            {
                AllocateThreads(blockSize);
            }

            //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - pushing task");

            // queue the new work.
            queue.push(task);

            //LOGCMS_DEBUG(logger, "ThreadPool::QueueTask - calling notify");

            // Inform waiters that we put some work on the queue.
            queue.notify();
        }
    }
    DECAF_CATCH_RETHROW( lang::Exception )
    DECAF_CATCHALL_THROW( lang::Exception )
}

////////////////////////////////////////////////////////////////////////////////
ThreadPool::Task ThreadPool::deQueueTask()
   throw ( lang::Exception )
{
    try
    {
        //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - syncing on queue");

        synchronized(&queue)
        {
            /*LOGCMS_DEBUG(logger,
                "ThreadPool::DeQueueTask - sync'd checking queue empty");*/

           // Wait for work, wait in a while loop since another thread could
           // be waiting for a lock and get the work before we get woken up
           // from our wait.
           while(queue.empty() && !shutdown)
           {
               //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - Q empty, waiting");

               queue.wait();

               //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - done waiting");
           }

           // Don't give more work if we are closing down
           if(shutdown)
           {
               return Task();
           }

           // check size again.
           if(queue.empty())
           {
               throw lang::Exception( __FILE__, __LINE__,
                   "ThreadPool::DeQueueUserWorkItem - Empty Taskn, not in shutdown.");
           }

           //LOGCMS_DEBUG(logger, "ThreadPool::DeQueueTask - popping task");

           // not empty so get the new work to do
           return queue.pop();
        }

        return Task();
    }
    DECAF_CATCH_RETHROW( lang::Exception )
    DECAF_CATCHALL_THROW( lang::Exception )
}

////////////////////////////////////////////////////////////////////////////////
void ThreadPool::reserve( std::size_t size )
{
    try
    {
        synchronized(&poolLock)
        {
            if(size < pool.size() || pool.size() == maxThreads)
            {
                return;
            }

            // How many do we reserve
            std::size_t allocCount = size - pool.size();

            // Allocate the new Threads
            AllocateThreads(allocCount);
        }
    }
    DECAF_CATCH_RETHROW( lang::Exception )
    DECAF_CATCHALL_THROW( lang::Exception )
}

////////////////////////////////////////////////////////////////////////////////
void ThreadPool::setMaxThreads( std::size_t maxThreads )
{
    try
    {
        synchronized(&poolLock)
        {
            if(maxThreads == 0)
            {
                // Caller tried to do something stupid, ignore them.
                return;
            }

            this->maxThreads = maxThreads;
        }
    }
    DECAF_CATCH_RETHROW( lang::Exception )
    DECAF_CATCHALL_THROW( lang::Exception )
}

////////////////////////////////////////////////////////////////////////////////
void ThreadPool::setBlockSize( std::size_t blockSize )
{
    try
    {
        if(blockSize <= 0)
        {
            // User tried something dumb, protect them from themselves
            return;
        }

        synchronized(&poolLock)
        {
            this->blockSize = blockSize;
        }
    }
    DECAF_CATCH_RETHROW( lang::Exception )
    DECAF_CATCHALL_THROW( lang::Exception )
}

////////////////////////////////////////////////////////////////////////////////
void ThreadPool::AllocateThreads( std::size_t count )
{
    try
    {
        if(pool.size() >= maxThreads)
        {
            return;
        }

        synchronized(&poolLock)
        {
            // Take the min of alloc size of maxThreads since we don't
            // want anybody sneaking eaxtra threads in, greedy bastards.
            count = std::min(count, maxThreads - pool.size());

            // Each time we create a thread we increment the free Threads
            // counter, but before we call start so that the Thread doesn't
            // get ahead of us.
            for(std::size_t i = 0; i < count; ++i)
            {
                pool.push_back(new PooledThread(this));
                pool.back()->setPooledThreadListener(this);
                freeThreads++;
                pool.back()->start();
            }
        }
    }
    DECAF_CATCH_RETHROW( lang::Exception )
    DECAF_CATCHALL_THROW( lang::Exception )
}

////////////////////////////////////////////////////////////////////////////////
void ThreadPool::onTaskStarted( PooledThread* thread DECAF_UNUSED )
{
    try
    {
        synchronized(&poolLock)
        {
            freeThreads--;

            // Now that this callback has decremented the free threads coutner
            // let check if there is any outstanding work to be done and no
            // threads to handle it.  This could happen if the QueueTask
            // method was called successively without any of the PooledThreads
            // having a chance to wake up and service the queue.  This would
            // cause the number of Task to exceed the number of free threads
            // once the Threads got a chance to wake up and service the queue
            if( freeThreads == 0 && !queue.empty() )
            {
                // Allocate a new block of threads
                AllocateThreads( blockSize );
            }
        }

        //LOGCMS_DEBUG(logger, "ThreadPool::onTaskStarted:");
    }
    DECAF_CATCH_RETHROW( lang::Exception )
    DECAF_CATCHALL_THROW( lang::Exception )
}

////////////////////////////////////////////////////////////////////////////////
void ThreadPool::onTaskCompleted( PooledThread* thread DECAF_UNUSED)
{
    try
    {
        synchronized(&poolLock)
        {
            freeThreads++;
        }

        //LOGCMS_DEBUG(logger, "ThreadPool::onTaskCompleted: ");
    }
    DECAF_CATCH_RETHROW( lang::Exception )
    DECAF_CATCHALL_THROW( lang::Exception )
}

////////////////////////////////////////////////////////////////////////////////
void ThreadPool::onTaskException(
   PooledThread* thread,
   lang::Exception& ex DECAF_UNUSED )
{
    //LOGCMS_DEBUG(logger, "ThreadPool::onTaskException: ");

    try
    {
        synchronized(&poolLock)
        {
            // Delete the thread that had the exception and start a new
            // one to take its place.
            freeThreads--;

            std::vector<PooledThread*>::iterator itr =
                std::find(pool.begin(), pool.end(), thread);

            if(itr != pool.end())
            {
                pool.erase(itr);
            }

            // Bye-Bye Thread Object
            delete thread;

            // Now allocate a replacement
            AllocateThreads(1);
        }
    }
    DECAF_CATCH_RETHROW( lang::Exception )
    DECAF_CATCHALL_THROW( lang::Exception )
}

