blob: 456c7eb30707cda69378f0870cbd0b08d6ced7eb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <exception>
#include <iostream>
#include <ignite/common/thread_pool.h>
namespace ignite
{
namespace common
{
ThreadPool::ThreadPool(uint32_t threadsNum):
started(false),
stopped(false),
threads()
{
uint32_t threadToStart = threadsNum != 0 ? threadsNum : concurrent::GetNumberOfProcessors();
if (!threadToStart)
threadToStart = 2;
threads.reserve(threadToStart);
for (uint32_t i = 0; i < threadToStart; ++i)
{
SP_WorkerThread thread(new WorkerThread(queue));
threads.push_back(thread);
}
}
ThreadPool::~ThreadPool()
{
Stop();
}
void ThreadPool::Start()
{
concurrent::CsLockGuard guard(mutex);
if (started)
return;
started = true;
for (std::vector<SP_WorkerThread>::iterator it = threads.begin(); it != threads.end(); ++it)
it->Get()->Start();
}
void ThreadPool::Stop()
{
concurrent::CsLockGuard guard(mutex);
if (stopped || !started)
return;
stopped = true;
queue.Unblock();
for (std::vector<SP_WorkerThread>::iterator it = threads.begin(); it != threads.end(); ++it)
it->Get()->Join();
}
void ThreadPool::Dispatch(const SP_ThreadPoolTask& task)
{
queue.Push(task);
}
ThreadPool::TaskQueue::TaskQueue() :
unblocked(false)
{
// No-op.
}
ThreadPool::TaskQueue::~TaskQueue()
{
// No-op.
}
void ThreadPool::TaskQueue::Push(const SP_ThreadPoolTask &task)
{
if (!task.IsValid())
return;
concurrent::CsLockGuard guard(mutex);
if (unblocked)
{
IgniteError err(IgniteError::IGNITE_ERR_GENERIC, "Execution thread pool is stopped");
HandleTaskError(*task.Get(), err);
return;
}
tasks.push_back(task);
waitPoint.NotifyOne();
}
SP_ThreadPoolTask ThreadPool::TaskQueue::Pop()
{
concurrent::CsLockGuard guard(mutex);
if (unblocked)
return SP_ThreadPoolTask();
while (tasks.empty())
{
waitPoint.Wait(mutex);
if (unblocked)
return SP_ThreadPoolTask();
}
SP_ThreadPoolTask res = tasks.front();
tasks.pop_front();
return res;
}
void ThreadPool::TaskQueue::Unblock()
{
concurrent::CsLockGuard guard(mutex);
unblocked = true;
IgniteError err(IgniteError::IGNITE_ERR_GENERIC, "Execution thread pool is stopped");
for (std::deque< SP_ThreadPoolTask >::iterator it = tasks.begin(); it != tasks.end(); ++it)
HandleTaskError(*it->Get(), err);
tasks.clear();
waitPoint.NotifyAll();
}
ThreadPool::WorkerThread::WorkerThread(TaskQueue& taskQueue) :
taskQueue(taskQueue)
{
// No-op.
}
ThreadPool::WorkerThread::~WorkerThread()
{
// No-op.
}
void ThreadPool::WorkerThread::Run()
{
while (true)
{
SP_ThreadPoolTask task = taskQueue.Pop();
// Queue is unblocked and workers should stop.
if (!task.IsValid())
break;
ThreadPoolTask &task0 = *task.Get();
try
{
task0.Execute();
}
catch (const IgniteError& err)
{
HandleTaskError(task0, err);
}
catch (const std::exception& err)
{
IgniteError err0(IgniteError::IGNITE_ERR_STD, err.what());
HandleTaskError(task0, err0);
}
catch (...)
{
IgniteError err(IgniteError::IGNITE_ERR_UNKNOWN, "Unknown error occurred when executing task");
HandleTaskError(task0, err);
}
}
}
void ThreadPool::HandleTaskError(ThreadPoolTask &task, const IgniteError &err)
{
try
{
task.OnError(err);
return;
}
catch (const IgniteError& err0)
{
std::cerr << "Exception is thrown during handling of exception: "
<< err0.what() << "Aborting execution" << std::endl;
}
catch (const std::exception& err0)
{
std::cerr << "Exception is thrown during handling of exception: "
<< err0.what() << "Aborting execution" << std::endl;
}
catch (...)
{
std::cerr << "Unknown exception is thrown during handling of exception. Aborting execution" << std::endl;
}
std::terminate();
}
}
}