blob: ab48db2921db741fb8d30e4ba516001b0111a941 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Threading.h"
#include <decaf/lang/Thread.h>
#include <decaf/lang/System.h>
#include <decaf/lang/Integer.h>
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/util/concurrent/Executors.h>
#include <decaf/internal/util/concurrent/ThreadLocalImpl.h>
#include <decaf/internal/util/concurrent/ThreadingTypes.h>
#include <decaf/internal/util/concurrent/PlatformThread.h>
#include <decaf/internal/util/concurrent/Atomics.h>
#include <decaf/util/concurrent/atomic/AtomicInteger.h>
#include <vector>
#include <list>
#ifdef __SUNPRO_CC
#include <stdlib.h>
#endif
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace decaf::util::concurrent::atomic;
using namespace decaf::internal;
using namespace decaf::internal::util;
using namespace decaf::internal::util::concurrent;
////////////////////////////////////////////////////////////////////////////////
namespace {
/**
 * Wait predicate for a thread that was created in the suspended state. The
 * wait is considered complete as soon as the handle's suspended flag reads
 * false.
 */
class SuspendedCompletionCondition : public CompletionCondition {
private:

    ThreadHandle* thread;

private:

    // Not copyable.
    SuspendedCompletionCondition(const SuspendedCompletionCondition&);
    SuspendedCompletionCondition& operator= (const SuspendedCompletionCondition&);

public:

    SuspendedCompletionCondition(ThreadHandle* thread) : thread(thread) {}

    /** @return true once the watched thread is no longer suspended. */
    virtual bool operator()() {
        if (thread->suspended) {
            return false;
        }
        return true;
    }

    using CompletionCondition::operator();
};
/**
 * Wait predicate used by doWaitOnMonitor(). Each evaluation locks the waiting
 * thread's own mutex and reports completion when the thread has been notified,
 * or when it is interruptible and has been interrupted.
 *
 * Locking contract visible in this class: whenever an overload returns true the
 * handle's mutex is left LOCKED; the code that follows the wait relies on that
 * and unlocks it later.
 */
class MonitorWaitCompletionCondition : public CompletionCondition {
private:
// Handle of the thread that is waiting on the monitor.
ThreadHandle* handle;
private:
// Not copyable.
MonitorWaitCompletionCondition(const MonitorWaitCompletionCondition&);
MonitorWaitCompletionCondition& operator= (const MonitorWaitCompletionCondition&);
public:
MonitorWaitCompletionCondition(ThreadHandle* handle) : handle(handle) {}
/**
 * Timed-wait variant.
 *
 * @param timedOut true when the caller's timed wait has expired.
 * @return true if notified or (interruptible and interrupted); false otherwise.
 *         On a false return the handle's mutex is released only when timedOut
 *         is false; after a timeout it stays locked for the caller.
 */
virtual bool operator()(bool timedOut) {
PlatformThread::lockMutex(handle->mutex);
if (handle->notified || (handle->interruptible && handle->interrupted)) {
// Complete: return with handle->mutex still held.
return true;
}
if (!timedOut) {
// Not timed out and not complete so unlock the thread so others can
// notify or interrupt it.
PlatformThread::unlockMutex(handle->mutex);
}
return false;
}
/**
 * Untimed variant.
 *
 * @return true (handle's mutex left locked) if notified or interruptibly
 *         interrupted; false (mutex released) otherwise.
 */
virtual bool operator()() {
PlatformThread::lockMutex(handle->mutex);
if (handle->notified || (handle->interruptible && handle->interrupted)) {
return true;
}
PlatformThread::unlockMutex(handle->mutex);
return false;
}
};
/**
 * Bookkeeping for the set of pre-allocated MonitorHandle objects.
 */
struct MonitorPool {
// First handle of a singly linked list chained through MonitorHandle::next.
MonitorHandle* head;
// Number of handles recorded for the pool (set to MONITOR_POOL_BLOCK_SIZE
// when the first batch is allocated).
unsigned int count;
};
/**
 * Process-wide state of the threading layer. A single instance is created by
 * Threading::initialize() and destroyed by Threading::shutdown().
 */
struct ThreadingLibrary {
private:
// Not copyable.
ThreadingLibrary(const ThreadingLibrary&);
ThreadingLibrary& operator= (const ThreadingLibrary&);
public:
ThreadingLibrary() : threadKey(),
selfKey(),
globalLock(),
tlsLock(),
tlsSlots(),
osThreads(),
mainThread(),
activeThreads(),
priorityMapping(),
osThreadId(),
monitors() {
}
// TLS key under which each thread stores its owning Thread object.
decaf_tls_key threadKey;
// TLS key under which each thread stores its own ThreadHandle.
decaf_tls_key selfKey;
// Held while activeThreads / osThreads are modified and during interrupt handling.
decaf_mutex_t globalLock;
// Held while tlsSlots is read during thread-exit TLS cleanup.
decaf_mutex_t tlsLock;
// One entry per TLS slot index (resized to DECAF_MAX_TLS_SLOTS at init).
std::vector<ThreadLocalImpl*> tlsSlots;
// Thread facades created for attached OS threads; deleted at shutdown.
std::vector<Thread*> osThreads;
// Platform thread that called Threading::initialize().
decaf_thread_t mainThread;
// Handles of threads that have entered their run phase and not yet exited.
std::list<ThreadHandle*> activeThreads;
// Maps Thread priority values (used as index) to platform priority values.
std::vector<int> priorityMapping;
// Counter used to build unique "OS-Thread<N>" names.
AtomicInteger osThreadId;
// Pool of pre-allocated monitor handles.
MonitorPool* monitors;
};
// Number of MonitorHandle objects created by one call to batchAllocateMonitors().
#define MONITOR_POOL_BLOCK_SIZE 64
// Global threading state; assigned in Threading::initialize() and deleted in
// Threading::shutdown(). NULL until initialize() has run.
ThreadingLibrary* library = NULL;
// ------------------------ Forward Declare All Utility Methods ----------------------- //
void threadExitTlsCleanup(ThreadHandle* thread);
// NOTE(review): unblockThreads is declared twice in this list (here and below)
// with different parameter names; both declare the same function.
void unblockThreads(ThreadHandle* monitor);
void createThreadInstance(ThreadHandle* thread, long long stackSize, int priority,
bool suspended, threadingTask threadMain, void* threadArg);
void dereferenceThread(ThreadHandle* thread);
ThreadHandle* initThreadHandle(ThreadHandle* thread);
MonitorHandle* initMonitorHandle(MonitorHandle* monitor);
bool interruptWaitingThread(ThreadHandle* self, ThreadHandle* target);
void unblockThreads(ThreadHandle* queueHead);
void enqueueThread(ThreadHandle** queue, ThreadHandle* thread);
void dequeueThread(ThreadHandle** queue, ThreadHandle* thread);
unsigned int getNumberOfWaiters(MonitorHandle* monitor);
void purgeMonitorsPool(MonitorPool* pool);
MonitorHandle* batchAllocateMonitors();
void doMonitorExit(MonitorHandle* monitor, ThreadHandle* thread);
void doMonitorEnter(MonitorHandle* monitor, ThreadHandle* thread);
void doNotifyWaiters(MonitorHandle* monitor, bool notifyAll);
void doNotifyThread(ThreadHandle* thread, bool markAsNotified);
bool doWaitOnMonitor(MonitorHandle* monitor, ThreadHandle* thread, long long mills, int nanos, bool interruptible);
// ------------------------ Forward Declare All Utility Methods ----------------------- //
/**
 * Tears down the calling thread: marks it TERMINATED, cancels any pending
 * interruption helper, wakes threads waiting on or joined to it, clears its
 * TLS values, removes it from the active list, releases the handle and finally
 * asks the platform layer to exit the thread.
 *
 * @param self
 *      Handle of the thread that is exiting.
 * @param destroy
 *      When true the handle's name, mutex and condition are freed and the
 *      handle deleted right here; when false the handle is released through
 *      dereferenceThread() reference counting.
 *
 * NOTE(review): callers in this file continue with code that is only valid if
 * this function never returns - presumably PlatformThread::exitThread() does
 * not return; confirm against the platform implementation.
 */
void threadExit(ThreadHandle* self, bool destroy = false) {
// Lock order: global lock first, then the thread's own mutex.
PlatformThread::lockMutex(library->globalLock);
PlatformThread::lockMutex(self->mutex);
self->state = Thread::TERMINATED;
// Must ensure that any interrupting threads get released
if (self->interruptingThread) {
PlatformThread::lockMutex(self->interruptingThread->mutex);
self->interruptingThread->canceled = true;
PlatformThread::unlockMutex(self->interruptingThread->mutex);
self->interruptingThread = NULL;
}
// Copy the platform handle now; 'self' may be deleted before it is detached below.
decaf_thread_t handle = self->handle;
// Wake up any blocked threads
PlatformThread::notifyAll(self->condition);
unblockThreads(self->joiners);
PlatformThread::setTlsValue(library->threadKey, NULL);
PlatformThread::setTlsValue(library->selfKey, NULL);
// Ensure all of this thread's local values are purged.
threadExitTlsCleanup(self);
// Remove from the set of active threads under global lock, threads that
// are iterating on global state need a stable list.
library->activeThreads.remove(self);
PlatformThread::unlockMutex(self->mutex);
PlatformThread::unlockMutex(library->globalLock);
if (destroy == true) {
free(self->name);
PlatformThread::destroyMutex(self->mutex);
PlatformThread::destroyCondition(self->condition);
delete self;
} else {
dereferenceThread(self);
}
PlatformThread::detachThread(handle);
PlatformThread::exitThread();
}
/**
 * Platform-level entry point of every thread created by createThreadInstance().
 *
 * Publishes the Thread object and the handle in thread-local storage, parks
 * while the handle is flagged suspended, exits immediately if the handle was
 * canceled in the meantime, and otherwise registers the thread as active, runs
 * its task function and then performs thread exit processing.
 *
 * @param arg the ThreadHandle of the new thread.
 */
PLATFORM_THREAD_CALLBACK_TYPE PLATFORM_CALLING_CONV threadEntryMethod(PLATFORM_THREAD_ENTRY_ARG arg) {
ThreadHandle* thread = (ThreadHandle*)arg;
PlatformThread::setTlsValue(library->threadKey, thread->parent);
PlatformThread::setTlsValue(library->selfKey, thread);
PlatformThread::lockMutex(thread->mutex);
if (thread->suspended == true) {
// Park until the suspended flag is cleared (see Threading::start / destroyThread).
SuspendedCompletionCondition completion(thread);
PlatformThread::interruptibleWaitOnCondition(thread->condition, thread->mutex, completion);
}
PlatformThread::unlockMutex(thread->mutex);
// Canceled before ever being started: skip the task and just exit.
if (thread->canceled == true) {
threadExit(thread);
PLATFORM_THREAD_RETURN()
}
PlatformThread::lockMutex(library->globalLock);
library->activeThreads.push_back(thread);
PlatformThread::unlockMutex(library->globalLock);
thread->state = Thread::RUNNABLE;
thread->threadMain(thread->threadArg);
threadExit(thread);
PLATFORM_THREAD_RETURN()
}
void runCallback(void* arg) {
ThreadHandle* thread = (ThreadHandle*)arg;
// Invoke run on the task.
try {
thread->parent->run();
} catch( decaf::lang::Throwable& error ){
if (thread->parent->getUncaughtExceptionHandler() != NULL) {
thread->parent->getUncaughtExceptionHandler()->uncaughtException(thread->parent, error);
} else if (thread->parent->getDefaultUncaughtExceptionHandler() != NULL) {
thread->parent->getDefaultUncaughtExceptionHandler()->uncaughtException(thread->parent, error);
}
} catch(std::exception& stdEx) {
const RuntimeException error(__FILE__, __LINE__, stdEx.what());
if (thread->parent->getUncaughtExceptionHandler() != NULL) {
thread->parent->getUncaughtExceptionHandler()->uncaughtException(thread->parent, error);
} else if (thread->parent->getDefaultUncaughtExceptionHandler() != NULL) {
thread->parent->getDefaultUncaughtExceptionHandler()->uncaughtException(thread->parent, error);
}
} catch(...) {
const RuntimeException error(__FILE__, __LINE__,
"Uncaught exception bubbled up to Thread::run, Thread Terminating.");
if (thread->parent->getUncaughtExceptionHandler() != NULL) {
thread->parent->getUncaughtExceptionHandler()->uncaughtException(thread->parent, error);
} else if (thread->parent->getDefaultUncaughtExceptionHandler() != NULL) {
thread->parent->getDefaultUncaughtExceptionHandler()->uncaughtException(thread->parent, error);
}
}
}
/**
 * Task function of the helper thread spawned by interruptWaitingThread().
 *
 * The helper acquires the monitor the target thread is waiting on, wakes the
 * target (without marking it as notified) so that it can observe its
 * interrupted flag, releases the monitor and exits, destroying its own handle.
 * At each stage it exits early when it has been canceled or when the target no
 * longer references it as its interrupting thread.
 *
 * @param arg the ThreadHandle of the target thread to be interrupted.
 */
void interruptionThread(void *arg) {

    ThreadHandle* self = Threading::getCurrentThreadHandle();
    ThreadHandle* target = (ThreadHandle*)arg;
    MonitorHandle* monitor = NULL;

    PlatformThread::lockMutex(library->globalLock);

    // If the target was already canceled then we are done.
    if (self->canceled == true) {
        PlatformThread::unlockMutex(library->globalLock);
        threadExit(self, true);
    }

    PlatformThread::lockMutex(target->mutex);

    if (target->interruptingThread != self) {
        PlatformThread::unlockMutex(target->mutex);
        PlatformThread::unlockMutex(library->globalLock);
        threadExit(self, true);
    }

    // This is the monitor the target is waiting on.
    monitor = target->monitor;

    PlatformThread::unlockMutex(target->mutex);
    PlatformThread::unlockMutex(library->globalLock);

    // try to take the monitor so that we can notify the thread to interrupt.
    doMonitorEnter(monitor, self);

    PlatformThread::lockMutex(library->globalLock);

    // If the target was interrupted already it will cancel this thread.
    if (self->canceled == true) {
        PlatformThread::unlockMutex(library->globalLock);
        // We own the monitor at this point. Give it back before exiting,
        // otherwise the target, which re-enters the monitor after its wait
        // completes, would block on a lock held by a thread that is gone.
        Threading::exitMonitor(monitor);
        threadExit(self, true);
    }

    PlatformThread::lockMutex(target->mutex);

    if (target->interruptingThread == self && target->waiting == true) {
        doNotifyThread(target, false);
    }
    target->interruptingThread = NULL;

    PlatformThread::unlockMutex(target->mutex);
    PlatformThread::unlockMutex(library->globalLock);

    Threading::exitMonitor(monitor);
    threadExit(self, true);
}
/**
 * Stores the launch parameters on the handle and asks the platform layer to
 * create the underlying thread, which starts in threadEntryMethod().
 *
 * @param thread     handle to populate and bind to the new platform thread.
 * @param stackSize  stack size forwarded to the platform layer.
 * @param priority   Thread priority; used as an index into the priority mapping.
 * @param suspended  initial value of the handle's suspended flag.
 * @param threadMain task function the new thread will run.
 * @param threadArg  argument passed to threadMain.
 */
void createThreadInstance(ThreadHandle* thread, long long stackSize, int priority,
                          bool suspended, threadingTask threadMain, void* threadArg) {

    // The entry method reads these fields, so fill them in before the
    // platform thread is created.
    thread->threadArg = threadArg;
    thread->threadMain = threadMain;
    thread->suspended = suspended;
    thread->priority = priority;
    thread->stackSize = stackSize;

    const int platformPriority = library->priorityMapping[priority];

    PlatformThread::createNewThread(
        &thread->handle, threadEntryMethod, thread, platformPriority, stackSize, &thread->threadId);
}
/**
 * Puts a freshly allocated ThreadHandle into its initial state and creates the
 * mutex and condition that belong to it.
 *
 * The handle starts in state NEW with two references (one for the Thread
 * object, one for the running thread) and its platform handle set to the
 * calling thread.
 *
 * @param thread the handle to initialize.
 * @return the same pointer that was passed in.
 *
 * Any exception raised while creating the mutex or the condition propagates
 * unchanged; a mutex that was already created is destroyed first.
 */
ThreadHandle* initThreadHandle(ThreadHandle* thread) {

    thread->parent = NULL;
    thread->name = NULL;
    thread->interruptible = false;
    thread->interrupted = false;
    thread->parked = false;
    thread->priority = Thread::NORM_PRIORITY;
    thread->stackSize = -1;
    thread->state = Thread::NEW;
    thread->references = 2;
    thread->unparked = false;
    thread->numAttached = 0;
    thread->interruptingThread = NULL;
    thread->osThread = false;
    thread->handle = PlatformThread::getCurrentThread();
    thread->threadId = 0;
    thread->next = NULL;
    thread->joiners = NULL;
    thread->monitor = NULL;

    ::memset(thread->tls, 0, sizeof(thread->tls));

    // Nothing needs rolling back if this fails, so let the exception pass through.
    PlatformThread::createMutex(&thread->mutex);

    try {
        PlatformThread::createCondition(&thread->condition);
    } catch (...) {
        PlatformThread::destroyMutex(thread->mutex);
        // Bare rethrow keeps the dynamic type; 'throw ex;' would copy-slice it.
        throw;
    }

    return thread;
}
/**
 * Drops one reference to the handle. The Thread object and the running thread
 * each hold one; whichever releases last frees the handle's name, mutex and
 * condition, detaches the platform thread handle and deletes the handle.
 *
 * @param thread the handle to release.
 */
void dereferenceThread(ThreadHandle* thread) {

    // Someone else still holds a reference: nothing to clean up yet.
    if (Atomics::decrementAndGet(&(thread->references)) > 0) {
        return;
    }

    free(thread->name);
    PlatformThread::destroyMutex(thread->mutex);
    PlatformThread::destroyCondition(thread->condition);
    PlatformThread::detachOSThread(thread->handle);
    delete thread;
}
/**
 * Resets a monitor handle to the unowned state with empty blocking and waiting
 * queues and no successor in the pool list.
 *
 * @param monitor the handle to reset.
 * @return the same pointer that was passed in.
 */
MonitorHandle* initMonitorHandle(MonitorHandle* monitor) {

    // Ownership.
    monitor->count = 0;
    monitor->owner = NULL;

    // Thread queues.
    monitor->waiting = NULL;
    monitor->blocking = NULL;

    // Pool linkage.
    monitor->next = NULL;

    return monitor;
}
/**
 * Starts the asynchronous interruption of a thread that is waiting on a
 * monitor by spawning a helper thread running interruptionThread() and
 * recording that helper's handle in target->interruptingThread.
 *
 * @param self   handle of the calling thread (currently unused).
 * @param target handle of the waiting thread to interrupt.
 * @return always false in the current implementation; the synchronous path
 *         that would have returned true is commented out below.
 */
bool interruptWaitingThread(ThreadHandle* self DECAF_UNUSED, ThreadHandle* target) {
bool result = false;
//MonitorHandle* monitor;
// TODO - Currently gets into a deadlock.
// If this thread owns the target thread's monitor lock then there's no
// reason to spawn an interruption thread, otherwise it has to be done
// asynchronously.
//monitor = target->monitor;
// if (monitorTryEnterUsingThreadId(monitor, self) == true) {
// PlatformThread::notifyAll(target->condition);
// monitorExitUsingThreadId(monitor, self);
// result = true;
// } else {
// Spawn the thread so that we don't deadlock on the monitor.
target->interruptingThread = initThreadHandle(new ThreadHandle());
// The helper is created not suspended, so it begins running right away.
createThreadInstance(target->interruptingThread, 0, Thread::NORM_PRIORITY, false, interruptionThread, target);
return result;
}
/**
 * Releases every thread-local value still stored on an exiting thread by
 * passing it to the ThreadLocalImpl registered for the same slot index.
 *
 * @param thread handle of the exiting thread.
 */
void threadExitTlsCleanup(ThreadHandle* thread) {

    for (int slot = 0; slot < DECAF_MAX_TLS_SLOTS; ++slot) {

        if (thread->tls[slot] == NULL) {
            continue;
        }

        // Snapshot the value and its owner under the TLS lock, then call the
        // owner outside of the lock.
        PlatformThread::lockMutex(library->tlsLock);
        void* stored = thread->tls[slot];
        ThreadLocalImpl* owner = library->tlsSlots[slot];
        PlatformThread::unlockMutex(library->tlsLock);

        if (stored != NULL) {
            owner->doDelete(stored);
        }
    }
}
/**
 * Signals the condition of every thread in a queue linked through
 * ThreadHandle::next. The queue itself is not modified.
 *
 * @param queueHead first handle of the queue; may be NULL.
 */
void unblockThreads(ThreadHandle* queueHead) {

    for (ThreadHandle* waiter = queueHead; waiter != NULL;) {
        // Fetch the successor before signalling the current entry.
        ThreadHandle* following = waiter->next;
        PlatformThread::notifyAll(waiter->condition);
        waiter = following;
    }
}
/**
 * Appends a thread handle to the tail of a queue linked through
 * ThreadHandle::next.
 *
 * @param queue  address of the queue's head pointer.
 * @param thread handle to append; its next pointer must be NULL.
 *
 * @throws RuntimeException if the handle's next pointer is already set.
 */
void enqueueThread(ThreadHandle** queue, ThreadHandle* thread) {

    if (thread->next != NULL) {
        throw RuntimeException(__FILE__, __LINE__, "Thread was on a monitor queue already");
    }

    // Empty queue: the new handle becomes the head.
    if (*queue == NULL) {
        *queue = thread;
        return;
    }

    ThreadHandle* tail = *queue;
    while (tail->next != NULL) {
        tail = tail->next;
    }
    tail->next = thread;
}
/**
 * Unlinks a thread handle from a queue linked through ThreadHandle::next and
 * clears the handle's next pointer. Does nothing when the queue is empty or
 * the handle is not found in it.
 *
 * @param queue  address of the queue's head pointer.
 * @param thread handle to remove.
 */
void dequeueThread(ThreadHandle** queue, ThreadHandle* thread) {

    ThreadHandle* previous = *queue;
    if (previous == NULL) {
        return;
    }

    // Removing the head just advances the head pointer.
    if (previous == thread) {
        *queue = thread->next;
        thread->next = NULL;
        return;
    }

    for (ThreadHandle* candidate = previous->next; candidate != NULL; candidate = candidate->next) {
        if (candidate == thread) {
            previous->next = thread->next;
            thread->next = NULL;
            return;
        }
        previous = candidate;
    }
}
/**
 * Counts the handles currently linked into a monitor's waiting queue. The
 * monitor's mutex is held for the duration of the walk.
 *
 * @param monitor the monitor to inspect.
 * @return number of entries in monitor->waiting.
 */
unsigned int getNumberOfWaiters(MonitorHandle* monitor) {

    unsigned int total = 0;

    PlatformThread::lockMutex(monitor->mutex);
    for (ThreadHandle* waiter = monitor->waiting; waiter != NULL; waiter = waiter->next) {
        ++total;
    }
    PlatformThread::unlockMutex(monitor->mutex);

    return total;
}
/**
 * Deletes every MonitorHandle in the pool's list. Handles flagged as
 * initialized have their two platform mutexes destroyed first. The pool's head
 * and count fields are not modified.
 *
 * @param pool the pool whose handles are to be released.
 */
void purgeMonitorsPool(MonitorPool* pool) {

    for (MonitorHandle* node = pool->head; node != NULL;) {

        // Grab the successor before the node is deleted.
        MonitorHandle* following = node->next;

        // Cleanup the OS level resources.
        if (node->initialized == true) {
            PlatformThread::destroyMutex(node->mutex);
            PlatformThread::destroyMutex(node->lock);
        }

        delete node;
        node = following;
    }
}
/**
 * Allocates MONITOR_POOL_BLOCK_SIZE monitor handles, resets each one, marks it
 * as not initialized and chains them into a singly linked list.
 *
 * @return the head of the new list (the handle allocated last).
 */
MonitorHandle* batchAllocateMonitors() {

    MonitorHandle* head = NULL;

    int remaining = MONITOR_POOL_BLOCK_SIZE;
    while (remaining-- > 0) {
        MonitorHandle* fresh = new MonitorHandle;
        initMonitorHandle(fresh);
        fresh->initialized = false;
        // Push onto the front of the list built so far.
        fresh->next = head;
        head = fresh;
    }

    return head;
}
/**
 * Wakes a thread that is waiting on a monitor: clears its waiting flag, sets
 * its blocked flag, optionally records that it was notified, and signals its
 * condition.
 *
 * @param thread         handle of the thread to wake.
 * @param markAsNotified when true the handle's notified flag is set as well.
 */
void doNotifyThread(ThreadHandle* thread, bool markAsNotified) {

    thread->blocked = true;
    thread->waiting = false;

    if (markAsNotified) {
        thread->notified = true;
    }

    PlatformThread::notifyAll(thread->condition);
}
/**
 * Notifies threads waiting on a monitor. Walks the waiting queue under the
 * monitor's mutex and wakes each entry whose waiting flag is set; stops after
 * the first woken thread unless notifyAll is true.
 *
 * @param monitor   the monitor whose waiters are notified.
 * @param notifyAll true to wake every waiter, false to wake at most one.
 *
 * @throws IllegalMonitorStateException if the calling thread does not own the monitor.
 */
void doNotifyWaiters(MonitorHandle* monitor, bool notifyAll) {

    ThreadHandle* self = Threading::getCurrentThreadHandle();

    if (monitor->owner != self) {
        throw IllegalMonitorStateException(__FILE__, __LINE__, "Current Thread is not the lock holder.");
    }

    bool signalled = false;

    PlatformThread::lockMutex(monitor->mutex);

    for (ThreadHandle* waiter = monitor->waiting; waiter != NULL;) {

        ThreadHandle* following = waiter->next;

        PlatformThread::lockMutex(waiter->mutex);
        if (waiter->waiting == true) {
            doNotifyThread(waiter, true);
            signalled = true;
        }
        PlatformThread::unlockMutex(waiter->mutex);

        // Single notify: done as soon as one thread has been woken.
        if (signalled && !notifyAll) {
            break;
        }

        waiter = following;
    }

    PlatformThread::unlockMutex(monitor->mutex);
}
/**
 * Acquires a monitor for the given thread, blocking until it is available.
 *
 * Fast path: a try-lock on monitor->lock. Slow path: under monitor->mutex the
 * try-lock is repeated; if it still fails the thread is marked BLOCKED, put on
 * the monitor's blocking queue and parked on its own condition until it is
 * woken, after which the whole sequence is retried.
 *
 * On return the monitor's owner is 'thread' and its count is 1.
 *
 * @param monitor the monitor to acquire.
 * @param thread  handle of the acquiring thread.
 */
void doMonitorEnter(MonitorHandle* monitor, ThreadHandle* thread) {
while (true) {
// Uncontended case: no need to touch monitor->mutex at all.
if (PlatformThread::tryLockMutex(monitor->lock) == true) {
monitor->owner = thread;
monitor->count = 1;
break;
}
PlatformThread::lockMutex(monitor->mutex);
// Retry while holding monitor->mutex before committing to block.
if (PlatformThread::tryLockMutex(monitor->lock) == true) {
PlatformThread::unlockMutex(monitor->mutex);
monitor->owner = thread;
monitor->count = 1;
break;
}
PlatformThread::lockMutex(thread->mutex);
thread->blocked = true;
thread->state = Thread::BLOCKED;
thread->monitor = monitor;
PlatformThread::unlockMutex(thread->mutex);
enqueueThread(&monitor->blocking, thread);
// Parks on the thread's own condition using monitor->mutex as the wait mutex.
PlatformThread::waitOnCondition(thread->condition, monitor->mutex);
dequeueThread(&monitor->blocking, thread);
PlatformThread::unlockMutex(monitor->mutex);
}
// Monitor is now owned by this thread, lets clean up the state in case
// the lock was acquired after blocking.
if (thread->monitor != NULL) {
PlatformThread::lockMutex(thread->mutex);
thread->blocked = false;
thread->state = Thread::RUNNABLE;
thread->monitor = NULL;
PlatformThread::unlockMutex(thread->mutex);
}
}
/**
 * Releases one level of ownership of a monitor. When the count drops to zero
 * the owner is cleared, every thread on the blocking queue is signalled and
 * monitor->lock is released.
 *
 * No ownership check is performed here.
 *
 * @param monitor the monitor being exited.
 * @param thread  handle of the exiting thread (unused).
 */
void doMonitorExit(MonitorHandle* monitor, ThreadHandle* thread DECAF_UNUSED) {
monitor->count--;
if (monitor->count == 0) {
monitor->owner = NULL;
// Wake any blocked threads so they can attempt to enter the monitor.
PlatformThread::lockMutex(monitor->mutex);
unblockThreads(monitor->blocking);
// since we are signaling waiting threads we unlock this under lock so that they
// don't go back to sleep before we are done
PlatformThread::unlockMutex(monitor->lock);
PlatformThread::unlockMutex(monitor->mutex);
}
}
/**
 * Implements wait() on a monitor: fully releases the monitor, parks the thread
 * on the monitor's waiting queue until it is notified, interrupted or the
 * timeout expires, then re-acquires the monitor and restores the previous
 * ownership count.
 *
 * @param monitor       the monitor to wait on; must be owned by 'thread'.
 * @param thread        handle of the waiting thread.
 * @param mills         timeout in milliseconds.
 * @param nanos         additional timeout in nanoseconds; when both mills and
 *                      nanos are zero the wait is untimed.
 * @param interruptible whether a pending or arriving interrupt ends the wait.
 *
 * @return false if the thread was notified, true if the wait timed out.
 *
 * @throws IllegalMonitorStateException if 'thread' does not own the monitor.
 * @throws InterruptedException if the wait is interruptible and the thread was
 *         interrupted before or during the wait without having been notified.
 * @throws RuntimeException if the wait ended without notification, interrupt
 *         or timeout.
 */
bool doWaitOnMonitor(MonitorHandle* monitor, ThreadHandle* thread,
long long mills, int nanos, bool interruptible) {
int count = -1;
bool interrupted = false;
bool notified = false;
bool timedOut = false;
if (monitor->owner != thread) {
throw IllegalMonitorStateException(__FILE__, __LINE__, "Current Thread is not the lock holder.");
}
// Remember the ownership depth so it can be restored after re-entry.
count = monitor->count;
PlatformThread::lockMutex(thread->mutex);
// Before we wait, check if we've already been either interrupted
if (interruptible && thread->interrupted) {
thread->interrupted = false;
PlatformThread::unlockMutex(thread->mutex);
throw InterruptedException(__FILE__, __LINE__, "Thread interrupted");
}
thread->waiting = true;
thread->interruptible = interruptible;
if (mills || nanos) {
thread->timerSet = true;
thread->state = Thread::TIMED_WAITING;
} else {
thread->state = Thread::WAITING;
}
thread->monitor = monitor;
PlatformThread::unlockMutex(thread->mutex);
monitor->owner = NULL;
monitor->count = 0;
PlatformThread::lockMutex(monitor->mutex);
// Release the lock and wake up any blocked threads.
PlatformThread::unlockMutex(monitor->lock);
unblockThreads(monitor->blocking);
// This thread now enters the wait queue.
enqueueThread(&monitor->waiting, thread);
// The completion predicate locks thread->mutex and leaves it locked when it
// reports completion (and also after a timeout).
MonitorWaitCompletionCondition completion(thread);
if (mills || nanos) {
timedOut = PlatformThread::interruptibleWaitOnCondition(thread->condition, monitor->mutex, mills, nanos, completion);
} else {
PlatformThread::interruptibleWaitOnCondition(thread->condition, monitor->mutex, completion);
}
dequeueThread(&monitor->waiting, thread);
PlatformThread::unlockMutex(monitor->mutex);
// We should own the Thread's mutex from the CompletionCondition locking it.
interrupted = thread->interrupted;
notified = thread->notified;
thread->waiting = false;
thread->notified = false;
thread->timerSet = false;
thread->interruptible = false;
thread->state = Thread::RUNNABLE;
// An interrupt that will be reported via exception below consumes the flag;
// if the thread was also notified the flag is left set.
if (interrupted && !notified) {
thread->interrupted = false;
}
// Cancel a still pending interruption helper; it is no longer needed.
if (thread->interruptingThread) {
PlatformThread::lockMutex(thread->interruptingThread->mutex);
thread->interruptingThread->canceled = true;
PlatformThread::unlockMutex(thread->interruptingThread->mutex);
thread->interruptingThread = NULL;
}
PlatformThread::unlockMutex(thread->mutex);
// Re-acquire the lock now and restore its old state.
doMonitorEnter(monitor, thread);
monitor->count = count;
// Notification takes precedence over interrupt and timeout.
if (notified) {
return false;
}
if (interrupted) {
throw InterruptedException(__FILE__, __LINE__, "Thread interrupted");
}
if (!timedOut) {
throw RuntimeException(__FILE__, __LINE__, "Invalid state detected at end of Monitor Wait");
}
return true;
}
}
////////////////////////////////////////////////////////////////////////////////
// Default constructor; performs no work.
Threading::Threading() {
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Creates the global ThreadingLibrary state: priority mapping, TLS keys,
 * global and TLS locks, the initial monitor pool and the TLS slot table, and
 * records the calling thread as the main thread. Afterwards the Executors and
 * Atomics static data are initialized.
 */
void Threading::initialize() {
library = new ThreadingLibrary();
// Figure out what the OS level thread priority mappings are for the Thread
// classes generic priority value range.
PlatformThread::initPriorityMapping(Thread::MAX_PRIORITY + 1, library->priorityMapping);
PlatformThread::createTlsKey(&(library->threadKey));
PlatformThread::createTlsKey(&(library->selfKey));
PlatformThread::createMutex(&(library->globalLock));
PlatformThread::createMutex(&(library->tlsLock));
// Seed the monitor pool with one batch of pre-allocated handles.
library->monitors = new MonitorPool;
library->monitors->head = batchAllocateMonitors();
library->monitors->count = MONITOR_POOL_BLOCK_SIZE;
library->tlsSlots.resize(DECAF_MAX_TLS_SLOTS);
// We mark the thread where Decaf's Init routine is called from as our Main Thread.
library->mainThread = PlatformThread::getCurrentThread();
// Initialize the Executors static data for use in ExecutorService classes and Atomics
Executors::initialize();
Atomics::initialize();
}
////////////////////////////////////////////////////////////////////////////////
void Threading::shutdown() {
// First shutdown the Executors static data to remove dependencies on Threading.
Executors::shutdown();
// Destroy any Foreign Thread Facades that were created during runtime.
std::vector<Thread*>::iterator iter = library->osThreads.begin();
for (; iter != library->osThreads.end(); ++iter) {
delete *iter;
}
library->osThreads.clear();
PlatformThread::destroyTlsKey(library->threadKey);
PlatformThread::destroyTlsKey(library->selfKey);
PlatformThread::destroyMutex(library->globalLock);
PlatformThread::destroyMutex(library->tlsLock);
purgeMonitorsPool(library->monitors);
delete library->monitors;
delete library;
// Atomics only uses platform Thread primitives when there are no atomic
// builtins and Atomics are used in thread so make sure this is always last
// in the shutdown order.
Atomics::shutdown();
}
////////////////////////////////////////////////////////////////////////////////
// Acquires the library-wide global lock. Requires an initialized library.
void Threading::lockThreadsLib() {
PlatformThread::lockMutex(library->globalLock);
}
////////////////////////////////////////////////////////////////////////////////
// Releases the library-wide global lock taken by lockThreadsLib().
void Threading::unlockThreadsLib() {
PlatformThread::unlockMutex(library->globalLock);
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Debug aid: prints the number of active threads and wrapped OS threads, and
 * the name of every active thread, to standard output. The global library lock
 * is held while the list is walked. Does nothing when the threading library
 * has not been initialized.
 */
void Threading::dumpRunningThreads() {

    // Check before locking: taking the lock dereferences the library pointer,
    // and returning after locking would leave the global lock held.
    if (library == NULL) {
        return;
    }

    lockThreadsLib();

    std::list<ThreadHandle*>::const_iterator threads = library->activeThreads.begin();

    std::cout << "------------------------------------------------------------------------" << std::endl;
    std::cout << " Active Threads: " << library->activeThreads.size() << std::endl;
    std::cout << " Wrapped OS Threads: " << library->osThreads.size() << std::endl;
    std::cout << std::endl;

    for (; threads != library->activeThreads.end(); ++threads) {
        ThreadHandle* thread = *threads;
        if (thread == NULL) {
            continue;
        }

        // Handles of internal helper threads are created without a name;
        // building a std::string from a NULL pointer is undefined behavior.
        std::string threadName = (thread->name != NULL) ? thread->name : "<unnamed>";
        std::cout << "Thread name = " << threadName << std::endl;
    }

    std::cout << "------------------------------------------------------------------------" << std::endl;

    unlockThreadsLib();
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Creates the handle and the underlying platform thread for a new Thread
 * object. The platform thread is created suspended with normal priority and
 * will execute runCallback() once it is started.
 *
 * @param parent    the Thread object that owns the new thread; must not be NULL.
 * @param name      name to copy into the handle; must not be NULL.
 * @param stackSize stack size forwarded to the platform layer.
 *
 * @return the new handle; ownership passes to the caller.
 *
 * @throws NullPointerException if parent or name is NULL.
 */
ThreadHandle* Threading::createNewThread(Thread* parent, const char* name, long long stackSize) {

    const bool missingArgument = (parent == NULL) || (name == NULL);
    if (missingArgument) {
        throw NullPointerException(__FILE__, __LINE__, "One or more arguments was NULL");
    }

    // The smart pointer keeps the allocation owned until everything succeeded.
    Pointer<ThreadHandle> holder(new ThreadHandle());
    ThreadHandle* handle = holder.get();

    initThreadHandle(handle);
    handle->parent = parent;
    handle->name = ::strdup(name);

    createThreadInstance(handle, stackSize, Thread::NORM_PRIORITY, true, runCallback, handle);

    return holder.release();
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Releases the Thread object's reference to a handle.
 *
 * For a wrapped OS thread the platform handle is detached. For a library
 * thread, a thread that was created but never started is first woken and
 * flagged canceled so that it exits without running its task, and the calling
 * thread then joins it; an InterruptedException from that join is ignored.
 *
 * @param thread the handle to release.
 */
void Threading::destroyThread(ThreadHandle* thread) {

    if (thread->osThread) {
        PlatformThread::detachOSThread(thread->handle);
        dereferenceThread(thread);
        return;
    }

    // If the thread was created but never started then we need to wake it
    // up from the suspended state so that it can terminate, we mark it
    // canceled to ensure it doesn't call its runnable.
    if (thread->state == Thread::NEW) {
        PlatformThread::lockMutex(thread->mutex);

        // Re-check under the lock before touching the flags.
        const bool stillParked = (thread->state == Thread::NEW) && (thread->suspended == true);
        if (stillParked) {
            thread->canceled = true;
            thread->suspended = false;
            PlatformThread::notifyAll(thread->condition);
        }

        PlatformThread::unlockMutex(thread->mutex);
    }

    try {
        Threading::join(thread, 0, 0);
    } catch (InterruptedException&) {
    }

    dereferenceThread(thread);
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Wraps the calling platform thread, which was not created by this library, in
 * a ThreadHandle and a proxy Thread object. Both are published in thread-local
 * storage and the proxy Thread is recorded for deletion at shutdown.
 *
 * @return the handle created for the calling thread.
 */
ThreadHandle* Threading::attachToCurrentThread() {

    Pointer<ThreadHandle> handle(initThreadHandle(new ThreadHandle()));

    handle->handle = PlatformThread::getCurrentThread();
    handle->state = Thread::RUNNABLE;
    handle->stackSize = PlatformThread::getStackSize(handle->handle);

    // Names take the form "OS-Thread<N>" with N drawn from a global counter.
    const std::string generatedName =
        std::string("OS-Thread") + Integer::toString(library->osThreadId.getAndIncrement());
    handle->name = ::strdup(generatedName.c_str());

    handle->threadId = PlatformThread::getCurrentThreadId();

    // An OS Thread doesn't have a running thread, this is only a proxy to only one ref.
    handle->references = 1;

    // Now create a Decaf Thread as a proxy to the OS thread.
    Pointer<Thread> proxy(new Thread(handle.get()));
    handle->parent = proxy.get();
    handle->osThread = true;

    PlatformThread::setTlsValue(library->threadKey, proxy.get());
    PlatformThread::setTlsValue(library->selfKey, handle.get());

    // Store the Thread that wraps this OS thread for later deletion since
    // no other owners exist.
    PlatformThread::lockMutex(library->globalLock);
    library->osThreads.push_back(proxy.release());
    PlatformThread::unlockMutex(library->globalLock);

    return handle.release();
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Starts a thread that was created suspended: marks it RUNNABLE and, if it is
 * still suspended, clears the flag and signals its condition so that the entry
 * method can proceed.
 *
 * @param thread handle of the thread to start.
 *
 * @throws IllegalThreadStateException if the handle's state is already beyond NEW.
 *
 * NOTE(review): the trailing DECAF_CATCH_* macros are defined elsewhere;
 * judging by their names they rethrow or convert exceptions to
 * RuntimeException - confirm against their definitions.
 */
void Threading::start(ThreadHandle* thread) {
try {
// State is checked before the thread's mutex is taken.
if (thread->state > Thread::NEW) {
throw IllegalThreadStateException(__FILE__, __LINE__, "Thread already started");
}
PlatformThread::lockMutex(thread->mutex);
thread->state = Thread::RUNNABLE;
if (thread->suspended == true) {
thread->suspended = false;
PlatformThread::notifyAll(thread->condition);
}
PlatformThread::unlockMutex(thread->mutex);
}
DECAF_CATCH_RETHROW(IllegalThreadStateException)
DECAF_CATCH_RETHROW(RuntimeException)
DECAF_CATCH_EXCEPTION_CONVERT(NullPointerException, RuntimeException)
DECAF_CATCHALL_THROW(RuntimeException)
}
////////////////////////////////////////////////////////////////////////////////
// Forwards to the platform layer's yield operation. The misspelled name is
// part of the existing interface.
void Threading::yeild() {
PlatformThread::yeild();
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Sets the interrupted flag of a thread. If the thread is currently in an
 * interruptible sleep or park its condition is signalled; if it is in an
 * interruptible monitor wait an interruption helper is started for it.
 * Interrupting a thread whose flag is already set has no effect.
 *
 * @param thread handle of the thread to interrupt.
 */
void Threading::interrupt(ThreadHandle* thread) {

    ThreadHandle* self = Threading::getCurrentThreadHandle();

    PlatformThread::lockMutex(library->globalLock);
    PlatformThread::lockMutex(thread->mutex);

    const bool alreadyInterrupted = (thread->interrupted == true);

    if (!alreadyInterrupted) {

        if (thread->interruptible == true) {
            if (thread->sleeping || thread->parked) {
                // Sleeping or parked threads wait on their own condition.
                PlatformThread::notifyAll(thread->condition);
            } else if (thread->waiting == true) {
                const bool wokenSynchronously = interruptWaitingThread(self, thread);
                if (wokenSynchronously) {
                    thread->blocked = true;
                }
            }
        }

        thread->interrupted = true;
    }

    PlatformThread::unlockMutex(thread->mutex);
    PlatformThread::unlockMutex(library->globalLock);
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Reports whether the calling thread has been interrupted and clears its
 * interrupted flag.
 *
 * @return the value of the flag before it was cleared.
 */
bool Threading::interrupted() {
    return Threading::isInterrupted(Threading::getCurrentThreadHandle(), true);
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Reads a thread's interrupted flag, optionally clearing it.
 *
 * @param handle handle of the thread to query.
 * @param reset  when true the flag is read and cleared under the thread's
 *               mutex; when false it is read without locking.
 *
 * @return the value of the flag as observed by this call.
 */
bool Threading::isInterrupted(ThreadHandle* handle, bool reset) {

    if (!reset) {
        return handle->interrupted;
    }

    PlatformThread::lockMutex(handle->mutex);
    const bool wasInterrupted = handle->interrupted;
    handle->interrupted = false;
    PlatformThread::unlockMutex(handle->mutex);

    return wasInterrupted;
}
////////////////////////////////////////////////////////////////////////////////
namespace {

    /**
     * Wait predicate used by Threading::join(). With a target thread the wait
     * completes when the target is TERMINATED or the joining thread has been
     * interrupted; without a target (a thread joining itself) only an
     * interrupt completes it.
     */
    class JoinCompletionCondition : public CompletionCondition {
    private:

        ThreadHandle* self;
        ThreadHandle* target;

    private:

        // Not copyable.
        JoinCompletionCondition(const JoinCompletionCondition&);
        JoinCompletionCondition& operator= (const JoinCompletionCondition&);

    public:

        JoinCompletionCondition(ThreadHandle* self, ThreadHandle* target) : self(self), target(target) {}

        /** @return true when the join wait should end. */
        virtual bool operator()() {

            // Self join: the flag is read without taking self's mutex.
            if (target == NULL) {
                return self->interrupted == true;
            }

            if (target->state == Thread::TERMINATED) {
                return true;
            }

            PlatformThread::lockMutex(self->mutex);
            const bool wasInterrupted = (self->interrupted == true);
            PlatformThread::unlockMutex(self->mutex);

            return wasInterrupted;
        }

        using CompletionCondition::operator();
    };
}
////////////////////////////////////////////////////////////////////////////////
/**
 * Blocks the calling thread until the given thread terminates, the timeout
 * expires, or the calling thread is interrupted.
 *
 * @param thread handle of the thread to join; may be the calling thread itself,
 *               in which case the call waits for a timeout or an interrupt.
 * @param mills  timeout in milliseconds; must not be negative.
 * @param nanos  additional timeout in nanoseconds, in the range [0, 1000000).
 *               When both mills and nanos are zero the wait is untimed.
 *
 * @return the timed-out result reported by the platform wait; false when no
 *         timed wait took place.
 *
 * @throws IllegalArgumentException if a timeout argument is out of range.
 * @throws InterruptedException if the calling thread was already interrupted
 *         or is interrupted while it waits.
 */
bool Threading::join(ThreadHandle* thread, long long mills, int nanos) {

    if ((mills < 0) || (nanos < 0) || (nanos >= 1000000)) {
        throw IllegalArgumentException(__FILE__, __LINE__, "Timeout arguments out of range.");
    }

    bool timedOut = false;
    bool interrupted = false;

    ThreadHandle* self = getCurrentThreadHandle();

    PlatformThread::lockMutex(self->mutex);

    if (self->interrupted == true) {
        // NOTE(review): as before, the flag is reported but left set on this path.
        interrupted = true;
    } else if (self == thread && self->state != Thread::TERMINATED) {

        // When blocking on ourself, we just enter a wait and hope there's
        // either a timeout, or we are interrupted.
        JoinCompletionCondition completion(self, NULL);

        self->sleeping = true;
        self->interruptible = true;
        self->state = Thread::SLEEPING;

        if (mills > 0 || nanos > 0) {
            self->timerSet = true;
            timedOut = PlatformThread::interruptibleWaitOnCondition(self->condition, self->mutex,
                                                                    mills, nanos, completion);
        } else {
            PlatformThread::interruptibleWaitOnCondition(self->condition, self->mutex, completion);
        }

        // Leave the sleeping state again and report an interrupt that ended
        // the wait, exactly as the branch for joining another thread does.
        // Without this the handle stayed flagged as sleeping forever.
        self->timerSet = false;
        self->state = Thread::RUNNABLE;
        self->sleeping = false;
        self->interruptible = false;

        if (self->interrupted == true) {
            interrupted = true;
            self->interrupted = false;
        }

    } else {

        PlatformThread::lockMutex(thread->mutex);

        if (thread->state >= Thread::RUNNABLE && thread->state != Thread::TERMINATED) {

            enqueueThread(&thread->joiners, self);

            self->sleeping = true;
            self->interruptible = true;
            self->state = Thread::SLEEPING;

            JoinCompletionCondition completion(self, thread);

            // The wait uses the target's mutex, so our own is released first;
            // the completion predicate takes it when it needs to.
            if (mills > 0 || nanos > 0) {
                self->timerSet = true;
                PlatformThread::unlockMutex(self->mutex);
                timedOut = PlatformThread::interruptibleWaitOnCondition(self->condition, thread->mutex,
                                                                        mills, nanos, completion);
            } else {
                PlatformThread::unlockMutex(self->mutex);
                PlatformThread::interruptibleWaitOnCondition(self->condition, thread->mutex, completion);
            }

            dequeueThread(&thread->joiners, self);

            PlatformThread::unlockMutex(thread->mutex);
            PlatformThread::lockMutex(self->mutex);

            self->timerSet = false;
            self->state = Thread::RUNNABLE;
            self->sleeping = false;
            self->interruptible = false;

            if (self->interrupted == true) {
                interrupted = true;
                self->interrupted = false;
            }
        } else {
            // Target never started or already terminated: nothing to wait for.
            PlatformThread::unlockMutex(thread->mutex);
        }
    }

    PlatformThread::unlockMutex(self->mutex);

    if (interrupted) {
        throw InterruptedException(__FILE__, __LINE__, "Sleeping Thread interrupted");
    }

    return timedOut;
}
////////////////////////////////////////////////////////////////////////////////
namespace {

    /**
     * Wait predicate used by Threading::sleep(): the sleep ends early once the
     * sleeping thread's interrupted flag is set.
     */
    class SleepCompletionCondition : public CompletionCondition {
    private:

        ThreadHandle* handle;

    private:

        // Not copyable.
        SleepCompletionCondition(const SleepCompletionCondition&);
        SleepCompletionCondition& operator= (const SleepCompletionCondition&);

    public:

        SleepCompletionCondition(ThreadHandle* handle) : handle(handle) {}

        /** @return true when the thread has been interrupted. */
        bool operator()() {
            return handle->interrupted ? true : false;
        }

        using CompletionCondition::operator();
    };
}
////////////////////////////////////////////////////////////////////////////////
bool Threading::sleep(long long mills, int nanos) {
if ((mills < 0) || (nanos < 0) || (nanos >= 1000000)) {
throw IllegalArgumentException(__FILE__, __LINE__, "Timeout arguments out of range.");
}
bool timedOut = false;
bool interrupted = false;
ThreadHandle* self = getCurrentThreadHandle();
PlatformThread::lockMutex(self->mutex);
if (self->interrupted == true) {
interrupted = true;
} else {
self->sleeping = true;
self->state = Thread::SLEEPING;
self->interruptible = true;
self->timerSet = true;
SleepCompletionCondition completion(self);
timedOut = PlatformThread::interruptibleWaitOnCondition(self->condition, self->mutex,
mills, nanos, completion);
}
self->timerSet = false;
self->sleeping = false;
self->interruptible = false;
self->state = Thread::RUNNABLE;
if (self->interrupted == true) {
interrupted = true;
self->interrupted = false;
}
PlatformThread::unlockMutex(self->mutex);
if (interrupted) {
throw InterruptedException(__FILE__, __LINE__, "Sleeping Thread interrupted");
}
return timedOut;
}
////////////////////////////////////////////////////////////////////////////////
// A thread counts as alive when its state is at or beyond Thread::RUNNABLE
// but is not Thread::TERMINATED.
bool Threading::isThreadAlive(ThreadHandle* handle DECAF_UNUSED) {
    if (handle->state < Thread::RUNNABLE) {
        return false;
    }
    return handle->state != Thread::TERMINATED;
}
////////////////////////////////////////////////////////////////////////////////
// Returns the thread id stored in the given handle.
long long Threading::getThreadId(ThreadHandle* handle) {
    const long long id = handle->threadId;
    return id;
}
////////////////////////////////////////////////////////////////////////////////
// Returns the priority value recorded in the handle (the value last stored
// by setThreadPriority, not a platform specific one).
int Threading::getThreadPriority(ThreadHandle* handle) {
    const int recorded = handle->priority;
    return recorded;
}
////////////////////////////////////////////////////////////////////////////////
// Applies a new priority to the thread.  The value is translated through
// library->priorityMapping before it is given to the platform layer, and
// the untranslated value is then recorded in the handle.
// NOTE(review): priority is used directly as an index into the mapping
// table; callers are presumably expected to range check it - confirm.
void Threading::setThreadPriority(ThreadHandle* handle, int priority) {
    ThreadHandle& target = *handle;
    PlatformThread::setPriority(target.handle, library->priorityMapping[priority]);
    target.priority = priority;
}
////////////////////////////////////////////////////////////////////////////////
// Returns the name pointer stored in the handle.  The string remains owned
// by the handle; setThreadName frees and replaces it.
const char* Threading::getThreadName(ThreadHandle* handle) {
    const char* current = handle->name;
    return current;
}
////////////////////////////////////////////////////////////////////////////////
// Replaces the name stored in the thread handle with a private copy of the
// given string.
//
// thread - handle whose name is replaced; the previous name is freed.
// name   - NUL terminated string to copy, must not be NULL.
//
// Throws NullPointerException if name is NULL.
void Threading::setThreadName(ThreadHandle* thread, const char* name) {

    if (name == NULL) {
        throw NullPointerException(__FILE__, __LINE__, "Null Thread Name Pointer Passed.");
    }

    // Duplicate before releasing the old buffer: the caller may legitimately
    // pass the handle's own current name (e.g. the result of getThreadName),
    // in which case freeing first would make strdup read freed memory.
    char* replacement = ::strdup(name);

    // free(NULL) is a no-op so no guard is required for an unnamed handle.
    free(thread->name);
    thread->name = replacement;
}
////////////////////////////////////////////////////////////////////////////////
// Returns the handle's current state value converted to Thread::State.
Thread::State Threading::getThreadState(ThreadHandle* handle) {
    return static_cast<Thread::State>(handle->state);
}
////////////////////////////////////////////////////////////////////////////////
// Returns the Thread object that owns the calling thread's handle.  The
// handle lookup attaches the calling thread when it has no handle yet.
Thread* Threading::getCurrentThread() {
    ThreadHandle* current = getCurrentThreadHandle();
    return current->parent;
}
////////////////////////////////////////////////////////////////////////////////
// Returns the handle registered for the calling thread under the library's
// selfKey TLS slot.  When nothing is registered the result of
// attachToCurrentThread() is returned instead.
ThreadHandle* Threading::getCurrentThreadHandle() {
    ThreadHandle* registered = (ThreadHandle*)PlatformThread::getTlsValue(library->selfKey);
    if (registered != NULL) {
        return registered;
    }
    return attachToCurrentThread();
}
////////////////////////////////////////////////////////////////////////////////
// Parks without a timeout: forwards to the timed overload with a zero
// millisecond / zero nanosecond interval, which that overload treats as
// "no timer".  The overload's return value is ignored.
// Throws NullPointerException if thread is NULL.
void Threading::park(Thread* thread) {

    if (thread == NULL) {
        throw NullPointerException(__FILE__, __LINE__, "Null Thread Pointer Passed.");
    }

    Threading::park(thread, 0LL, 0);
}
////////////////////////////////////////////////////////////////////////////////
namespace {
class ParkCompletionCondition : public CompletionCondition {
private:
ThreadHandle* handle;
private:
ParkCompletionCondition(const ParkCompletionCondition&);
ParkCompletionCondition& operator= (const ParkCompletionCondition&);
public:
ParkCompletionCondition(ThreadHandle* handle) : handle(handle) {}
virtual bool operator()() {
if (handle->unparked == true) {
handle->unparked = false;
return true;
} else if (handle->interrupted) {
return true;
}
return false;
}
using CompletionCondition::operator();
};
}
////////////////////////////////////////////////////////////////////////////////
// Parks on the given thread's handle until an unpark token is available,
// the handle is flagged as interrupted, or the optional timeout elapses.
//
// thread - thread whose handle (mutex / condition) is used, must not be NULL.
// mills  - timeout milliseconds.
// nanos  - additional timeout nanoseconds.
//
// A timed wait is used only when mills > 0 or nanos > 0, otherwise the wait
// has no timer.  A pending unpark token is consumed without waiting, and a
// pending interrupt causes an immediate return.  The interrupted flag is
// neither cleared nor reported through an exception by this method.
// TODO: decide whether an interrupt should be surfaced to the caller.
//
// Returns the result of the timed wait, or false when no timed wait ran.
// Throws NullPointerException if thread is NULL.
bool Threading::park(Thread* thread, long long mills, int nanos) {

    if (thread == NULL) {
        throw NullPointerException(__FILE__, __LINE__, "Null Thread Pointer Passed.");
    }

    bool timedOut = false;
    ThreadHandle* handle = thread->getHandle();

    PlatformThread::lockMutex(handle->mutex);

    if (handle->unparked == true) {
        // A token was posted before we got here, consume it and carry on.
        handle->unparked = false;
    } else if (handle->interrupted == false) {
        handle->state = Thread::BLOCKED;
        handle->parked = true;
        handle->interruptible = true;

        ParkCompletionCondition completion(handle);

        if (mills > 0 || nanos > 0) {
            handle->timerSet = true;
            timedOut = PlatformThread::interruptibleWaitOnCondition(handle->condition, handle->mutex,
                                                                    mills, nanos, completion);
        } else {
            PlatformThread::interruptibleWaitOnCondition(handle->condition, handle->mutex, completion);
        }
    }

    // Restore the running state on every path, including the ones that
    // never waited.
    handle->timerSet = false;
    handle->parked = false;
    handle->interruptible = false;
    handle->state = Thread::RUNNABLE;

    PlatformThread::unlockMutex(handle->mutex);

    return timedOut;
}
////////////////////////////////////////////////////////////////////////////////
// Posts an unpark token on the given thread's handle and wakes the thread
// if it is currently parked.
// Throws NullPointerException if thread is NULL.
void Threading::unpark(Thread* thread) {

    if (thread == NULL) {
        throw NullPointerException(__FILE__, __LINE__, "Null Thread Pointer Passed.");
    }

    ThreadHandle* target = thread->getHandle();

    PlatformThread::lockMutex(target->mutex);

    // Post the token first.  A parked thread consumes it when it resumes,
    // a thread that is not parked consumes it in its next call to park and
    // returns without waiting.
    target->unparked = true;

    // Only a thread that is actually parked needs to be signalled.
    if (target->parked) {
        PlatformThread::notifyAll(target->condition);
    }

    PlatformThread::unlockMutex(target->mutex);
}
////////////////////////////////////////////////////////////////////////////////
// Removes a monitor from the head of the library's monitor pool and returns
// it, refilling the pool with a freshly allocated batch when it is empty.
//
// alreadyLocked - when false the library's global lock is taken around the
//                 pool access; when true the lock is left alone.
//
// The two mutexes of a monitor are created the first time it is handed out.
MonitorHandle* Threading::takeMonitor(bool alreadyLocked) {

    const bool lockHere = !alreadyLocked;

    if (lockHere) {
        PlatformThread::lockMutex(library->globalLock);
    }

    // Pool exhausted, allocate another block of monitors.
    if (library->monitors->head == NULL) {
        library->monitors->head = batchAllocateMonitors();
        library->monitors->count = MONITOR_POOL_BLOCK_SIZE;
    }

    // Unlink the first pooled monitor.
    MonitorHandle* taken = library->monitors->head;
    library->monitors->head = taken->next;
    library->monitors->count--;
    taken->next = NULL;

    if (taken->initialized == false) {
        PlatformThread::createMutex(&taken->mutex);
        PlatformThread::createMutex(&taken->lock);
        taken->initialized = true;
    }

    if (lockHere) {
        PlatformThread::unlockMutex(library->globalLock);
    }

    return taken;
}
////////////////////////////////////////////////////////////////////////////////
// Gives a monitor back to the library's monitor pool.
//
// monitor       - the monitor to return, must not be NULL.
// alreadyLocked - when false the library's global lock is taken around the
//                 pool update; when true the lock is left alone.
//
// Throws RuntimeException if monitor is NULL, and
// IllegalMonitorStateException if another thread owns the monitor or a
// thread is still waiting on it.
void Threading::returnMonitor(MonitorHandle* monitor, bool alreadyLocked) {

    if (monitor == NULL) {
        throw RuntimeException(__FILE__, __LINE__, "Monitor pointer was null");
    }

    // The owner may return the monitor in a locked state if it is held by
    // the thread that is returning it; in that case we unlock it on the
    // caller's behalf.  If another thread holds it we throw, and in every
    // case we must throw if someone is still waiting on the monitor,
    // otherwise we could see a segfault.
    const bool heldByOther = monitor->owner && monitor->owner != getCurrentThreadHandle();
    if (heldByOther || monitor->waiting) {
        throw IllegalMonitorStateException(__FILE__, __LINE__, "Monitor is still in use!");
    }

    if (monitor->owner) {
        Threading::exitMonitor(monitor);
    }

    const bool lockHere = !alreadyLocked;

    if (lockHere) {
        PlatformThread::lockMutex(library->globalLock);
    }

    // Reset the monitor and push it onto the front of the pool.
    initMonitorHandle(monitor);
    monitor->next = library->monitors->head;
    library->monitors->head = monitor;
    library->monitors->count++;

    if (lockHere) {
        PlatformThread::unlockMutex(library->globalLock);
    }
}
////////////////////////////////////////////////////////////////////////////////
// Returns true when the monitor has an owner.  A NULL monitor is reported
// as not locked.
bool Threading::isMonitorLocked(MonitorHandle* monitor) {
    return monitor != NULL && monitor->owner != NULL;
}
////////////////////////////////////////////////////////////////////////////////
// Enters the monitor on behalf of the calling thread.  If the calling
// thread already owns the monitor its entry count is incremented,
// otherwise the monitor is acquired via doMonitorEnter.  Both cases are
// handled by monitorEnterUsingThreadId.
void Threading::enterMonitor(MonitorHandle* monitor) {
    monitorEnterUsingThreadId(monitor, getCurrentThreadHandle());
}
////////////////////////////////////////////////////////////////////////////////
// Attempts to enter the monitor on behalf of the calling thread and
// returns the result of monitorTryEnterUsingThreadId.
bool Threading::tryEnterMonitor(MonitorHandle* monitor) {
    return monitorTryEnterUsingThreadId(monitor, getCurrentThreadHandle());
}
////////////////////////////////////////////////////////////////////////////////
// Exits the monitor on behalf of the calling thread.
// Throws IllegalMonitorStateException if the calling thread is not the
// monitor's owner.
void Threading::exitMonitor(MonitorHandle* monitor) {

    ThreadHandle* caller = getCurrentThreadHandle();

    if (monitor->owner == caller) {
        doMonitorExit(monitor, caller);
        return;
    }

    throw IllegalMonitorStateException(__FILE__, __LINE__, "Thread is not the owner of this monitor");
}
////////////////////////////////////////////////////////////////////////////////
// Waits on the monitor on behalf of the calling thread and returns the
// result of doWaitOnMonitor.
bool Threading::waitOnMonitor(MonitorHandle* monitor, long long mills, int nanos) {

    ThreadHandle* caller = getCurrentThreadHandle();

    // NOTE(review): the previous comment described this as a
    // non-interruptible wait, yet the final argument passed is 'true'.
    // Confirm the meaning of that flag against doWaitOnMonitor.
    const bool waitFlag = true;

    return doWaitOnMonitor(monitor, caller, mills, nanos, waitFlag);
}
////////////////////////////////////////////////////////////////////////////////
// Forwards to doNotifyWaiters with the flag cleared.
// Presumably 'false' selects notification of a single waiter - confirm
// against doNotifyWaiters.
void Threading::notifyWaiter(MonitorHandle* monitor) {
    const bool notifyFlag = false;
    doNotifyWaiters(monitor, notifyFlag);
}
////////////////////////////////////////////////////////////////////////////////
// Forwards to doNotifyWaiters with the flag set.
// Presumably 'true' selects notification of every waiter - confirm
// against doNotifyWaiters.
void Threading::notifyAllWaiters(MonitorHandle* monitor) {
    const bool notifyFlag = true;
    doNotifyWaiters(monitor, notifyFlag);
}
////////////////////////////////////////////////////////////////////////////////
// Exits the monitor on behalf of the given thread handle.
// Throws IllegalMonitorStateException if that handle is not the monitor's
// owner.
void Threading::monitorExitUsingThreadId(MonitorHandle* monitor, ThreadHandle* thread) {

    const bool isOwner = (monitor->owner == thread);

    if (!isOwner) {
        throw IllegalMonitorStateException(__FILE__, __LINE__, "Specified thread is not the monitor owner.");
    }

    doMonitorExit(monitor, thread);
}
////////////////////////////////////////////////////////////////////////////////
// Enters the monitor on behalf of the given thread handle.  A handle that
// already owns the monitor only has the entry count incremented, any other
// handle goes through doMonitorEnter.
void Threading::monitorEnterUsingThreadId(MonitorHandle* monitor, ThreadHandle* thread) {
    if (monitor->owner != thread) {
        doMonitorEnter(monitor, thread);
    } else {
        // Re-entrant acquisition by the current owner.
        monitor->count++;
    }
}
////////////////////////////////////////////////////////////////////////////////
// Tries to enter the monitor on behalf of the given thread handle without
// blocking.  Returns true if the handle already owned the monitor (the
// entry count is incremented) or if the monitor's lock could be taken (the
// handle becomes owner with a count of one), false otherwise.
bool Threading::monitorTryEnterUsingThreadId(MonitorHandle* monitor, ThreadHandle* thread) {

    // Re-entrant acquisition by the current owner.
    if (thread == monitor->owner) {
        monitor->count++;
        return true;
    }

    const bool acquired = (PlatformThread::tryLockMutex(monitor->lock) == true);

    if (acquired) {
        monitor->owner = thread;
        monitor->count = 1;
    }

    return acquired;
}
////////////////////////////////////////////////////////////////////////////////
// Claims the first free entry of the library's TLS slot table for the given
// ThreadLocalImpl.  The table is scanned and updated under the library's
// tlsLock.
//
// Returns the index of the claimed slot, or -1 when all
// DECAF_MAX_TLS_SLOTS entries are in use.
// Throws NullPointerException if threadLocal is NULL.
int Threading::createThreadLocalSlot(ThreadLocalImpl* threadLocal) {

    if (threadLocal == NULL) {
        throw NullPointerException(__FILE__, __LINE__, "Null ThreadLocalImpl Pointer Passed.");
    }

    int claimed = -1;

    PlatformThread::lockMutex(library->tlsLock);

    for (int slot = 0; slot < DECAF_MAX_TLS_SLOTS && claimed < 0; ++slot) {
        if (library->tlsSlots[slot] == NULL) {
            library->tlsSlots[slot] = threadLocal;
            claimed = slot;
        }
    }

    PlatformThread::unlockMutex(library->tlsLock);

    return claimed;
}
////////////////////////////////////////////////////////////////////////////////
// Returns the value the calling thread has stored in the given slot.
// The slot index is used as-is, no range check is performed here.
void* Threading::getThreadLocalValue(int slot) {
    return getCurrentThreadHandle()->tls[slot];
}
////////////////////////////////////////////////////////////////////////////////
// Stores a value in the given slot of the calling thread's handle.
// The slot index is used as-is, no range check is performed here.
void Threading::setThreadLocalValue(int slot, void* value) {
    getCurrentThreadHandle()->tls[slot] = value;
}
////////////////////////////////////////////////////////////////////////////////
void Threading::destoryThreadLocalSlot(int slot) {
ThreadHandle* current = NULL;
ThreadLocalImpl* local = library->tlsSlots[slot];
// Must lock globally so that no thread can terminate and call its own
// tls cleanup and our list of thread remains stable.
PlatformThread::lockMutex(library->globalLock);
std::list<ThreadHandle*>::const_iterator iter = library->activeThreads.begin();
while (iter != library->activeThreads.end()) {
current = *(iter++);
void* value = current->tls[slot];
if (value != NULL) {
local->doDelete(value);
current->tls[slot] = NULL;
}
}
std::vector<decaf::lang::Thread*>::const_iterator osIter = library->osThreads.begin();
while (osIter != library->osThreads.end()) {
current = (*(osIter++))->getHandle();
void* value = current->tls[slot];
if (value != NULL) {
local->doDelete(value);
current->tls[slot] = NULL;
}
}
PlatformThread::unlockMutex(library->globalLock);
// Return the slot to the pool under lock so that a taker waits.
PlatformThread::lockMutex(library->tlsLock);
library->tlsSlots[slot] = NULL;
PlatformThread::unlockMutex(library->tlsLock);
}
////////////////////////////////////////////////////////////////////////////////
void Threading::releaseCurrentThreadHandle() {
ThreadHandle* self = (ThreadHandle*)PlatformThread::getTlsValue(library->selfKey);
if (self != NULL) {
detachFromCurrentThread(self);
}
}
////////////////////////////////////////////////////////////////////////////////
// Tears down the Thread object registered in the library's osThreads list
// for the given handle.  All work is done under the library's global lock.
// When the handle's parent is not in that list nothing is changed.
void Threading::detachFromCurrentThread(ThreadHandle* self) {

    PlatformThread::lockMutex(library->globalLock);

    // Locate the foreign thread facade that was created for this handle.
    std::vector<Thread*>::iterator match = library->osThreads.begin();
    while (match != library->osThreads.end() && self->parent != *match) {
        ++match;
    }

    if (match != library->osThreads.end()) {

        // Drop the TLS registrations first.
        PlatformThread::setTlsValue(library->threadKey, NULL);
        PlatformThread::setTlsValue(library->selfKey, NULL);

        // Ensure all of this thread's local values are purged.
        threadExitTlsCleanup(self);

        // Destroy the OS thread object, including the self thread handle.
        delete *match;

        // Remove the thread from the global list.
        library->osThreads.erase(match);
    }

    PlatformThread::unlockMutex(library->globalLock);
}