blob: 6c24d824985fc1d4fb1b667bc445bfabf5afeec5 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <thrift/thrift-config.h>
#include <thrift/concurrency/Monitor.h>
#include <thrift/concurrency/Exception.h>
#include <thrift/concurrency/Util.h>
#include <thrift/transport/PlatformSocket.h>
#include <assert.h>
#include <boost/scoped_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
namespace apache {
namespace thrift {
namespace concurrency {
/**
 * Monitor implementation using the boost thread library.
 *
 * The condition variable is this object itself (it derives from
 * boost::condition_variable_any). The associated Mutex is either allocated
 * and owned by this object (default constructor) or borrowed from the caller
 * or from another Monitor.
 *
 * Every wait function adopts the already-held mutex, so the calling thread
 * must hold mutex() when it calls them.
 * NOTE(review): the wait functions treat Mutex::getUnderlyingImpl() as a
 * boost::timed_mutex -- confirm against the Boost Mutex implementation.
 *
 * @version $Id:$
 */
class Monitor::Impl : public boost::condition_variable_any {
public:
  /** Creates a monitor that allocates and owns its own Mutex. */
  Impl() : ownedMutex_(new Mutex()), mutex_(NULL) { init(ownedMutex_.get()); }

  /** Creates a monitor bound to a caller-supplied Mutex (not owned). */
  Impl(Mutex* mutex) : mutex_(NULL) { init(mutex); }

  /** Creates a monitor that shares the Mutex of another Monitor (not owned). */
  Impl(Monitor* monitor) : mutex_(NULL) { init(&(monitor->mutex())); }

  /** Returns the Mutex this monitor is bound to. */
  Mutex& mutex() { return *mutex_; }

  /** Locks the bound Mutex. */
  void lock() { mutex().lock(); }

  /** Unlocks the bound Mutex. */
  void unlock() { mutex().unlock(); }

  /**
   * Exception-throwing version of waitForTimeRelative(), called simply
   * wait(int64) for historical reasons. Timeout is in milliseconds.
   *
   * If the condition occurs, this function returns cleanly; on timeout a
   * TimedOutException is thrown, and on any other non-zero result a
   * TException is thrown.
   */
  void wait(int64_t timeout_ms) {
    int result = waitForTimeRelative(timeout_ms);
    if (result == THRIFT_ETIMEDOUT) {
      throw TimedOutException();
    } else if (result != 0) {
      throw TException("Monitor::wait() failed");
    }
  }

  /**
   * Waits until the specified timeout in milliseconds for the condition to
   * occur, or waits forever if timeout_ms == 0.
   *
   * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
   */
  int waitForTimeRelative(int64_t timeout_ms) {
    if (timeout_ms == 0LL) {
      return waitForever();
    }
    AdoptedLock held(underlyingMutex());
    const boost::system_time deadline
        = boost::get_system_time() + boost::posix_time::milliseconds(timeout_ms);
    return timed_wait(held.lock, deadline) ? 0 : THRIFT_ETIMEDOUT;
  }

  /**
   * Waits until the absolute time specified using struct THRIFT_TIMESPEC.
   * The nanosecond field is truncated to whole microseconds.
   * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
   */
  int waitForTime(const THRIFT_TIMESPEC* abstime) {
    struct timeval temp;
    temp.tv_sec = static_cast<long>(abstime->tv_sec);
    temp.tv_usec = static_cast<long>(abstime->tv_nsec) / 1000;
    return waitForTime(&temp);
  }

  /**
   * Waits until the absolute time specified using struct timeval.
   * A deadline that already lies in the past results in a zero-length wait.
   * Returns 0 if condition occurs, THRIFT_ETIMEDOUT on timeout, or an error code.
   */
  int waitForTime(const struct timeval* abstime) {
    boost::timed_mutex& mutexImpl = underlyingMutex();

    struct timeval currenttime;
    Util::toTimeval(currenttime, Util::currentTime());

    // Fold seconds and microseconds into a single signed quantity BEFORE
    // clamping. Clamping the two fields independently loses the borrow from
    // the microsecond field (e.g. deadline 10.1s, now 9.9s would wait 1s
    // instead of 0.2s) and can wait for a deadline that has already passed.
    const int64_t usecPerSec = 1000000LL;
    int64_t remaining_us
        = (static_cast<int64_t>(abstime->tv_sec) - static_cast<int64_t>(currenttime.tv_sec))
              * usecPerSec
          + (static_cast<int64_t>(abstime->tv_usec) - static_cast<int64_t>(currenttime.tv_usec));
    if (remaining_us < 0) {
      remaining_us = 0;
    }
    const long wait_sec = static_cast<long>(remaining_us / usecPerSec);
    const long wait_usec = static_cast<long>(remaining_us % usecPerSec);

    AdoptedLock held(mutexImpl);
    const boost::system_time deadline = boost::get_system_time()
                                        + boost::posix_time::seconds(wait_sec)
                                        + boost::posix_time::microseconds(wait_usec);
    return timed_wait(held.lock, deadline) ? 0 : THRIFT_ETIMEDOUT;
  }

  /**
   * Waits forever until the condition occurs.
   * Returns 0 if condition occurs, or an error code otherwise.
   */
  int waitForever() {
    AdoptedLock held(underlyingMutex());
    // Qualified call: Impl::wait(int64_t) hides the base-class wait().
    boost::condition_variable_any::wait(held.lock);
    return 0;
  }

  /** Wakes one thread waiting on this monitor. */
  void notify() { notify_one(); }

  /** Wakes every thread waiting on this monitor. */
  void notifyAll() { notify_all(); }

private:
  /**
   * Wraps a mutex the caller already holds in a boost lock object so it can
   * be handed to the condition variable, and gives ownership back (without
   * unlocking) when the scope ends -- on normal return and on exception
   * alike -- so the lock object's destructor never unlocks the caller's mutex.
   */
  struct AdoptedLock {
    explicit AdoptedLock(boost::timed_mutex& m) : lock(m, boost::adopt_lock) {}
    ~AdoptedLock() { lock.release(); }
    boost::timed_mutex::scoped_lock lock;
  };

  /** Returns the boost mutex behind mutex_; asserts that both are non-null. */
  boost::timed_mutex& underlyingMutex() {
    assert(mutex_);
    boost::timed_mutex* mutexImpl
        = static_cast<boost::timed_mutex*>(mutex_->getUnderlyingImpl());
    assert(mutexImpl);
    return *mutexImpl;
  }

  void init(Mutex* mutex) { mutex_ = mutex; }

  boost::scoped_ptr<Mutex> ownedMutex_; // set only by the default constructor
  Mutex* mutex_;                        // the mutex actually used; may alias ownedMutex_
};
/** Builds a monitor backed by a default-constructed implementation object. */
Monitor::Monitor()
  : impl_(new Monitor::Impl) {
}
/** Builds a monitor whose implementation object is constructed from the given mutex pointer. */
Monitor::Monitor(Mutex* mutex)
  : impl_(new Monitor::Impl(mutex)) {
}
/** Builds a monitor whose implementation object is constructed from another monitor. */
Monitor::Monitor(Monitor* monitor)
  : impl_(new Monitor::Impl(monitor)) {
}
/** Destroys the implementation object allocated by the constructor. */
Monitor::~Monitor() {
  Monitor::Impl* const doomed = impl_;
  delete doomed;
}
/** Returns the mutex reported by the implementation object. */
Mutex& Monitor::mutex() const {
  Monitor::Impl* const impl = const_cast<Monitor::Impl*>(impl_);
  return impl->mutex();
}
void Monitor::lock() const {
const_cast<Monitor::Impl*>(impl_)->lock();
}
void Monitor::unlock() const {
const_cast<Monitor::Impl*>(impl_)->unlock();
}
void Monitor::wait(int64_t timeout) const {
const_cast<Monitor::Impl*>(impl_)->wait(timeout);
}
int Monitor::waitForTime(const THRIFT_TIMESPEC* abstime) const {
return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
}
int Monitor::waitForTime(const timeval* abstime) const {
return const_cast<Monitor::Impl*>(impl_)->waitForTime(abstime);
}
int Monitor::waitForTimeRelative(int64_t timeout_ms) const {
return const_cast<Monitor::Impl*>(impl_)->waitForTimeRelative(timeout_ms);
}
int Monitor::waitForever() const {
return const_cast<Monitor::Impl*>(impl_)->waitForever();
}
void Monitor::notify() const {
const_cast<Monitor::Impl*>(impl_)->notify();
}
void Monitor::notifyAll() const {
const_cast<Monitor::Impl*>(impl_)->notifyAll();
}
}
}
} // apache::thrift::concurrency