blob: 9b7c0d7807d2992c69f8a7932c69b9ab614faf9b [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
#ifndef _GEMFIRE_FAIRQUEUE_SOL1_HPP_
#define _GEMFIRE_FAIRQUEUE_SOL1_HPP_
#include <deque>
#include <ace/ACE.h>
#include <ace/Thread_Mutex.h>
#include <ace/Token.h>
#include <ace/Condition_T.h>
#include <ace/Time_Value.h>
#include <ace/Guard_T.h>
namespace gemfire
{
template < class T > class FairQueue
{
public:
FairQueue( ) : m_cond( m_queueLock ), m_closed( false )
{
}
~FairQueue( )
{
close( );
}
/** wait sec time until notified */
T* getUntil( long sec )
{
bool isClosed;
T* mp = getNoGetLock( isClosed );
if ( mp == NULL && !isClosed ) {
ACE_Guard < ACE_Token > _guard( m_queueGetLock );
ACE_Time_Value stopAt( ACE_OS::gettimeofday( ) );
stopAt += sec;
ACE_Guard< ACE_Thread_Mutex > _guard1( m_queueLock );
if ( !m_closed ) {
do {
m_cond.wait( &stopAt );
mp = popFromQueue( isClosed );
} while ( mp == NULL && !isClosed &&
( ACE_OS::gettimeofday( ) < stopAt ) );
}
}
return mp;
}
void put( T* mp )
{
GF_DEV_ASSERT( mp != 0 );
ACE_Guard< ACE_Thread_Mutex > _guard( m_queueLock );
if ( !m_closed ) {
m_queue.push_front( mp );
m_cond.signal( );
}
}
uint32_t size( )
{
ACE_Guard< ACE_Thread_Mutex > _guard( m_queueLock );
return static_cast<uint32_t> (m_queue.size( ));
}
bool empty( )
{
return ( size( ) == 0 );
}
void close( )
{
ACE_Guard< ACE_Thread_Mutex > _guard( m_queueLock );
m_closed = true;
while ( !m_queue.empty() ) {
T* mp = m_queue.back( );
m_queue.pop_back( );
delete mp;
}
m_cond.signal( );
}
void reset( )
{
ACE_Guard< ACE_Thread_Mutex > _guard( m_queueLock );
m_closed = false;
}
private:
inline T* getNoGetLock( bool& isClosed )
{
ACE_Guard< ACE_Thread_Mutex > _guard( m_queueLock );
return popFromQueue( isClosed );
}
inline T* popFromQueue( bool& isClosed )
{
T* mp = NULL;
isClosed = m_closed;
if ( !isClosed && m_queue.size( ) > 0 ) {
mp = m_queue.back( );
m_queue.pop_back( );
}
return mp;
}
std::deque < T* > m_queue;
ACE_Thread_Mutex m_queueLock;
ACE_Condition < ACE_Thread_Mutex > m_cond;
ACE_Token m_queueGetLock;
bool m_closed;
};
} // end namespace
#endif // ifndef _GEMFIRE_FAIRQUEUE_SOL1_HPP_