| /*========================================================================= |
| * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. |
| * This product is protected by U.S. and international copyright |
| * and intellectual property laws. Pivotal products are covered by |
| * one or more patents listed at http://www.pivotal.io/patents. |
| *========================================================================= |
| */ |
| #include "../gfcpp_globals.hpp" |
| #include "ThinClientBaseDM.hpp" |
| #include "ThinClientCacheDistributionManager.hpp" |
| #include "TcrMessage.hpp" |
| #include "TcrEndpoint.hpp" |
| #include "../ExceptionTypes.hpp" |
| #include "ReadWriteLock.hpp" |
| #include "ThinClientRegion.hpp" |
| #include "RemoteQueryService.hpp" |
| #include "CacheImpl.hpp" |
| #include "../CacheAttributes.hpp" |
| #include <algorithm> |
| |
| using namespace gemfire; |
| |
| ThinClientCacheDistributionManager::ThinClientCacheDistributionManager( |
| TcrConnectionManager& connManager) : |
| ThinClientDistributionManager(connManager, NULL) |
| { |
| } |
| |
| void ThinClientCacheDistributionManager::init() |
| { |
| LOGDEBUG("ThinClientCacheDistributionManager::init"); |
| if (m_connManager.getNumEndPoints() == 0) { |
| throw IllegalStateException("No endpoints defined for query."); |
| } |
| ThinClientBaseDM::init(); |
| } |
| |
// Broadcasts a CQ request to every currently connected endpoint in this DM's
// endpoint list (non-HA path). Error precedence for the aggregate result:
// GF_NOERR if any endpoint succeeded, else a fatal client error (e.g.
// NotAuthorized) if one occurred, else GF_NOTCON.
GfErrType ThinClientCacheDistributionManager::sendSyncRequestCq(TcrMessage& request, TcrMessage& reply)
{
  // Refresh m_endpoints from the global endpoint list first so newly added
  // endpoints also receive this CQ message. Return value intentionally
  // ignored; preFailoverAction() is best-effort here.
  preFailoverAction();

  reply.setDM(this);

  // Hold the endpoints lock for the whole broadcast so the list cannot
  // change while we iterate it.
  ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_endpointsLock);

  // Return best effort result: If CQ succeeds on ANY server return no-error even if
  // other servers might fail since this method is called in non-HA with failover case.

  GfErrType err = GF_NOTCON;
  GfErrType opErr = GF_NOERR;

  for (std::vector<TcrEndpoint*>::iterator ep = m_endpoints.begin(); ep != m_endpoints.end(); ++ep) {
    if ((*ep)->connected()) {
      (*ep)->setDM(this);
      opErr = sendRequestToEP(request, reply, *ep);//this will go to ThinClientDistributionManager
      // Record opErr only if it is a success, or a fatal client error while no
      // endpoint has succeeded yet; a prior GF_NOERR is never overwritten.
      if (opErr == GF_NOERR || (ThinClientBaseDM::isFatalClientError(opErr) && err != GF_NOERR)) {
        err = opErr;
      }
    }
  }

  // This should return only either NOERR (takes precedence), NOTCON or a "client fatal error" such as NotAuthorized.
  return err;
}
| |
| GfErrType ThinClientCacheDistributionManager::sendSyncRequest( |
| TcrMessage& request, TcrMessage& reply, bool attemptFailover, bool isBGThread) |
| { |
| GfErrType err = GF_NOERR; |
| int32_t type = request.getMessageType(); |
| if(m_connManager.haEnabled() && ( type == TcrMessage::EXECUTECQ_MSG_TYPE || |
| type == TcrMessage::STOPCQ_MSG_TYPE || |
| type == TcrMessage::CLOSECQ_MSG_TYPE || |
| type == TcrMessage::CLOSECLIENTCQS_MSG_TYPE|| |
| type == TcrMessage::GETCQSTATS_MSG_TYPE || |
| type == TcrMessage::MONITORCQ_MSG_TYPE || |
| type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE || |
| type == TcrMessage::GETDURABLECQS_MSG_TYPE )) |
| { |
| err = m_connManager.sendSyncRequestCq(request, reply); |
| } |
| else if ((type == TcrMessage::EXECUTECQ_MSG_TYPE || |
| type == TcrMessage::STOPCQ_MSG_TYPE || |
| type == TcrMessage::CLOSECQ_MSG_TYPE || |
| type == TcrMessage::CLOSECLIENTCQS_MSG_TYPE|| |
| type == TcrMessage::GETCQSTATS_MSG_TYPE || |
| type == TcrMessage::MONITORCQ_MSG_TYPE || |
| type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE || |
| type == TcrMessage::GETDURABLECQS_MSG_TYPE )) |
| { |
| err = sendSyncRequestCq(request, reply); |
| } |
| else |
| { |
| err = ThinClientDistributionManager::sendSyncRequest(request, reply, attemptFailover); |
| } |
| return err; |
| } |
| |
| GfErrType ThinClientCacheDistributionManager::sendRequestToPrimary( TcrMessage& request, TcrMessage& reply ) |
| { |
| GfErrType err = GF_NOTCON; |
| if ( m_connManager.haEnabled( ) ) { |
| err = m_connManager.sendRequestToPrimary( request, reply ); |
| } |
| else { |
| // ARB: Call sendSyncRequest() with failover enabled. |
| // TODO: Must ensure that active endpoint is correctly tracked so that request is sent to an endpoint |
| // for which callback connection is present. |
| err = ThinClientDistributionManager::sendSyncRequest( request, reply, true ); |
| } |
| return err; |
| } |
| |
| bool ThinClientCacheDistributionManager::preFailoverAction() |
| { |
| if (!m_initDone) { |
| // nothing to be done if not initialized |
| return true; |
| } |
| // [sumedh] take the global endpoint lock so that the global endpoints list |
| // does not change while we are (possibly) adding endpoint to this endpoints |
| // list and incrementing the reference count of endpoint |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard( |
| m_connManager.getGlobalEndpoints().mutex()); |
| // ARB: This method is called at the time of failover to refresh |
| // the list of endpoints. |
| std::vector<TcrEndpoint*> currentGlobalEndpointsList; |
| m_connManager.getAllEndpoints(currentGlobalEndpointsList); |
| |
| // ARB: Update local list with new endpoints. |
| std::vector<TcrEndpoint*> newEndpointsList; |
| for (std::vector<TcrEndpoint*>::iterator it = |
| currentGlobalEndpointsList.begin(); |
| it != currentGlobalEndpointsList.end(); ++it) { |
| bool found = false; |
| for (std::vector<TcrEndpoint*>::iterator currIter = m_endpoints.begin(); |
| currIter != m_endpoints.end(); ++currIter) { |
| if (*currIter == *it) { |
| found = true; |
| break; |
| } |
| } |
| if (!found) |
| newEndpointsList.push_back(*it); |
| } |
| |
| for (std::vector<TcrEndpoint*>::iterator it = newEndpointsList.begin(); |
| it != newEndpointsList.end(); ++it) { |
| TcrEndpoint* ep = *it; |
| m_endpoints.push_back(ep); |
| ep->setNumRegions(ep->numRegions() + 1); |
| LOGFINER("TCCDM: incremented region reference count for endpoint %s " |
| "to %d", ep->name().c_str(), ep->numRegions()); |
| } |
| |
| return true; |
| } |
| bool ThinClientCacheDistributionManager::postFailoverAction( TcrEndpoint* endpoint ) |
| { |
| LOGDEBUG("ThinClientCacheDistributionManager : executeAllCqs"); |
| if(m_connManager.haEnabled()) |
| { |
| LOGDEBUG("ThinClientCacheDistributionManager : executeAllCqs HA: case, done else where"); |
| return true; |
| } |
| |
| CacheImpl *cache = m_connManager.getCacheImpl(); |
| |
| if(cache==NULL) |
| { |
| LOGERROR("Client not initialized for failover"); |
| return false; |
| } |
| try { |
| RemoteQueryServicePtr rqsService = dynCast<RemoteQueryServicePtr>(cache->getQueryService(true)); |
| rqsService->executeAllCqs(true); |
| } |
| catch (const Exception & excp) { |
| LOGWARN("Failed to recover CQs during failover attempt to endpoint[%s]: %s", |
| endpoint->name().c_str(), excp.getMessage()); |
| return false; |
| } |
| catch (...) { |
| LOGWARN("Failed to recover CQs during failover attempt to endpoint[%s]", |
| endpoint->name().c_str()); |
| return false; |
| } |
| return true; |
| } |