blob: 31c6a041085d8decb870183c3c59a403217013cc [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
#ifndef __TCR_ENDPOINT_HPP__
#define __TCR_ENDPOINT_HPP__
#include "../gfcpp_globals.hpp"
#include <string>
#include <list>
#include <ace/Recursive_Thread_Mutex.h>
#include <ace/Semaphore.h>
#include "../gf_base.hpp"
#include "FairQueue.hpp"
#include "Set.hpp"
#include "TcrConnection.hpp"
#include "GF_TASK_T.hpp"
#include "SpinLock.hpp"
namespace gemfire
{
class ThinClientRegion;
class TcrMessage;
class ThinClientBaseDM;
class CacheImpl;
class ThinClientPoolHADM;
class ThinClientPoolDM;
class CPPCACHE_EXPORT TcrEndpoint
{
public:
TcrEndpoint( const std::string& name, CacheImpl* cache,
ACE_Semaphore& failoverSema, ACE_Semaphore& cleanupSema,
ACE_Semaphore& redundancySema, ThinClientBaseDM* dm = NULL, bool isMultiUserMode = false);//TODO:hitesh need to look for endpoint case
/* adongre
* CID 29000: Non-virtual destructor (VIRTUAL_DTOR)
*/
virtual ~TcrEndpoint( );
virtual GfErrType registerDM( bool clientNotification, bool isSecondary = false, bool isActiveEndpoint = false, ThinClientBaseDM * distMgr = NULL );
//GfErrType registerPoolDM( bool isSecondary, ThinClientPoolHADM* poolDM );
virtual void unregisterDM( bool clientNotification, ThinClientBaseDM * distMgr = NULL, bool checkQueueHosted = false );
//void unregisterPoolDM( );
void pingServer(ThinClientPoolDM* poolDM = NULL);
int receiveNotification(volatile bool& isRunning);
GfErrType send( const TcrMessage &request, TcrMessage & reply );
GfErrType sendRequestConn( const TcrMessage &request, TcrMessage & reply, TcrConnection* conn, std::string& failReason );
GfErrType sendRequestWithRetry( const TcrMessage &request, TcrMessage & reply, TcrConnection*& conn,
bool& epFailure, std::string& failReason, int maxSendRetries, bool useEPPool, int64_t requestedTimeout, bool isBgThread = false );
GfErrType sendRequestConnWithRetry( const TcrMessage &request, TcrMessage & reply, TcrConnection*& conn, bool isBgThread = false );
void stopNotifyReceiverAndCleanup();
void stopNoBlock();
bool inline connected() const
{
return m_connected;
}
int inline numRegions() const
{
return m_numRegions;
}
void inline setNumRegions(int numRegions)
{
m_numRegions = numRegions;
}
inline const std::string& name() const
{
return m_name;
}
// ARB: setConnectionStatus is now a public method, as it is used by TcrDistributionManager.
void setConnectionStatus(bool status);
inline const int getNumRegionListeners() const
{
return m_numRegionListener;
}
//TODO:hitesh for single user mode only
void setUniqueId(int64_t uniqueId)
{
LOGDEBUG("tcrEndpoint:setUniqueId:: %d ",uniqueId);
m_isAuthenticated = true;
m_uniqueId = uniqueId;
}
int64_t getUniqueId()
{
LOGDEBUG("tcrEndpoint:getUniqueId:: %d ",m_uniqueId);
return m_uniqueId;
}
bool isAuthenticated()
{
return m_isAuthenticated;
}
void setAuthenticated(bool flag)
{
m_isAuthenticated = false;
}
virtual bool isMultiUserMode();
/*{
if(m_baseDM != NULL)
return this->m_baseDM->isMultiUserMode();
else
return false;
}*/
void authenticateEndpoint(TcrConnection *& conn);
ServerQueueStatus getFreshServerQueueStatus( int32_t& queueSize, bool addToQueue, TcrConnection*& statusConn );
// ARB: TESTING: return true or false
bool inline getServerQueueStatusTEST() {
return (m_serverQueueStatus == REDUNDANT_SERVER || m_serverQueueStatus == PRIMARY_SERVER);
}
// Get cached server queue props.
int32_t inline getServerQueueSize() { return m_queueSize; }
ServerQueueStatus getServerQueueStatus(){return m_serverQueueStatus; }
//Set server queue props.
void setServerQueueStatus( ServerQueueStatus queueStatus ,int32_t queueSize );
GfErrType createNewConnection( TcrConnection*& newConn,
bool isClientNotification = false, bool isSecondary = false,
uint32_t connectTimeout = DEFAULT_CONNECT_TIMEOUT,
int32_t timeoutRetries = 1, bool sendUpdateNotification = true, bool appThreadRequest = false);
bool needtoTakeConnectLock();
volatile bool m_needToConnectInLock;
ACE_Recursive_Thread_Mutex m_connectLock;
ACE_Condition <ACE_Recursive_Thread_Mutex> m_connectLockCond;
GfErrType createNewConnectionWL(TcrConnection*& newConn, bool isClientNotification, bool isSecondary, uint32_t connectTimeout);
void setConnected(volatile bool connected = true ) { m_connected = connected; }
virtual ThinClientPoolDM* getPoolHADM( ) { return NULL; }
bool isQueueHosted();
ACE_Recursive_Thread_Mutex& getQueueHostedMutex( ) { return m_notifyReceiverLock; }
/*
void sendNotificationCloseMsg();
*/
void setDM(ThinClientBaseDM* dm)
{
LOGDEBUG("tcrendpoint setDM");
this->m_baseDM = dm;
}
int32_t numberOfTimesFailed()
{
return m_numberOfTimesFailed;
}
void addConnRefCounter(int count)
{
HostAsm::atomicAdd( m_noOfConnRefs, count );
}
int getConnRefCounter()
{
return m_noOfConnRefs;
}
virtual uint16_t getDistributedMemberID()
{
return m_distributedMemId;
}
virtual void setDistributedMemberID(uint16_t memId)
{
m_distributedMemId = memId;
}
protected:
PropertiesPtr getCredentials();
volatile int m_maxConnections;
FairQueue<TcrConnection> m_opConnections;
virtual bool checkDupAndAdd( EventIdPtr eventid );
virtual void processMarker( );
virtual void triggerRedundancyThread();
virtual QueryServicePtr getQueryService( );
virtual void sendRequestForChunkedResponse(const TcrMessage& request, TcrMessage& reply, TcrConnection* conn);
virtual void closeFailedConnection( TcrConnection*& conn );
void closeConnection( TcrConnection*& conn );
virtual void handleNotificationStats(int64 byteLength){};
virtual void closeNotification( );
std::list<GF_TASK_T<TcrEndpoint> *> m_notifyReceiverList;
std::list<TcrConnection *> m_notifyConnectionList;
TcrConnection * m_notifyConnection;
GF_TASK_T<TcrEndpoint> * m_notifyReceiver;
int m_numRegionListener;
bool m_isQueueHosted;
ACE_Recursive_Thread_Mutex m_notifyReceiverLock;
virtual bool handleIOException(const std::string& message, TcrConnection*& conn, bool isBgThread = false);
private:
int64_t m_uniqueId;
bool m_isAuthenticated;
ACE_Recursive_Thread_Mutex m_endpointAuthenticationLock;
volatile bool m_msgSent;
volatile bool m_pingSent;
int32_t m_numberOfTimesFailed;
bool m_isMultiUserMode;
bool compareTransactionIds(int32_t reqTransId, int32_t replyTransId,
std::string& failReason, TcrConnection* conn);
void closeConnections( );
void setRetryAndTimeout( const TcrMessage& request, int& maxSendRetries, uint32_t& requestedTimeout );
std::string m_name;
ACE_Recursive_Thread_Mutex m_connectionLock;
volatile bool m_connected;
bool m_isActiveEndpoint;
int m_numRegions;
Set< uint16_t > m_ports;
int m_pingTimeouts;
int m_notifyCount;
CacheImpl* m_cache;
ACE_Semaphore& m_failoverSema;
ACE_Semaphore& m_cleanupSema;
ACE_Semaphore m_notificationCleanupSema;
ACE_Semaphore& m_redundancySema;
std::list<ThinClientBaseDM *> m_distMgrs;
ACE_Recursive_Thread_Mutex m_distMgrsLock;
uint32_t m_dupCount;
// ARB: TESTING: Durable clients - flag that indicates whether endpoint made connection to server that has HA queue.
ServerQueueStatus m_serverQueueStatus;
bool m_isServerQueueStatusSet;
int32_t m_queueSize;
//ThinClientPoolDM* m_poolHADM;
ThinClientBaseDM* m_baseDM;
// Disallow copy constructor and assignment operator.
TcrEndpoint(const TcrEndpoint &);
TcrEndpoint & operator = (const TcrEndpoint &);
// number of connections to this endpoint
volatile int32_t m_noOfConnRefs;
uint16_t m_distributedMemId;
protected:
static const char * NC_Notification;
};
}
#endif // __TCR_ENDPOINT_HPP__