blob: ee8ed429f040a57634bce3ac7c5a738c1707b90c [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
#if !defined (IMPL_INTQUEUE_INCLUDED)
#define IMPL_INTQUEUE_INCLUDED
#include <deque>
#include <ace/ACE.h>
#include <ace/Condition_Recursive_Thread_Mutex.h>
#include <ace/Time_Value.h>
#include <ace/Guard_T.h>
#include <ace/Recursive_Thread_Mutex.h>
namespace gemfire
{
template
< class T >
class CPPCACHE_EXPORT IntQueue
{
public:
IntQueue( )
: m_cond( m_mutex )
{
}
~IntQueue()
{
ACE_Guard < ACE_Recursive_Thread_Mutex > _guard( m_mutex );
while ( m_queue.size() > 0 ) {
m_queue.pop_back();
}
}
/** wait usec time until notified */
T get( long usec )
{
ACE_Time_Value interval( usec / 1000000, usec % 1000000 );
return getUntil( interval );
}
T get()
{
T mp = 0;
getInternal( mp );
return mp;
}
void put( T mp )
{
ACE_Guard < ACE_Recursive_Thread_Mutex > _guard( m_mutex );
m_queue.push_front( mp );
m_cond.signal();
}
uint32_t size()
{
ACE_Guard < ACE_Recursive_Thread_Mutex > _guard( m_mutex );
return static_cast<uint32_t> (m_queue.size());
}
void clear()
{
ACE_Guard < ACE_Recursive_Thread_Mutex > _guard( m_mutex );
m_queue.clear();
m_cond.signal();
}
bool empty()
{
return size() == 0;
}
private:
inline bool getInternal( T& val )
{
ACE_Guard < ACE_Recursive_Thread_Mutex > _guard( m_mutex );
if ( m_queue.size() > 0 ) {
val = m_queue.back();
m_queue.pop_back();
return true;
}
return false;
}
T getUntil( const ACE_Time_Value& interval )
{
T mp = 0;
bool found = getInternal( mp );
if ( !found ) {
ACE_Time_Value stopAt( ACE_OS::gettimeofday( ) );
stopAt += interval;
while ( !found && ACE_OS::gettimeofday() < stopAt) {
ACE_Guard < ACE_Recursive_Thread_Mutex > _guard( m_mutex );
m_cond.wait( &stopAt );
if ( m_queue.size() > 0 ) {
mp = m_queue.back();
m_queue.pop_back();
found = true;
}
}
}
return mp;
}
typedef std::deque < T > LocalQueue;
LocalQueue m_queue;
ACE_Recursive_Thread_Mutex m_mutex;
ACE_Condition < ACE_Recursive_Thread_Mutex > m_cond;
};
} // end namespace
#endif // !defined (IMPL_INTQUEUE_INCLUDED)