| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "ThinClientPoolDM.hpp" |
| #include "TcrEndpoint.hpp" |
| #include "ThinClientRegion.hpp" |
| #include <gfcpp/ResultCollector.hpp> |
| #include "ExecutionImpl.hpp" |
| #include "ExpiryHandler_T.hpp" |
| #include <ace/INET_Addr.h> |
| #include "ExpiryTaskManager.hpp" |
| #include <gfcpp/SystemProperties.hpp> |
| #include <statistics/PoolStatsSampler.hpp> |
| #include "DistributedSystemImpl.hpp" |
| #include "UserAttributes.hpp" |
| #include <algorithm> |
| #include "ThinClientStickyManager.hpp" |
| #include <gfcpp/PoolManager.hpp> |
| |
| #include "NonCopyable.hpp" |
| |
| using namespace apache::geode::client; |
| using namespace apache::geode::statistics; |
| |
| ExpiryTaskManager* getCacheImplExpiryTaskManager(); |
| void removePool(const char*); |
| |
| /* adongre |
| * CID 28730: Other violation (MISSING_COPY) |
| * Class "GetAllWork" owns resources that are managed in its constructor and |
| * destructor but has no user-written copy constructor. |
| * FIX : Make the class NonCopyable |
| */ |
| class GetAllWork : public PooledWork<GfErrType>, |
| private NonCopyable, |
| private NonAssignable { |
| ThinClientPoolDM* m_poolDM; |
| BucketServerLocationPtr m_serverLocation; |
| TcrMessage* m_request; |
| TcrMessageReply* m_reply; |
| MapOfUpdateCounters m_mapOfUpdateCounters; |
| bool m_attemptFailover; |
| bool m_isBGThread; |
| bool m_addToLocalCache; |
| UserAttributesPtr m_userAttribute; |
| ChunkedGetAllResponse* m_responseHandler; |
| std::string m_regionName; |
| const VectorOfCacheableKeyPtr m_keys; |
| const RegionPtr m_region; |
| TcrChunkedResult* m_resultCollector; |
  // Held by value so the worker cannot outlive a caller's temporary.
  const UserDataPtr m_aCallbackArgument;
| |
| public: |
| GetAllWork(ThinClientPoolDM* poolDM, const RegionPtr& region, |
| const BucketServerLocationPtr& serverLocation, |
| const VectorOfCacheableKeyPtr& keys, bool attemptFailover, |
| bool isBGThread, bool addToLocalCache, |
| ChunkedGetAllResponse* responseHandler, |
| const UserDataPtr& aCallbackArgument) |
| : m_poolDM(poolDM), |
| m_serverLocation(serverLocation), |
| m_attemptFailover(attemptFailover), |
| m_isBGThread(isBGThread), |
| m_addToLocalCache(addToLocalCache), |
| m_userAttribute(NULLPTR), |
| m_responseHandler(responseHandler), |
| m_regionName(region->getFullPath()), |
| m_keys(keys), |
| m_region(region), |
| m_aCallbackArgument(aCallbackArgument) { |
| m_request = new TcrMessageGetAll(region.ptr(), m_keys.ptr(), m_poolDM, |
| m_aCallbackArgument); |
| m_reply = new TcrMessageReply(true, m_poolDM); |
| if (m_poolDM->isMultiUserMode()) { |
| m_userAttribute = TSSUserAttributesWrapper::s_geodeTSSUserAttributes |
| ->getUserAttributes(); |
| } |
| |
| m_resultCollector = (new ChunkedGetAllResponse( |
| *m_reply, dynamic_cast<ThinClientRegion*>(m_region.ptr()), m_keys.ptr(), |
| m_responseHandler->getValues(), m_responseHandler->getExceptions(), |
| m_responseHandler->getResultKeys(), |
| m_responseHandler->getUpdateCounters(), 0, m_addToLocalCache, |
| m_responseHandler->getResponseLock())); |
| |
| m_reply->setChunkedResultHandler(m_resultCollector); |
| } |
| |
| ~GetAllWork() { |
| delete m_request; |
| delete m_reply; |
| delete m_resultCollector; |
| } |
| |
| TcrMessage* getReply() { return m_reply; } |
| |
| void init() {} |
| GfErrType execute(void) { |
| // init(); |
| |
| GuardUserAttribures gua; |
| |
| if (m_userAttribute != NULLPTR) { |
| gua.setProxyCache(m_userAttribute->getProxyCache()); |
| } |
| m_request->InitializeGetallMsg( |
| m_request->getCallbackArgument()); // now init getall msg |
| return m_poolDM->sendSyncRequest(*m_request, *m_reply, m_attemptFailover, |
| m_isBGThread, m_serverLocation); |
| } |
| }; |
| |
| const char* ThinClientPoolDM::NC_Ping_Thread = "NC Ping Thread"; |
| const char* ThinClientPoolDM::NC_MC_Thread = "NC MC Thread"; |
| #define PRIMARY_QUEUE_NOT_AVAILABLE -2 |
| |
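// Constructs the pool's identity and helpers: derives the client membership
// id from the local host and the (optional) durable client id, validates
// that at least one locator or server was configured, and creates the
// locator helper, pool statistics, the client metadata service (when PR
// single-hop is enabled) and the sticky-connection manager.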
| ThinClientPoolDM::ThinClientPoolDM(const char* name, |
| PoolAttributesPtr poolAttrs, |
| TcrConnectionManager& connManager) |
| : ThinClientBaseDM(connManager, NULL), |
| Pool(poolAttrs), |
| m_poolName(name), |
| m_stats(NULL), |
| m_sticky(false), |
| m_updateLocatorListSema(0), |
| m_pingSema(0), |
| m_cliCallbackSema(0), |
| m_isDestroyed(false), |
| m_destroyPending(false), |
| m_destroyPendingHADM(false), |
| m_isMultiUserMode(false), |
| m_locHelper(NULL), |
| m_poolSize(0), |
| m_numRegions(0), |
| m_server(0), |
| m_connSema(0), |
| m_connManageTask(NULL), |
| m_pingTask(NULL), |
| m_updateLocatorListTask(NULL), |
| m_cliCallbackTask(NULL), |
| m_pingTaskId(-1), |
| m_updateLocatorListTaskId(-1), |
| m_connManageTaskId(-1), |
| m_PoolStatsSampler(NULL), |
| m_clientMetadataService(NULL), |
| m_primaryServerQueueSize(PRIMARY_QUEUE_NOT_AVAILABLE) { |
  static bool firstGuard = false;
  if (firstGuard) ClientProxyMembershipID::increaseSynchCounter();
  firstGuard = true;
| |
| SystemProperties* sysProp = DistributedSystem::getSystemProperties(); |
| // to set security flag at pool level |
| this->m_isSecurityOn = sysProp->isSecurityOn(); |
| |
| ACE_TCHAR hostName[256]; |
| ACE_OS::hostname(hostName, sizeof(hostName) - 1); |
| ACE_INET_Addr driver(hostName); |
| uint32_t hostAddr = driver.get_ip_address(); |
| uint16_t hostPort = 0; |
| const char* durableId = (sysProp != NULL) ? sysProp->durableClientId() : NULL; |
| |
  std::string poolSeparator = "_gem_";
  std::string clientDurableId =
      (durableId == NULL || strlen(durableId) == 0)
          ? ""
          : durableId + poolSeparator + m_poolName;
| |
| const uint32_t durableTimeOut = |
| (sysProp != NULL) ? sysProp->durableTimeout() : 0; |
| m_memId = new ClientProxyMembershipID( |
| hostName, hostAddr, hostPort, clientDurableId.c_str(), durableTimeOut); |
| |
| if (m_attrs->m_initLocList.size() == 0 && |
| m_attrs->m_initServList.size() == 0) { |
| std::string msg = "No locators or servers provided for pool named "; |
| msg += name; |
| throw IllegalStateException(msg.c_str()); |
| } |
| reset(); |
| m_locHelper = new ThinClientLocatorHelper(m_attrs->m_initLocList, this); |
| |
| m_stats = new PoolStats(m_poolName.c_str()); |
| |
| if (!sysProp->isEndpointShufflingDisabled()) { |
| if (m_attrs->m_initServList.size() > 0) { |
| RandGen randgen; |
| m_server = randgen(static_cast<uint32_t>(m_attrs->m_initServList.size())); |
| } |
| } |
| if (m_attrs->getPRSingleHopEnabled()) { |
| m_clientMetadataService = new ClientMetadataService(PoolPtr(this)); |
| } |
| m_manager = new ThinClientStickyManager(this); |
| } |
| |
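// Completes pool setup after construction: caches the multiuser and
// security flags, initializes the connection manager, and starts the
// background threads unless the grid-client property defers them.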
| void ThinClientPoolDM::init() { |
| LOGDEBUG("ThinClientPoolDM::init: Starting pool initialization"); |
| |
| SystemProperties* sysProp = DistributedSystem::getSystemProperties(); |
| m_isMultiUserMode = this->getMultiuserAuthentication(); |
| if (m_isMultiUserMode) { |
| LOGINFO("Multiuser authentication is enabled for pool %s", |
| m_poolName.c_str()); |
| } |
| // to set security flag at pool level |
| this->m_isSecurityOn = sysProp->isSecurityOn(); |
| |
| LOGDEBUG("ThinClientPoolDM::init: security in on/off = %d ", |
| this->m_isSecurityOn); |
| |
| m_connManager.init(true); |
| |
  LOGDEBUG("ThinClientPoolDM::init: is grid client = %d ",
           sysProp->isGridClient());

  if (!sysProp->isGridClient()) {
    ThinClientPoolDM::startBackgroundThreads();
  }
| |
| LOGDEBUG("ThinClientPoolDM::init: Completed initialization"); |
| } |
| |
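// Asks the registered AuthInitialize plugin (if any) for credentials to use
// against the given endpoint, seeding it with the configured security
// properties; returns NULLPTR when no auth loader is installed.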
| PropertiesPtr ThinClientPoolDM::getCredentials(TcrEndpoint* ep) { |
| PropertiesPtr tmpSecurityProperties = |
| DistributedSystem::getSystemProperties()->getSecurityProperties(); |
| |
| AuthInitializePtr authInitialize = DistributedSystem::m_impl->getAuthLoader(); |
| |
| if (authInitialize != NULLPTR) { |
| LOGFINER( |
| "ThinClientPoolDM::getCredentials: acquired handle to authLoader, " |
| "invoking getCredentials %s", |
| ep->name().c_str()); |
| /* adongre |
| * CID 28901: Copy into fixed size buffer (STRING_OVERFLOW) |
| * You might overrun the 100 byte fixed-size string "tmpEndpoint" by copying |
| * the |
| * return value of "stlp_std::basic_string<char, |
| * stlp_std::char_traits<char>, |
| * stlp_std::allocator<char> >::c_str() const" without checking the length. |
| */ |
| // char tmpEndpoint[100] = { '\0' } ; |
| // strcpy(tmpEndpoint, ep->name().c_str()); |
| PropertiesPtr tmpAuthIniSecurityProperties = authInitialize->getCredentials( |
| tmpSecurityProperties, /*tmpEndpoint*/ ep->name().c_str()); |
| return tmpAuthIniSecurityProperties; |
| } |
| return NULLPTR; |
| } |
| |
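// Starts the pool's background machinery: the ping thread plus its expiry
// task, the optional locator-list updater, the connection-manager thread
// plus its idle/load-conditioning task, the remote query service, the pool
// stats sampler, the chunk-processing helper, and (for PR single-hop) the
// client metadata service.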
| void ThinClientPoolDM::startBackgroundThreads() { |
| LOGDEBUG("ThinClientPoolDM::startBackgroundThreads: Starting ping thread"); |
| m_pingTask = new GF_TASK_T<ThinClientPoolDM>( |
| this, &ThinClientPoolDM::pingServer, NC_Ping_Thread); |
| m_pingTask->start(); |
| |
| SystemProperties* props = DistributedSystem::getSystemProperties(); |
| |
  if (props->onClientDisconnectClearPdxTypeIds()) {
| m_cliCallbackTask = |
| new GF_TASK_T<ThinClientPoolDM>(this, &ThinClientPoolDM::cliCallback); |
| m_cliCallbackTask->start(); |
| } |
| |
| LOGDEBUG("ThinClientPoolDM::startBackgroundThreads: Creating ping task"); |
| ACE_Event_Handler* pingHandler = |
| new ExpiryHandler_T<ThinClientPoolDM>(this, &ThinClientPoolDM::doPing); |
| |
| long pingInterval = getPingInterval() / (1000 * 2); |
| if (pingInterval > 0) { |
| LOGDEBUG( |
| "ThinClientPoolDM::startBackgroundThreads: Scheduling ping task at %ld", |
| pingInterval); |
| m_pingTaskId = getCacheImplExpiryTaskManager()->scheduleExpiryTask( |
| pingHandler, 1, pingInterval, false); |
| } else { |
    LOGDEBUG(
        "ThinClientPoolDM::startBackgroundThreads: Not scheduling ping task "
        "as ping interval is %ld",
        getPingInterval());
| } |
| |
| long updateLocatorListInterval = getUpdateLocatorListInterval(); |
| |
| if (updateLocatorListInterval > 0) { |
| m_updateLocatorListTask = new GF_TASK_T<ThinClientPoolDM>( |
| this, &ThinClientPoolDM::updateLocatorList); |
| m_updateLocatorListTask->start(); |
| |
| updateLocatorListInterval = updateLocatorListInterval / 1000; // seconds |
| LOGDEBUG( |
| "ThinClientPoolDM::startBackgroundThreads: Creating updateLocatorList " |
| "task"); |
| ACE_Event_Handler* updateLocatorListHandler = |
| new ExpiryHandler_T<ThinClientPoolDM>( |
| this, &ThinClientPoolDM::doUpdateLocatorList); |
| |
| LOGDEBUG( |
| "ThinClientPoolDM::startBackgroundThreads: Scheduling updater Locator " |
| "task at %ld", |
| updateLocatorListInterval); |
| m_updateLocatorListTaskId = |
| getCacheImplExpiryTaskManager()->scheduleExpiryTask( |
| updateLocatorListHandler, 1, updateLocatorListInterval, false); |
| } |
| |
| LOGDEBUG( |
| "ThinClientPoolDM::startBackgroundThreads: Starting manageConnections " |
| "thread"); |
| // Manage Connection Thread |
| m_connManageTask = new GF_TASK_T<ThinClientPoolDM>( |
| this, &ThinClientPoolDM::manageConnections, NC_MC_Thread); |
| m_connManageTask->start(); |
| |
| int idle = getIdleTimeout(); |
| int load = getLoadConditioningInterval(); |
| |
| if (load != -1) { |
| if (load < idle || idle == -1) { |
| idle = load; |
| } |
| } |
| |
| if (idle != -1) { |
| LOGDEBUG( |
| "ThinClientPoolDM::startBackgroundThreads: Starting manageConnections " |
| "task"); |
| ACE_Event_Handler* connHandler = new ExpiryHandler_T<ThinClientPoolDM>( |
| this, &ThinClientPoolDM::doManageConnections); |
| |
| LOGDEBUG( |
| "ThinClientPoolDM::startBackgroundThreads: Scheduling " |
| "manageConnections task"); |
| m_connManageTaskId = getCacheImplExpiryTaskManager()->scheduleExpiryTask( |
| connHandler, 1, idle / 1000 + 1, false); |
| } |
| |
| LOGDEBUG( |
| "ThinClientPoolDM::startBackgroundThreads: Starting remote query " |
| "service"); |
| // Init Query Service |
| m_remoteQueryServicePtr = |
| new RemoteQueryService(m_connManager.getCacheImpl(), this); |
| m_remoteQueryServicePtr->init(); |
| |
| LOGDEBUG( |
| "ThinClientPoolDM::startBackgroundThreads: Starting pool stat sampler"); |
| if (m_PoolStatsSampler == NULL && getStatisticInterval() > -1 && |
| DistributedSystem::getSystemProperties()->statisticsEnabled()) { |
| m_PoolStatsSampler = new PoolStatsSampler( |
| getStatisticInterval() / 1000 + 1, m_connManager.getCacheImpl(), this); |
| m_PoolStatsSampler->start(); |
| } |
| |
| // starting chunk processing helper thread |
| ThinClientBaseDM::init(); |
| |
| if (m_clientMetadataService != NULL) { |
| // m_clientMetadataService->start(PoolManager::find(m_poolName.c_str())); |
| m_clientMetadataService->start(); |
| } |
| } |
| int ThinClientPoolDM::manageConnections(volatile bool& isRunning) { |
| LOGFINE("ThinClientPoolDM: starting manageConnections thread"); |
| |
| while (isRunning) { |
| m_connSema.acquire(); |
| if (isRunning) { |
| manageConnectionsInternal(isRunning); |
| while (m_connSema.tryacquire() != -1) { |
| ; |
| } |
| } |
| } |
| LOGFINE("ThinClientPoolDM: ending manageConnections thread"); |
| return 0; |
| } |
| |
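// One maintenance pass over the idle connection queue: connections eligible
// for deletion are closed or replaced depending on whether the pool still
// meets minConnections, survivors are returned to the queue, and the expiry
// task is rescheduled for the soonest upcoming idle timeout.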
| void ThinClientPoolDM::cleanStaleConnections(volatile bool& isRunning) { |
| if (!isRunning) { |
| return; |
| } |
| |
| LOGDEBUG("Cleaning stale connections"); |
| |
| int idle = getIdleTimeout(); |
| |
| ACE_Time_Value _idle(idle / 1000, (idle % 1000) * 1000); |
| ACE_Time_Value _nextIdle = _idle; |
| { |
| // ACE_Guard<ACE_Recursive_Thread_Mutex> poolguard(m_queueLock); |
| |
| TcrConnection* conn = NULL; |
| |
| std::vector<TcrConnection*> savelist; |
| std::vector<TcrConnection*> replacelist; |
| std::set<ServerLocation> excludeServers; |
| |
| while ((conn = getNoWait()) != NULL && isRunning) { |
| if (canItBeDeleted(conn)) { |
| replacelist.push_back(conn); |
| } else if (conn) { |
| ACE_Time_Value nextIdle = |
| _idle - (ACE_OS::gettimeofday() - conn->getLastAccessed()); |
| if ((ACE_Time_Value(0, 0) < nextIdle) && (nextIdle < _nextIdle)) { |
| _nextIdle = nextIdle; |
| } |
| savelist.push_back(conn); |
| } |
| } |
| |
    int replaceCount =
        m_attrs->getMinConnections() - static_cast<int>(savelist.size());
| |
| for (std::vector<TcrConnection*>::const_iterator iter = replacelist.begin(); |
| iter != replacelist.end(); ++iter) { |
| TcrConnection* conn = *iter; |
| if (replaceCount <= 0) { |
| GF_SAFE_DELETE_CON(conn); |
| removeEPConnections(1, false); |
| getStats().incLoadCondDisconnects(); |
| LOGDEBUG("Removed a connection"); |
| } else { |
| GfErrType error = GF_NOERR; |
| TcrConnection* newConn = NULL; |
| bool maxConnLimit = false; |
| error = createPoolConnection(newConn, excludeServers, maxConnLimit, |
| /*hasExpired(conn) ? NULL :*/ conn); |
| if (newConn) { |
| ACE_Time_Value nextIdle = |
| _idle - (ACE_OS::gettimeofday() - newConn->getLastAccessed()); |
| if ((ACE_Time_Value(0, 0) < nextIdle) && (nextIdle < _nextIdle)) { |
| _nextIdle = nextIdle; |
| } |
| savelist.push_back(newConn); |
| if (newConn != conn) { |
| GF_SAFE_DELETE_CON(conn); |
| removeEPConnections(1, false); |
| getStats().incLoadCondDisconnects(); |
| LOGDEBUG("Removed a connection"); |
| } |
| } else { |
| if (hasExpired(conn)) { |
| GF_SAFE_DELETE_CON(conn); |
| removeEPConnections(1, false); |
| getStats().incLoadCondDisconnects(); |
| LOGDEBUG("Removed a connection"); |
| } else { |
| conn->updateCreationTime(); |
| ACE_Time_Value nextIdle = |
| _idle - (ACE_OS::gettimeofday() - conn->getLastAccessed()); |
| if ((ACE_Time_Value(0, 0) < nextIdle) && (nextIdle < _nextIdle)) { |
| _nextIdle = nextIdle; |
| } |
| savelist.push_back(conn); |
| } |
| } |
| } |
| replaceCount--; |
| } |
| |
| LOGDEBUG("Preserving %d connections", savelist.size()); |
| |
| for (std::vector<TcrConnection*>::const_iterator iter = savelist.begin(); |
| iter != savelist.end(); ++iter) { |
| put(*iter, false); |
| } |
| } |
| if (m_connManageTaskId >= 0 && isRunning && |
| getCacheImplExpiryTaskManager()->resetTask( |
| m_connManageTaskId, static_cast<uint32_t>(_nextIdle.sec() + 1))) { |
| LOGERROR("Failed to reschedule connection manager"); |
| } else { |
| LOGFINEST("Rescheduled next connection manager run after %d seconds", |
| _nextIdle.sec() + 1); |
| } |
| |
| LOGDEBUG("Pool size is %d, pool counter is %d", size(), m_poolSize); |
| } |
| void ThinClientPoolDM::cleanStickyConnections(volatile bool& isRunning) {} |
| |
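// Tops the pool back up to minConnections, attempting at most twice that
// many connection creates per pass so a failing server cannot stall the
// manager thread indefinitely.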
| void ThinClientPoolDM::restoreMinConnections(volatile bool& isRunning) { |
| if (!isRunning) { |
| return; |
| } |
| |
| LOGDEBUG("Restoring minimum connection level"); |
| |
| int min = m_attrs->getMinConnections(); |
| int limit = 2 * min; |
| |
| std::set<ServerLocation> excludeServers; |
| |
| int restored = 0; |
| |
| if (m_poolSize < min) { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> poolguard(m_queueLock); |
| while (m_poolSize < min && limit-- && isRunning) { |
| TcrConnection* conn = NULL; |
| bool maxConnLimit = false; |
| createPoolConnection(conn, excludeServers, maxConnLimit); |
| if (conn) { |
| put(conn, false); |
| restored++; |
| getStats().incMinPoolSizeConnects(); |
| } |
| } |
| } |
| |
| LOGDEBUG("Restored %d connections", restored); |
| LOGDEBUG("Pool size is %d, pool counter is %d", size(), m_poolSize); |
| } |
| |
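// A single maintenance pass: clean stale and sticky connections, restore
// the minimum connection count, and publish the pool size to the stats.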
| int ThinClientPoolDM::manageConnectionsInternal(volatile bool& isRunning) { |
| try { |
| LOGFINE( |
| "ThinClientPoolDM::manageConnections(): checking connections in pool " |
| "queue %d", |
| size()); |
| |
| cleanStaleConnections(isRunning); |
| |
| cleanStickyConnections(isRunning); |
| |
| restoreMinConnections(isRunning); |
| |
| // ((ThinClientLocatorHelper*)m_locHelper)->updateLocators(this->getServerGroup()); |
| |
| getStats().setCurPoolConnections(m_poolSize); |
| } catch (const Exception& e) { |
| LOGERROR(e.getMessage()); |
| } catch (const std::exception& e) { |
| LOGERROR(e.what()); |
| } catch (...) { |
| LOGERROR("Unexpected exception during manage connections"); |
| } |
| return 0; |
| } |
| |
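// Picks the endpoint for a new connection: with locators configured, asks a
// locator for a server (honoring the server group and the exclude list);
// otherwise walks the static server list round-robin, skipping excluded
// servers.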
| std::string ThinClientPoolDM::selectEndpoint( |
| std::set<ServerLocation>& excludeServers, |
| const TcrConnection* currentServer) { |
| if (m_attrs->m_initLocList.size()) { // query locators |
| ServerLocation outEndpoint; |
| std::string additionalLoc; |
| LOGFINE("Asking locator for server from group [%s]", |
| m_attrs->m_serverGrp.c_str()); |
| |
| // Update Locator Request Stats |
| getStats().incLoctorRequests(); |
| |
| if (GF_NOERR != |
| ((ThinClientLocatorHelper*)m_locHelper) |
| ->getEndpointForNewFwdConn(outEndpoint, additionalLoc, |
| excludeServers, m_attrs->m_serverGrp, |
| currentServer)) { |
| throw IllegalStateException("Locator query failed"); |
| } |
| // Update Locator stats |
| getStats().setLocators( |
| ((ThinClientLocatorHelper*)m_locHelper)->getCurLocatorsNum()); |
| getStats().incLoctorResposes(); |
| |
| char epNameStr[128] = {0}; |
| ACE_OS::snprintf(epNameStr, 128, "%s:%d", |
| outEndpoint.getServerName().c_str(), |
| outEndpoint.getPort()); |
| LOGFINE("ThinClientPoolDM: Locator returned endpoint [%s]", epNameStr); |
| return epNameStr; |
| } else if (m_attrs->m_initServList |
| .size()) { // use specified server endpoints |
    // simple round-robin selection over the configured server list
| ACE_Guard<ACE_Recursive_Thread_Mutex> _guard(m_endpointSelectionLock); |
| if (m_server >= m_attrs->m_initServList.size()) { |
| m_server = 0; |
| } |
| |
| unsigned int epCount = 0; |
| do { |
| if (!excludeServer(m_attrs->m_initServList[m_server], excludeServers)) { |
| LOGFINE("ThinClientPoolDM: Selecting endpoint [%s] from position %d", |
| m_attrs->m_initServList[m_server].c_str(), m_server); |
| return m_attrs->m_initServList[m_server++]; |
| } else { |
| if (++m_server >= m_attrs->m_initServList.size()) { |
| m_server = 0; |
| } |
| } |
| } while (++epCount < m_attrs->m_initServList.size()); |
| |
| throw NotConnectedException("No server endpoints are available."); |
| } else { |
| LOGERROR("No locators or servers provided"); |
| throw IllegalStateException("No locators or servers provided"); |
| } |
| } |
| |
| void ThinClientPoolDM::addConnection(TcrConnection* conn) { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> _guard(getPoolLock()); |
| |
| put(conn, false); |
| ++m_poolSize; |
| } |
| GfErrType ThinClientPoolDM::sendRequestToAllServers( |
| const char* func, uint8_t getResult, uint32_t timeout, CacheablePtr args, |
| ResultCollectorPtr& rs, CacheableStringPtr& exceptionPtr) { |
| GfErrType err = GF_NOERR; |
| |
| HostAsm::atomicAdd(m_clientOps, 1); |
| getStats().setCurClientOps(m_clientOps); |
| /*get the ClientOp start time*/ |
| // int64 sampleStartNanos =Utils::startStatOpTime(); |
| |
| ACE_Recursive_Thread_Mutex resultCollectorLock; |
| |
| CacheableStringArrayPtr csArray = getServers(); |
| |
  if (csArray == NULLPTR || csArray->length() == 0) {
| LOGWARN("No server found to execute the function"); |
| return GF_NOSERVER_FOUND; |
| } |
| |
| int feIndex = 0; |
| FunctionExecution* fePtrList = new FunctionExecution[csArray->length()]; |
| ThreadPool* threadPool = TPSingleton::instance(); |
| UserAttributesPtr userAttr = |
| TSSUserAttributesWrapper::s_geodeTSSUserAttributes->getUserAttributes(); |
| for (int i = 0; i < csArray->length(); i++) { |
| CacheableStringPtr cs = csArray[i]; |
| std::string endpointStr(cs->asChar()); |
    TcrEndpoint* ep = NULL;
    // find() returns -1 (nonzero) when the endpoint is unknown; create it on
    // demand in that case.
    if (m_endpoints.find(endpointStr, ep)) {
      ep = addEP(cs->asChar());
    } else if (!ep->connected()) {
      LOGFINE(
          "ThinClientPoolDM::sendRequestToAllServers server not connected %s ",
          cs->asChar());
    }
| FunctionExecution* funcExe = &fePtrList[feIndex++]; |
| funcExe->setParameters(func, getResult, timeout, args, ep, this, |
| &resultCollectorLock, &rs, userAttr); |
| threadPool->perform(funcExe); |
| } |
| GfErrType finalErrorReturn = GF_NOERR; |
| |
| for (int i = 0; i < feIndex; i++) { |
| FunctionExecution* funcExe = &fePtrList[i]; |
| err = funcExe->getResult(); |
| if (err != GF_NOERR) { |
| if (funcExe->getException() == NULLPTR) { |
| if (err == GF_TIMOUT) { |
| getStats().incTimeoutClientOps(); |
| } else { |
| getStats().incFailedClientOps(); |
| } |
| if (err == GF_IOERR) { |
| err = GF_NOTCON; |
| } |
| } else { |
| exceptionPtr = funcExe->getException(); |
| } |
| |
| // delete [] fePtrList; |
| // return err; |
| } |
| |
| if (err == GF_AUTHENTICATION_FAILED_EXCEPTION || |
| err == GF_NOT_AUTHORIZED_EXCEPTION || |
| err == GF_AUTHENTICATION_REQUIRED_EXCEPTION) { |
| finalErrorReturn = err; |
| } else if (!(finalErrorReturn == GF_AUTHENTICATION_FAILED_EXCEPTION || |
| finalErrorReturn == GF_NOT_AUTHORIZED_EXCEPTION || |
| finalErrorReturn == |
| GF_AUTHENTICATION_REQUIRED_EXCEPTION)) // returning auth |
| // errors |
| // to client..preference |
| // over other errors.. |
| { |
| finalErrorReturn = err; |
| } |
| } |
| |
| if (static_cast<uint8_t>(getResult & 2) == static_cast<uint8_t>(2)) { |
| rs->endResults(); |
| } |
| |
| HostAsm::atomicAdd(m_clientOps, -1); |
| getStats().setCurClientOps(m_clientOps); |
| getStats().incSucceedClientOps(); |
| /*Update the time stat for clientOpsTime */ |
| // Utils::updateStatOpTime(getStats().getStats(), |
| // m_poolStatType->getInstance()->getClientOpsSucceededTimeId(), |
| // sampleStartNanos); |
| |
| delete[] fePtrList; |
| return finalErrorReturn; |
| } |
| |
| const CacheableStringArrayPtr ThinClientPoolDM::getLocators() const { |
| int32_t size = static_cast<int32_t>(m_attrs->m_initLocList.size()); |
| CacheableStringPtr* ptrArr = new CacheableStringPtr[size]; |
| |
| for (int i = 0; i < size; ++i) { |
| ptrArr[i] = CacheableString::create( |
| m_attrs->m_initLocList[i].c_str(), |
| static_cast<int32_t>(m_attrs->m_initLocList[i].length())); |
| } |
| return CacheableStringArray::createNoCopy(ptrArr, size); |
| } |
| |
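// Returns the configured server endpoints when the pool was built from a
// static server list; otherwise queries the locators for the servers
// currently known in this pool's server group.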
| const CacheableStringArrayPtr ThinClientPoolDM::getServers() { |
| int32_t size = static_cast<int32_t>(m_attrs->m_initServList.size()); |
| CacheableStringPtr* ptrArr = NULL; |
| |
| if (size > 0) { |
| ptrArr = new CacheableStringPtr[size]; |
| for (int32_t i = 0; i < size; ++i) { |
| ptrArr[i] = CacheableString::create( |
| m_attrs->m_initServList[i].c_str(), |
| static_cast<int32_t>(m_attrs->m_initServList[i].length())); |
| } |
| } |
| |
| if (size > 0) return CacheableStringArray::createNoCopy(ptrArr, size); |
| |
| size = static_cast<int32_t>(m_attrs->m_initLocList.size()); |
| |
| // get dynamic added servers using locators |
| if (size > 0) { |
| std::vector<ServerLocation> vec; |
| ((ThinClientLocatorHelper*)m_locHelper) |
| ->getAllServers(vec, m_attrs->m_serverGrp); |
| |
| ptrArr = new CacheableStringPtr[vec.size()]; |
| std::vector<ServerLocation>::iterator it; |
| |
| size = static_cast<int32_t>(vec.size()); |
| |
| char buffer[256] = {'\0'}; |
| int i = 0; |
| for (it = vec.begin(); it < vec.end(); it++) { |
| ServerLocation serLoc = *it; |
| ACE_OS::snprintf(buffer, 256, "%s:%d", serLoc.getServerName().c_str(), |
| serLoc.getPort()); |
| ptrArr[i++] = CacheableString::create(buffer); |
| } |
| } |
| |
| return CacheableStringArray::createNoCopy(ptrArr, size); |
| } |
| |
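// Stop sequence shared by the background threads below: request a stop,
// release the thread's semaphore so a blocked acquire returns, join the
// thread, then cancel the matching expiry task if one was scheduled.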
| void ThinClientPoolDM::stopPingThread() { |
| if (m_pingTask) { |
| LOGFINE("ThinClientPoolDM::destroy(): Closing ping thread."); |
| m_pingTask->stopNoblock(); |
| m_pingSema.release(); |
| m_pingTask->wait(); |
| GF_SAFE_DELETE(m_pingTask); |
| if (m_pingTaskId >= 0) { |
| getCacheImplExpiryTaskManager()->cancelTask(m_pingTaskId); |
| } |
| } |
| } |
| |
| void ThinClientPoolDM::stopUpdateLocatorListThread() { |
| if (m_updateLocatorListTask) { |
| LOGFINE("ThinClientPoolDM::destroy(): Closing updateLocatorList thread."); |
| m_updateLocatorListTask->stopNoblock(); |
| m_updateLocatorListSema.release(); |
| m_updateLocatorListTask->wait(); |
| GF_SAFE_DELETE(m_updateLocatorListTask); |
| if (m_updateLocatorListTaskId >= 0) { |
| getCacheImplExpiryTaskManager()->cancelTask(m_updateLocatorListTaskId); |
| } |
| } |
| } |
| |
| void ThinClientPoolDM::stopCliCallbackThread() { |
| if (m_cliCallbackTask) { |
| LOGFINE("ThinClientPoolDM::destroy(): Closing cliCallback thread."); |
| m_cliCallbackTask->stopNoblock(); |
| m_cliCallbackSema.release(); |
| m_cliCallbackTask->wait(); |
| GF_SAFE_DELETE(m_cliCallbackTask); |
| } |
| } |
| |
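// Tears the pool down in dependency order: query service, stats sampler,
// the callback/connection/ping/locator threads, the metadata service, the
// connection queue itself, and finally the endpoints and pool statistics.
// keepAlive is forwarded to the servers so a durable client's subscription
// queues can survive the close.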
| void ThinClientPoolDM::destroy(bool keepAlive) { |
| LOGDEBUG("ThinClientPoolDM::destroy..."); |
| if (!m_isDestroyed && (!m_destroyPending || m_destroyPendingHADM)) { |
| checkRegions(); |
| TcrMessage::setKeepAlive(keepAlive); |
| if (m_remoteQueryServicePtr != NULLPTR) { |
| m_remoteQueryServicePtr->close(); |
| m_remoteQueryServicePtr = NULLPTR; |
| } |
| |
| LOGDEBUG("Closing PoolStatsSampler thread."); |
| if (m_PoolStatsSampler != NULL) { |
| m_PoolStatsSampler->stop(); |
| GF_SAFE_DELETE(m_PoolStatsSampler); |
| } |
| LOGDEBUG("PoolStatsSampler thread closed ."); |
| stopCliCallbackThread(); |
| LOGDEBUG("ThinClientPoolDM::destroy( ): Closing connection manager."); |
| if (m_connManageTask) { |
| m_connManageTask->stopNoblock(); |
| m_connSema.release(); |
| m_connManageTask->wait(); |
| GF_SAFE_DELETE(m_connManageTask); |
| if (m_connManageTaskId >= 0) { |
| getCacheImplExpiryTaskManager()->cancelTask(m_connManageTaskId); |
| } |
| } |
| |
| LOGDEBUG("Closing PoolStatsSampler thread."); |
| stopPingThread(); |
| stopUpdateLocatorListThread(); |
| |
| if (m_clientMetadataService != NULL) { |
| m_clientMetadataService->stop(); |
| } |
| // closing all the thread local connections ( sticky). |
| LOGDEBUG("ThinClientPoolDM::destroy( ): closing FairQueue, pool size = %d", |
| m_poolSize); |
| close(); |
| LOGDEBUG("ThinClientPoolDM::destroy( ): after close "); |
| |
| /********************************************************************** |
| ==31849== at 0x4007D75: operator new(unsigned int) |
| (vg_replace_malloc.c:313) |
| ==31849== by 0x423063E: |
| apache::geode::client::ThinClientPoolHADM::createEP(char |
| const*) (ThinClientPoolHADM.hpp:113) |
| ==31849== by 0x4375CB7: |
| apache::geode::client::ThinClientPoolDM::addEP(char const*) |
| (ThinClientPoolDM.cpp:1893) |
| ==31849== by 0x437DC29: |
| apache::geode::client::ThinClientPoolDM::addEP(apache::geode::client::ServerLocation&) |
| (ThinClientPoolDM.cpp:1880) |
| ==31849== by 0x43861D9: |
| apache::geode::client::ThinClientRedundancyManager::getAllEndpoints(stlp_std::vector<apache::geode::client::TcrEndpoint*, |
| stlp_std::allocator<apache::geode::client::TcrEndpoint*> >&) |
| (ThinClientRedundancyManager.cpp:912) |
| ==31849== by 0x4386D0E: |
| apache::geode::client::ThinClientRedundancyManager::initialize(int) |
| (ThinClientRedundancyManager.cpp:613) |
| ==31849== by 0x4382301: |
| apache::geode::client::ThinClientPoolHADM::startBackgroundThreads() |
| (ThinClientPoolHADM.cpp:37) |
| ==31849== by 0x422F11F: apache::geode::client::PoolFactory::create(char |
| const*) |
| (PoolFactory.cpp:163) |
| ==31849== by 0x8066755: CacheHelper::createPooledRegion(char const*, |
| bool, char const*, char const*, char const*, bool, bool, int, int, int, |
| int, int, |
| apache::geode::client::SharedPtr<apache::geode::client::CacheListener> |
| const&, |
| apache::geode::client::ExpirationAction::Action) (in |
| /export/pnq-gst-dev01a/users/adongre/valgrind_702/nc/ThinClient702X_maint/build-artifacts/linux/tests/cppcache/testThinClientPutAll) |
| ==31849== by 0x805AD4D: createPooledRegion(char const*, bool, char |
| const*, char const*, char const*, bool, bool) [clone .clone.2] (in |
| /export/pnq-gst-dev01a/users/adongre/valgrind_702/nc/ThinClient702X_maint/build-artifacts/linux/tests/cppcache/testThinClientPutAll) |
| ==31849== by 0x8066F6E: Task_StepOne_Pooled_Locator::doTask() (in |
| /export/pnq-gst-dev01a/users/adongre/valgrind_702/nc/ThinClient702X_maint/build-artifacts/linux/tests/cppcache/testThinClientPutAll) |
| ==31849== by 0x806DB7A: dunit::TestSlave::begin() (in |
| /export/pnq-gst-dev01a/users/adongre/valgrind_702/nc/ThinClient702X_maint/build-artifacts/linux/tests/cppcache/testThinClientPutAll) |
| ==31849== |
| ******************************************************************************/ |
| for (ACE_Map_Manager<std::string, TcrEndpoint*, |
| ACE_Recursive_Thread_Mutex>::iterator iter = |
| m_endpoints.begin(); |
| iter != m_endpoints.end(); ++iter) { |
| TcrEndpoint* ep = (*iter).int_id_; |
| LOGFINE("ThinClientPoolDM: forcing endpoint delete for %d in destructor", |
| ep->name().c_str()); |
| GF_SAFE_DELETE(ep); |
| } |
| |
| // Close Stats |
| getStats().close(); |
| |
| if (m_clientMetadataService != NULL) { |
| GF_SAFE_DELETE(m_clientMetadataService); |
| } |
| |
| removePool(m_poolName.c_str()); |
| |
| stopChunkProcessor(); |
| m_manager->closeAllStickyConnections(); |
| // delete m_manager; m_manager = NULL; |
| m_isDestroyed = true; |
| LOGDEBUG("ThinClientPoolDM::destroy( ): after close m_isDestroyed = %d ", |
| m_isDestroyed); |
| } |
| if (m_poolSize != 0) { |
| LOGFINE("Pool connection size is not zero %d", m_poolSize); |
| } |
| } |
| |
| bool ThinClientPoolDM::isDestroyed() const { |
| // TODO: dummy implementation |
| return m_isDestroyed; |
| } |
| |
| QueryServicePtr ThinClientPoolDM::getQueryService() { |
| // TODO: |
| if (m_isMultiUserMode) { |
| LOGERROR( |
| "Pool is in multiuser authentication mode. Get query service using " |
| "RegionService.getQueryService()"); |
| throw UnsupportedOperationException( |
| "Pool is in multiuser authentication mode. Get QueryService() using " |
| "RegionService.getQueryService()"); |
| } |
| |
| return getQueryServiceWithoutCheck(); |
| } |
| |
| QueryServicePtr ThinClientPoolDM::getQueryServiceWithoutCheck() { |
  if (m_remoteQueryServicePtr != NULLPTR) {
| return m_remoteQueryServicePtr; |
| } |
| SystemProperties* props = DistributedSystem::getSystemProperties(); |
| |
| if (props->isGridClient()) { |
| LOGWARN("Initializing query service while grid-client setting is enabled."); |
| // Init Query Service |
| m_remoteQueryServicePtr = |
| new RemoteQueryService(m_connManager.getCacheImpl(), this); |
| m_remoteQueryServicePtr->init(); |
| } else { |
| LOGWARN("Remote query service is not initialized."); |
| } |
| |
| return m_remoteQueryServicePtr; |
| } |
| void ThinClientPoolDM::sendUserCacheCloseMessage(bool keepAlive) { |
| LOGDEBUG("ThinClientPoolDM::sendUserCacheCloseMessage"); |
| UserAttributesPtr userAttribute = |
| TSSUserAttributesWrapper::s_geodeTSSUserAttributes->getUserAttributes(); |
| |
  std::map<std::string, UserConnectionAttributes*>& ucaMap =
      userAttribute->getUserConnectionServers();

  std::map<std::string, UserConnectionAttributes*>::iterator it;

  for (it = ucaMap.begin(); it != ucaMap.end(); it++) {
    UserConnectionAttributes* uca = (*it).second;
    if (uca->isAuthenticated() && uca->getEndpoint()->connected()) {
      TcrMessageRemoveUserAuth request(keepAlive, this);
      TcrMessageReply reply(true, this);

      sendRequestToEP(request, reply, uca->getEndpoint());
    }
    uca->setUnAuthenticated();
  }
| } |
| |
| TcrConnection* ThinClientPoolDM::getConnectionInMultiuserMode( |
| UserAttributesPtr userAttribute) { |
| LOGDEBUG("ThinClientPoolDM::getConnectionInMultiuserMode:"); |
| UserConnectionAttributes* uca = userAttribute->getConnectionAttribute(); |
| if (uca != NULL) { |
| TcrEndpoint* ep = uca->getEndpoint(); |
| LOGDEBUG( |
| "ThinClientPoolDM::getConnectionInMultiuserMode endpoint got = %s ", |
| ep->name().c_str()); |
| return getFromEP(ep); |
| } else { |
| return NULL; |
| } |
| } |
| |
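// The following methods implement the client side of the PDX type and enum
// registry: a new type or enum is registered through this pool, and the id
// assigned by the servers is then broadcast to every other pool so all
// registries stay consistent.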
| int32_t ThinClientPoolDM::GetPDXIdForType(SerializablePtr pdxType) { |
| LOGDEBUG("ThinClientPoolDM::GetPDXIdForType:"); |
| |
| GfErrType err = GF_NOERR; |
| |
| TcrMessageGetPdxIdForType request(pdxType, this); |
| |
| TcrMessageReply reply(true, this); |
| |
| err = sendSyncRequest(request, reply); |
| |
| if (err != GF_NOERR) { |
| GfErrTypeToException("Operation Failed", err); |
| } else if (reply.getMessageType() == TcrMessage::EXCEPTION) { |
| LOGDEBUG("ThinClientPoolDM::GetPDXTypeById: Exception = %s ", |
| reply.getException()); |
| throw IllegalStateException("Failed to register PdxSerializable Type"); |
| } |
| |
| int32_t pdxTypeId = |
| static_cast<CacheableInt32*>(reply.getValue().ptr())->value(); |
| |
  // need to broadcast this id to all other pools
| { |
| const HashMapOfPools& pools = PoolManager::getAll(); |
| |
| for (HashMapOfPools::Iterator iter = pools.begin(); iter != pools.end(); |
| ++iter) { |
| ThinClientPoolDM* currPool = |
| static_cast<ThinClientPoolDM*>(iter.second().ptr()); |
| |
| if (currPool != this) { |
| currPool->AddPdxType(pdxType, pdxTypeId); |
| } |
| } |
| } |
| |
| return pdxTypeId; |
| } |
| |
| void ThinClientPoolDM::AddPdxType(SerializablePtr pdxType, int32_t pdxTypeId) { |
| LOGDEBUG("ThinClientPoolDM::GetPDXIdForType:"); |
| |
| GfErrType err = GF_NOERR; |
| |
| TcrMessageAddPdxType request(pdxType, this, pdxTypeId); |
| |
| TcrMessageReply reply(true, this); |
| |
| err = sendSyncRequest(request, reply); |
| |
| if (err != GF_NOERR) { |
| GfErrTypeToException("Operation Failed", err); |
| } else if (reply.getMessageType() == TcrMessage::EXCEPTION) { |
| LOGDEBUG("ThinClientPoolDM::GetPDXTypeById: Exception = %s ", |
| reply.getException()); |
| throw IllegalStateException("Failed to register PdxSerializable Type"); |
| } |
| } |
| |
| SerializablePtr ThinClientPoolDM::GetPDXTypeById(int32_t typeId) { |
| LOGDEBUG("ThinClientPoolDM::GetPDXTypeById:"); |
| |
| GfErrType err = GF_NOERR; |
| |
| TcrMessageGetPdxTypeById request(typeId, this); |
| |
| TcrMessageReply reply(true, this); |
| |
| err = sendSyncRequest(request, reply); |
| |
| if (err != GF_NOERR) { |
| GfErrTypeToException("Operation Failed", err); |
| } else if (reply.getMessageType() == TcrMessage::EXCEPTION) { |
| LOGDEBUG("ThinClientPoolDM::GetPDXTypeById: Exception = %s ", |
| reply.getException()); |
| throw IllegalStateException("Failed to understand PdxSerializable Type"); |
| } |
| |
| return reply.getValue(); |
| } |
| |
| int32_t ThinClientPoolDM::GetEnumValue(SerializablePtr enumInfo) { |
| LOGDEBUG("ThinClientPoolDM::GetEnumValue:"); |
| |
| GfErrType err = GF_NOERR; |
| |
| TcrMessageGetPdxIdForEnum request(enumInfo, this); |
| |
| TcrMessageReply reply(true, this); |
| |
| err = sendSyncRequest(request, reply); |
| |
| if (err != GF_NOERR) { |
| GfErrTypeToException("Operation Failed", err); |
| } else if (reply.getMessageType() == TcrMessage::EXCEPTION) { |
| LOGDEBUG("ThinClientPoolDM::GetEnumValue: Exception = %s ", |
| reply.getException()); |
| throw IllegalStateException("Failed to register Pdx enum Type"); |
| } |
| |
| int32_t enumVal = |
| static_cast<CacheableInt32*>(reply.getValue().ptr())->value(); |
| |
  // need to broadcast this id to all other pools
| { |
| const HashMapOfPools& pools = PoolManager::getAll(); |
| |
| for (HashMapOfPools::Iterator iter = pools.begin(); iter != pools.end(); |
| ++iter) { |
| ThinClientPoolDM* currPool = |
| static_cast<ThinClientPoolDM*>(iter.second().ptr()); |
| |
| if (currPool != this) { |
| currPool->AddEnum(enumInfo, enumVal); |
| } |
| } |
| } |
| |
| return enumVal; |
| } |
| |
| SerializablePtr ThinClientPoolDM::GetEnum(int32_t val) { |
| LOGDEBUG("ThinClientPoolDM::GetEnum:"); |
| |
| GfErrType err = GF_NOERR; |
| |
| TcrMessageGetPdxEnumById request(val, this); |
| |
| TcrMessageReply reply(true, this); |
| |
| err = sendSyncRequest(request, reply); |
| |
| if (err != GF_NOERR) { |
| GfErrTypeToException("Operation Failed", err); |
| } else if (reply.getMessageType() == TcrMessage::EXCEPTION) { |
| LOGDEBUG("ThinClientPoolDM::GetEnum: Exception = %s ", |
| reply.getException()); |
| throw IllegalStateException("Failed to understand enum Type"); |
| } |
| |
| return reply.getValue(); |
| } |
| |
| void ThinClientPoolDM::AddEnum(SerializablePtr enumInfo, int enumVal) { |
| LOGDEBUG("ThinClientPoolDM::AddEnum:"); |
| |
| GfErrType err = GF_NOERR; |
| |
| TcrMessageAddPdxEnum request(enumInfo, this, enumVal); |
| |
| TcrMessageReply reply(true, this); |
| |
| err = sendSyncRequest(request, reply); |
| |
| if (err != GF_NOERR) { |
| GfErrTypeToException("Operation Failed", err); |
| } else if (reply.getMessageType() == TcrMessage::EXCEPTION) { |
| LOGDEBUG("ThinClientPoolDM::AddEnum: Exception = %s ", |
| reply.getException()); |
| throw IllegalStateException("Failed to register enum Type"); |
| } |
| } |
| |
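// Sends the user's credentials over the given connection. On success the
// connection goes back into the queue; on a server-side auth exception the
// error is translated via handleServerException and isServerException is
// set so the caller returns instead of retrying on another endpoint.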
| GfErrType ThinClientPoolDM::sendUserCredentials(PropertiesPtr credentials, |
| TcrConnection*& conn, |
| bool isBGThread, |
| bool& isServerException) { |
| LOGDEBUG("ThinClientPoolDM::sendUserCredentials:"); |
| |
| GfErrType err = GF_NOERR; |
| |
| TcrMessageUserCredential request(credentials, this); |
| |
| TcrMessageReply reply(true, this); |
| |
| err = |
| conn->getEndpointObject()->sendRequestConnWithRetry(request, reply, conn); |
| |
| if (conn) err = handleEPError(conn->getEndpointObject(), reply, err); |
| |
| LOGDEBUG( |
| "ThinClientPoolDM::sendUserCredentials: Error after sending cred request " |
| "= %d ", |
| err); |
| |
| if (err == GF_NOERR) { |
| switch (reply.getMessageType()) { |
| case TcrMessage::RESPONSE: { |
| // nothing to be done; |
| break; |
| } |
      case TcrMessage::EXCEPTION: {
        // err is GF_NOERR here; return the connection before translating the
        // server exception. connFound is only relevant for sticky connections.
        putInQueue(conn, isBGThread);
        // This sets the error type when there is a server exception.
        err = ThinClientRegion::handleServerException(
            "ThinClientPoolDM::sendUserCredentials AuthException",
            reply.getException());
        isServerException = true;
        break;
      }
      default: {
        putInQueue(conn, isBGThread);
        LOGERROR(
            "Unknown message type %d during secure response, possible "
            "serialization mismatch",
            reply.getMessageType());
        err = GF_MSG;
        break;
      }
| } |
| } |
| return err; |
| } |
| |
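// Single-hop routing: for key-based operations, asks the metadata service
// which bucket server owns the key and resolves that location to an
// endpoint so the request can go directly to the owning server.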
| TcrEndpoint* ThinClientPoolDM::getSingleHopServer( |
| TcrMessage& request, int8_t& version, |
| BucketServerLocationPtr& serverlocation, |
| std::set<ServerLocation>& excludeServers) { |
| const CacheableKeyPtr& key = request.getKeyRef(); |
| if (m_clientMetadataService == NULL || key == NULLPTR) return NULL; |
| RegionPtr region(request.getRegion()); |
| TcrEndpoint* ep = NULL; |
| if (region == NULLPTR) { |
| m_connManager.getCacheImpl()->getRegion(request.getRegionName().c_str(), |
| region); |
| } |
| if (region != NULLPTR) { |
| m_clientMetadataService->getBucketServerLocation( |
| region, key, request.getValueRef(), request.getCallbackArgumentRef(), |
| request.forPrimary(), serverlocation, version); |
| |
| if (serverlocation != NULLPTR && serverlocation->isValid()) { |
| LOGFINE("Server host and port are %s:%d", |
| serverlocation->getServerName().c_str(), |
| serverlocation->getPort()); |
| ep = getEndPoint(serverlocation, version, excludeServers); |
| } |
| } |
| return ep; |
| } |
| |
| TcrEndpoint* ThinClientPoolDM::getEndPoint( |
| const BucketServerLocationPtr& serverLocation, int8_t& version, |
| std::set<ServerLocation>& excludeServers) { |
| TcrEndpoint* ep = NULL; |
| if (serverLocation->isValid()) { |
| /*if (serverLocation->isPrimary()) { |
| version = serverLocation->getVersion(); |
| } else { |
| version = 0; |
| }*/ |
| |
| if (excludeServer(serverLocation->getEpString(), excludeServers)) { |
| LOGFINE("ThinClientPoolDM::getEndPoint Exclude Server true for %s ", |
| serverLocation->getEpString().c_str()); |
| return ep; |
| } |
| |
| // ACE_TCHAR serverLocn[256]; |
| // ACE_OS::snprintf(serverLocn, 256, "%s:%d", |
| // serverLocation->getServerName().c_str(), |
| // serverLocation->getPort()); |
| if (m_endpoints.find(serverLocation->getEpString(), ep) != -1) { |
| LOGDEBUG("Endpoint for single hop is %p", ep); |
| return ep; |
| } |
| |
| // do for pool with endpoints. Add endpoint into m_endpoints only when we |
| // did not find it above and it is in the pool's m_initServList. |
| for (std::vector<std::string>::iterator itr = |
| m_attrs->m_initServList.begin(); |
| itr != m_attrs->m_initServList.end(); ++itr) { |
| if ((ACE_OS::strcmp(serverLocation->getEpString().c_str(), |
| (*itr).c_str()) == 0)) { |
| ep = addEP(*(serverLocation.ptr())); // see if this is new endpoint |
| break; |
| } |
| } |
| |
| // do only for locator |
| // if servergroup is there, then verify otherwise you may reach to another |
| // group |
| if (m_attrs->m_initLocList.size()) { |
| std::string servGrp = this->getServerGroup(); |
| if (servGrp.length() > 0) { |
| CacheableStringArrayPtr groups = serverLocation->getServerGroups(); |
| if ((groups != NULLPTR) && (groups->length() > 0)) { |
| for (int i = 0; i < groups->length(); i++) { |
| CacheableStringPtr cs = groups[i]; |
| if (cs->length() > 0) { |
| std::string str = cs->toString(); |
| if ((ACE_OS::strcmp(str.c_str(), servGrp.c_str()) == 0)) { |
| ep = addEP( |
| *(serverLocation.ptr())); // see if this is new endpoint |
| break; |
| } |
| } |
| } |
| } |
| } else // just add it |
| { |
| ep = addEP(*(serverLocation.ptr())); // see if this is new endpoint |
| } |
| } |
| } |
| |
| return ep; |
| } |
| |
| // gets the endpoint from the list of endpoints using the endpoint Name |
| TcrEndpoint* ThinClientPoolDM::getEndPoint(std::string epNameStr) { |
| TcrEndpoint* ep = NULL; |
| if (m_endpoints.find(epNameStr, ep) != -1) { |
| LOGDEBUG("Endpoint for single hop is %p", ep); |
| return ep; |
| } |
| return ep; |
| } |
| |
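// getAll fan-out: with PR single-hop enabled, splits the key set per server
// using the metadata service's server-to-filter map and runs one GetAllWork
// per server in parallel; otherwise the request follows the normal
// single-connection send path below.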
| GfErrType ThinClientPoolDM::sendSyncRequest(TcrMessage& request, |
| TcrMessageReply& reply, |
| bool attemptFailover, |
| bool isBGThread) { |
| int32_t type = request.getMessageType(); |
| |
| if (!request.forTransaction() && m_attrs->getPRSingleHopEnabled() && |
| (type == TcrMessage::GET_ALL_70 || |
| type == TcrMessage::GET_ALL_WITH_CALLBACK) && |
| m_clientMetadataService != NULL) { |
| GfErrType error = GF_NOERR; |
| RegionPtr region; |
| m_connManager.getCacheImpl()->getRegion(request.getRegionName().c_str(), |
| region); |
| HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>* locationMap = |
| m_clientMetadataService->getServerToFilterMap(request.getKeys(), region, |
| request.forPrimary()); |
| |
| if (locationMap == NULL) { |
| request.InitializeGetallMsg( |
| request.getCallbackArgument()); // now initialize getall msg |
| return sendSyncRequest(request, reply, attemptFailover, isBGThread, |
| NULLPTR); |
| } |
| std::vector<GetAllWork*> getAllWorkers; |
| ThreadPool* threadPool = TPSingleton::instance(); |
| ChunkedGetAllResponse* responseHandler = |
| static_cast<ChunkedGetAllResponse*>(reply.getChunkedResultHandler()); |
| |
| for (HashMapT<BucketServerLocationPtr, VectorOfCacheableKeyPtr>::Iterator |
| locationIter = locationMap->begin(); |
| locationIter != locationMap->end(); locationIter++) { |
      BucketServerLocationPtr serverLocation = locationIter.first();
      VectorOfCacheableKeyPtr keys = locationIter.second();
| GetAllWork* worker = |
| new GetAllWork(this, region, serverLocation, keys, attemptFailover, |
| isBGThread, responseHandler->getAddToLocalCache(), |
| responseHandler, request.getCallbackArgument()); |
| threadPool->perform(worker); |
| getAllWorkers.push_back(worker); |
| } |
| reply.setMessageType(TcrMessage::RESPONSE); |
| |
| for (std::vector<GetAllWork*>::iterator iter = getAllWorkers.begin(); |
| iter != getAllWorkers.end(); iter++) { |
| GetAllWork* worker = *iter; |
| GfErrType err = worker->getResult(); |
| |
| if (err != GF_NOERR) { |
| error = err; |
| } |
| |
| TcrMessage* currentReply = worker->getReply(); |
| if (currentReply->getMessageType() != TcrMessage::RESPONSE) { |
| reply.setMessageType(currentReply->getMessageType()); |
| } |
| // ChunkedGetAllResponsePtr |
| // currentResponseHandler(currentReply->getChunkedResultHandler()); |
| // responseHandler->add(currentResponseHandler.ptr()); |
| |
| delete worker; |
| } |
| delete locationMap; |
| return error; |
| } else { |
| if (type == TcrMessage::GET_ALL_70 || |
| type == TcrMessage::GET_ALL_WITH_CALLBACK) { |
| request.InitializeGetallMsg( |
| request.getCallbackArgument()); // now initialize getall msg |
| } |
| return sendSyncRequest(request, reply, attemptFailover, isBGThread, |
| NULLPTR); |
| } |
| } |
| |
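// Core send loop: acquires a connection (single-hop and multiuser aware),
// authenticates on demand, sends with per-endpoint retry, and fails over to
// other endpoints up to retryAttempts times (or once across all endpoints
// when retryAttempts is -1), updating the client-ops statistics throughout.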
| GfErrType ThinClientPoolDM::sendSyncRequest( |
| TcrMessage& request, TcrMessageReply& reply, bool attemptFailover, |
| bool isBGThread, const BucketServerLocationPtr& serverLocation) { |
| LOGDEBUG("ThinClientPoolDM::sendSyncRequest: ....%d %s", |
| request.getMessageType(), m_poolName.c_str()); |
| // Increment clientOps |
| HostAsm::atomicAdd(m_clientOps, 1); |
| getStats().setCurClientOps(m_clientOps); |
| |
| /*get the ClientOp start time*/ |
| // int64 sampleStartNanos =Utils::startStatOpTime(); |
| GfErrType error = GF_NOTCON; |
| |
| UserAttributesPtr userAttr = NULLPTR; |
| reply.setDM(this); |
| |
| int32_t type = request.getMessageType(); |
| |
| if (!(type == TcrMessage::QUERY || |
| type == TcrMessage::QUERY_WITH_PARAMETERS || |
| type == TcrMessage::PUTALL || |
| type == TcrMessage::PUT_ALL_WITH_CALLBACK || |
| type == TcrMessage::EXECUTE_FUNCTION || |
| type == TcrMessage::EXECUTE_REGION_FUNCTION || |
| type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP || |
| type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE)) { |
    // set only when the message is not a query, putAll, function execution,
    // or executeCQ
| reply.setTimeout(this->getReadTimeout() / 1000); |
| request.setTimeout(this->getReadTimeout() / 1000); |
| } |
| |
| bool retryAllEPsOnce = false; |
| if (m_attrs->getRetryAttempts() == -1) { |
| retryAllEPsOnce = true; |
| } |
| long retry = m_attrs->getRetryAttempts() + 1; |
| TcrConnection* conn = NULL; |
| std::set<ServerLocation> excludeServers; |
| type = request.getMessageType(); |
| bool isAuthRequireExcep = false; |
| int isAuthRequireExcepMaxTry = 2; |
| bool firstTry = true; |
| LOGFINE("sendSyncRequest:: retry = %d", retry); |
| while (retryAllEPsOnce || retry-- || |
| (isAuthRequireExcep && isAuthRequireExcepMaxTry >= 0)) { |
| isAuthRequireExcep = false; |
| if (!firstTry) request.updateHeaderForRetry(); |
    // If it's a query or putAll and we had a timeout, just return with the
    // newly selected endpoint without failover-retry.
| if ((type == TcrMessage::QUERY || |
| type == TcrMessage::QUERY_WITH_PARAMETERS || |
| type == TcrMessage::PUTALL || |
| type == TcrMessage::PUT_ALL_WITH_CALLBACK || |
| type == TcrMessage::EXECUTE_FUNCTION || |
| type == TcrMessage::EXECUTE_REGION_FUNCTION || |
| type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP || |
| type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE) && |
| error == GF_TIMOUT) { |
| return error; |
| } |
| |
| GfErrType queueErr = GF_NOERR; |
| uint32_t lastExcludeSize = static_cast<uint32_t>(excludeServers.size()); |
| int8_t version = 0; |
| |
| bool isUserNeedToReAuthenticate = false; |
| bool singleHopConnFound = false; |
| bool connFound = false; |
| if (!this->m_isMultiUserMode || |
| (!TcrMessage::isUserInitiativeOps(request))) { |
| conn = getConnectionFromQueueW(&queueErr, excludeServers, isBGThread, |
| request, version, singleHopConnFound, |
| connFound, serverLocation); |
| } else { |
| userAttr = TSSUserAttributesWrapper::s_geodeTSSUserAttributes |
| ->getUserAttributes(); |
| if (userAttr == NULLPTR) { |
| LOGWARN("Attempted operation type %d without credentials", |
| request.getMessageType()); |
| return GF_NOT_AUTHORIZED_EXCEPTION; |
| } |
      // Note: we assume a connection will always be available here.
| conn = getConnectionFromQueueW(&queueErr, excludeServers, isBGThread, |
| request, version, singleHopConnFound, |
| connFound, serverLocation); |
| |
      LOGDEBUG(
          "ThinClientPoolDM::sendSyncRequest: after getConnectionFromQueueW "
          "%d",
          isUserNeedToReAuthenticate);
      if (conn != NULL) {
        // Check whether the user is already authenticated to this endpoint.
| isUserNeedToReAuthenticate = |
| !(userAttr->isEndpointAuthenticated(conn->getEndpointObject())); |
| } |
| } |
| |
| if (queueErr == GF_CLIENT_WAIT_TIMEOUT) { |
| LOGFINE("Request timeout at client only"); |
| return GF_CLIENT_WAIT_TIMEOUT; |
| } else if (queueErr == GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA) { |
| // need to refresh meta data |
| RegionPtr region; |
| m_connManager.getCacheImpl()->getRegion(request.getRegionName().c_str(), |
| region); |
| if (region != NULLPTR) { |
| LOGFINE( |
| "Need to refresh pr-meta-data timeout in client only with refresh " |
| "metadata"); |
| ThinClientRegion* tcrRegion = |
| dynamic_cast<ThinClientRegion*>(region.ptr()); |
| tcrRegion->setMetaDataRefreshed(false); |
| m_clientMetadataService->enqueueForMetadataRefresh( |
| region->getFullPath(), reply.getserverGroupVersion()); |
| } |
| return GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA; |
| } |
| |
| LOGDEBUG( |
| "ThinClientPoolDM::sendSyncRequest: isUserNeedToReAuthenticate = %d ", |
| isUserNeedToReAuthenticate); |
    LOGDEBUG(
        "ThinClientPoolDM::sendSyncRequest: m_isMultiUserMode = %d conn = %p "
        "type = %d",
        m_isMultiUserMode, conn, type);
| |
| if (!conn) { |
      // Assume all connections are in use.
| if (queueErr == GF_NOERR) { |
| queueErr = GF_ALL_CONNECTIONS_IN_USE_EXCEPTION; |
| HostAsm::atomicAdd(m_clientOps, -1); |
| getStats().setCurClientOps(m_clientOps); |
| getStats().incFailedClientOps(); |
| return queueErr; |
| } else if (queueErr == GF_IOERR) { |
| error = GF_NOTCON; |
| } else { |
| error = queueErr; |
| } |
| } |
| if (conn) { |
| TcrEndpoint* ep = conn->getEndpointObject(); |
| LOGDEBUG( |
| "ThinClientPoolDM::sendSyncRequest: sendSyncReq " |
| "ep->isAuthenticated() = %d ", |
| ep->isAuthenticated()); |
| GfErrType userCredMsgErr = GF_NOERR; |
| bool isServerException = false; |
| if (TcrMessage::isUserInitiativeOps(request) && |
| (this->m_isSecurityOn || this->m_isMultiUserMode)) { |
| if (!this->m_isMultiUserMode && !ep->isAuthenticated()) { |
          // First authenticate the user on this endpoint.
| userCredMsgErr = this->sendUserCredentials( |
| this->getCredentials(ep), conn, isBGThread, isServerException); |
| } else if (isUserNeedToReAuthenticate) { |
| userCredMsgErr = this->sendUserCredentials( |
| userAttr->getCredentials(), conn, isBGThread, isServerException); |
| } |
| } |
| |
| if (userCredMsgErr == GF_NOERR) { |
| error = ep->sendRequestConnWithRetry(request, reply, conn); |
| error = handleEPError(ep, reply, error); |
| } else { |
| error = userCredMsgErr; |
| } |
| |
| if (!isServerException) { |
| if (error == GF_NOERR) { |
| // afterSendingRequest(request.getMessageType( ),reply, conn); |
| LOGDEBUG("putting connection back in queue"); |
| putInQueue(conn, |
| isBGThread || |
| request.getMessageType() == TcrMessage::GET_ALL_70 || |
| request.getMessageType() == |
| TcrMessage::GET_ALL_WITH_CALLBACK || |
| request.getMessageType() == |
| TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP, |
| request.forTransaction()); // connFound is only relevant |
| // for Sticky conn. |
| LOGDEBUG("putting connection back in queue DONE"); |
| // LOGFINE("putting connection back in queue"); |
| } else { |
| if (error != GF_TIMOUT) removeEPConnections(ep); |
| // Update stats for the connection that failed. |
| removeEPConnections(1, false); |
| setStickyNull(isBGThread || |
| request.getMessageType() == TcrMessage::GET_ALL_70 || |
| request.getMessageType() == |
| TcrMessage::GET_ALL_WITH_CALLBACK || |
| request.getMessageType() == |
| TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP); |
| if (conn) { |
| GF_SAFE_DELETE_CON(conn) |
| } |
| excludeServers.insert(ServerLocation(ep->name())); |
| } |
      } else {
        // Server exception while sending the credential message to the
        // server.
        return error;
      }
| } |
| |
| if (error == GF_NOERR) { |
| if ((this->m_isSecurityOn || this->m_isMultiUserMode)) { |
| if (reply.getMessageType() == TcrMessage::EXCEPTION) { |
| if (isAuthRequireException(reply.getException())) { |
| TcrEndpoint* ep = conn->getEndpointObject(); |
| if (!this->m_isMultiUserMode) { |
| ep->setAuthenticated(false); |
| } else if (userAttr != NULLPTR) { |
| userAttr->unAuthenticateEP(ep); |
| } |
| LOGFINEST( |
| "After getting AuthenticationRequiredException trying again."); |
| isAuthRequireExcepMaxTry--; |
| isAuthRequireExcep = true; |
| continue; |
| } else if (isNotAuthorizedException(reply.getException())) { |
| LOGDEBUG("received NotAuthorizedException"); |
| // TODO should we try again? |
| } |
| } |
| } |
| LOGFINER( |
| "reply Metadata version is %d & bsl version is %d " |
| "reply.isFEAnotherHop()=%d", |
| reply.getMetaDataVersion(), version, reply.isFEAnotherHop()); |
| // LOGINFO("reply Metadata version is %d & bsl version is %d |
| // reply.isFEAnotherHop()=%d connFound= %d", reply.getMetaDataVersion(), |
| // version, reply.isFEAnotherHop(), connFound); |
| /*if(reply.getMetaDataVersion() > 0 ) { |
| LOGCONFIG("reply Metadata version is %d & bsl version is %d", |
| reply.getMetaDataVersion(), version); |
| }*/ |
| if (m_clientMetadataService != NULL && request.forSingleHop() && |
| (reply.getMetaDataVersion() != 0 || |
| (request.getMessageType() == TcrMessage::EXECUTE_REGION_FUNCTION && |
| request.getKeyRef() != NULLPTR && reply.isFEAnotherHop()))) { |
| // Need to get direct access to Region's name to avoid referencing |
| // temp data and causing crashes |
| RegionPtr region; |
| m_connManager.getCacheImpl()->getRegion(request.getRegionName().c_str(), |
| region); |
| if (region != NULLPTR) { |
| // Reset the refreshed flag only when no pooled connection was found |
| // (the max-connection-limit case) so the metadata is fetched again; |
| // otherwise the metadata is refreshed unconditionally below. |
| if (!connFound) { |
| LOGFINE("Need to refresh pr-meta-data"); |
| ThinClientRegion* tcrRegion = |
| dynamic_cast<ThinClientRegion*>(region.ptr()); |
| tcrRegion->setMetaDataRefreshed(false); |
| } |
| m_clientMetadataService->enqueueForMetadataRefresh( |
| region->getFullPath(), reply.getserverGroupVersion()); |
| } |
| } |
| } |
| |
| if (excludeServers.size() == lastExcludeSize) { |
| excludeServers.clear(); |
| if (retryAllEPsOnce) { |
| break; |
| } |
| } |
| |
| if (!attemptFailover || error == GF_NOERR) { |
| HostAsm::atomicAdd(m_clientOps, -1); |
| getStats().setCurClientOps(m_clientOps); |
| if (error == GF_NOERR) { |
| getStats().incSucceedClientOps(); /* increment succeeded clientOps stat */ |
| } else if (error == GF_TIMOUT) { |
| getStats().incTimeoutClientOps(); |
| } else { |
| getStats().incFailedClientOps(); |
| } |
| // Top-level only sees NotConnectedException |
| if (error == GF_IOERR) { |
| error = GF_NOTCON; |
| } |
| return error; |
| } |
| |
| conn = NULL; |
| firstTry = false; |
| } // While |
| |
| HostAsm::atomicAdd(m_clientOps, -1); |
| getStats().setCurClientOps(m_clientOps); |
| |
| |
| if (error == GF_NOERR) { |
| getStats().incSucceedClientOps(); |
| } else if (error == GF_TIMOUT) { |
| getStats().incTimeoutClientOps(); |
| } else { |
| getStats().incFailedClientOps(); |
| } |
| |
| // Top-level only sees NotConnectedException |
| if (error == GF_IOERR) { |
| error = GF_NOTCON; |
| } |
| return error; |
| } |
| |
| |
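| // Removes numConn connections from the pool-size accounting and, when |
| // requested, wakes the connection-management thread via m_connSema. |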
| void ThinClientPoolDM::removeEPConnections(int numConn, |
| bool triggerManageConn) { |
| // TODO: Delete EP |
| reducePoolSize(numConn); |
| |
| // Raise the semaphore so the manage-connections thread runs. |
| if (triggerManageConn) { |
| m_connSema.release(); |
| } |
| } |
| |
| // Tries to get a connection to an endpoint. If no connection is |
| // available, it tries to create one; if that also fails, it gets a |
| // connection to any other server and fails the transaction over to that |
| // server. Used when a transaction is to be resumed on a specific server. |
| GfErrType ThinClientPoolDM::getConnectionToAnEndPoint(std::string epNameStr, |
| TcrConnection*& conn) { |
| conn = NULL; |
| |
| GfErrType error = GF_NOERR; |
| TcrEndpoint* theEP = getEndPoint(epNameStr); |
| |
| LOGFINE( |
| "ThinClientPoolDM::getConnectionToAnEndPoint( ): Getting endpoint object " |
| "for %s", |
| epNameStr.c_str()); |
| if (theEP != NULL && theEP->connected()) { |
| LOGFINE( |
| "ThinClientPoolDM::getConnectionToAnEndPoint( ): Getting connection " |
| "for endpoint %s", |
| epNameStr.c_str()); |
| conn = getFromEP(theEP); |
| // if connection is null, possibly because there are no idle connections |
| // to this endpoint, create a new pool connection to this endpoint. |
| bool maxConnLimit = false; |
| if (conn == NULL) { |
| LOGFINE( |
| "ThinClientPoolDM::getConnectionToAnEndPoint( ): Create connection " |
| "for endpoint %s", |
| epNameStr.c_str()); |
| error = createPoolConnectionToAEndPoint(conn, theEP, maxConnLimit); |
| } |
| } |
| |
| // if connection is null, it has failed to get a connection to the specified |
| // endpoint. Get a connection to any other server and failover the transaction |
| // to that server. |
| if (conn == NULL) { |
| std::set<ServerLocation> excludeServers; |
| bool maxConnLimit = false; |
| LOGFINE( |
| "ThinClientPoolDM::getConnectionToAnEndPoint( ): No connection " |
| "available for endpoint %s. Create connection to any endpoint.", |
| epNameStr.c_str()); |
| conn = getConnectionFromQueue(true, &error, excludeServers, maxConnLimit); |
| /* adongre |
| * CID 28680: Dereference after null check (FORWARD_NULL) |
| * Comparing "conn" to null implies that "conn" might be null, yet it was |
| * passed to TcrConnection::getEndpointObject(), which dereferences it. |
| */ |
| if (conn != NULL && error == GF_NOERR) { |
| if (conn->getEndpointObject()->name() != epNameStr) { |
| LOGFINE( |
| "ThinClientPoolDM::getConnectionToAnEndPoint( ): Endpoint %s " |
| "different than the endpoint %s. New connection created and " |
| "failing over.", |
| epNameStr.c_str(), conn->getEndpointObject()->name().c_str()); |
| GfErrType failoverErr = doFailover(conn); |
| if (failoverErr != GF_NOERR) { |
| LOGFINE( |
| "ThinClientPoolDM::getConnectionToAnEndPoint( ):Failed to " |
| "failover transaction to another server. From endpoint %s to %s", |
| epNameStr.c_str(), conn->getEndpointObject()->name().c_str()); |
| putInQueue(conn, false); |
| conn = NULL; |
| } |
| } |
| } |
| } |
| |
| if (conn == NULL || error != GF_NOERR) { |
| // Log epNameStr here: theEP may be NULL if the endpoint was unknown. |
| LOGFINE( |
| "ThinClientPoolDM::getConnectionToAnEndPoint( ): Failed to connect " |
| "to %s", |
| epNameStr.c_str()); |
| if (conn != NULL) GF_SAFE_DELETE(conn); |
| } |
| |
| return error; |
| } |
| |
| // Creates a pool connection to the specified endpoint, unless the number |
| // of connections has already reached the maximum allowed, in which case |
| // maxConnLimit is set and no connection is created. |
| GfErrType ThinClientPoolDM::createPoolConnectionToAEndPoint( |
| TcrConnection*& conn, TcrEndpoint* theEP, bool& maxConnLimit, |
| bool appThreadrequest) { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> _guard(m_queueLock); |
| GfErrType error = GF_NOERR; |
| conn = NULL; |
| int min = 0; |
| { |
| // Check whether the pool size has already reached the maximum allowed. |
| int max = m_attrs->getMaxConnections(); |
| if (max == -1) {  // -1 means no explicit max-connections limit |
| max = 0x7fffffff; |
| } |
| min = m_attrs->getMinConnections(); |
| max = max > min ? max : min; |
| |
| if (m_poolSize >= max) { |
| maxConnLimit = true; |
| LOGFINER( |
| "ThinClientPoolDM::createPoolConnectionToAEndPoint( ): pool size %d " |
| "has reached the limit %d", |
| m_poolSize, max); |
| return error; |
| } |
| } |
| |
| LOGFINE( |
| "ThinClientPoolDM::createPoolConnectionToAEndPoint( ): creating a new " |
| "connection to the endpoint %s", |
| theEP->name().c_str()); |
| // if the pool size is within limits, create a new connection. |
| error = theEP->createNewConnection( |
| conn, false, false, |
| DistributedSystem::getSystemProperties()->connectTimeout(), false, true, |
| appThreadrequest); |
| if (conn == NULL || error != GF_NOERR) { |
| LOGFINE( |
| "ThinClientPoolDM::createPoolConnectionToAEndPoint( ): failed to " |
| "connect to %s", |
| theEP->name().c_str()); |
| if (conn != NULL) GF_SAFE_DELETE(conn); |
| } else { |
| theEP->setConnected(); |
| ++m_poolSize; |
| if (m_poolSize > min) { |
| getStats().incLoadCondConnects(); |
| } |
| // Update Stats |
| getStats().incPoolConnects(); |
| getStats().setCurPoolConnections(m_poolSize); |
| } |
| m_connSema.release(); |
| |
| return error; |
| } |
| |
| void ThinClientPoolDM::reducePoolSize(int num) { |
| LOGFINE("removing connection %d , pool-size =%d", num, m_poolSize); |
| HostAsm::atomicAdd(m_poolSize, -1 * num); |
| if (m_poolSize == 0) { |
| if (m_cliCallbackTask != NULL) m_cliCallbackSema.release(); |
| } |
| } |
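| |
| // Creates a new pool connection to a server chosen by selectEndpoint(), |
| // skipping the servers in excludeServers. When the pool is already at its |
| // maximum size, sets maxConnLimit and returns GF_NOERR without creating a |
| // connection; when currentserver is the selected endpoint, that |
| // connection is reused instead of opening a new one. |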
| GfErrType ThinClientPoolDM::createPoolConnection( |
| TcrConnection*& conn, std::set<ServerLocation>& excludeServers, |
| bool& maxConnLimit, const TcrConnection* currentserver) { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> _guard(m_queueLock); |
| GfErrType error = GF_NOERR; |
| int max = m_attrs->getMaxConnections(); |
| if (max == -1) {  // -1 means no explicit max-connections limit |
| max = 0x7fffffff; |
| } |
| int min = m_attrs->getMinConnections(); |
| max = max > min ? max : min; |
| LOGDEBUG( |
| "ThinClientPoolDM::createPoolConnection( ): pool size %d, max %d, " |
| "min %d", |
| m_poolSize, max, min); |
| |
| conn = NULL; |
| if (m_poolSize >= max) { |
| LOGDEBUG( |
| "ThinClientPoolDM::createPoolConnection( ): pool size %d has " |
| "reached the limit %d", |
| m_poolSize, max); |
| maxConnLimit = true; |
| return error; |
| } |
| |
| bool fatal = false; |
| GfErrType fatalError = GF_NOERR; |
| // LOGINFO("serverlist: %d, locList: %d", m_attrs->m_initServList.size(), |
| // m_attrs->m_initLocList.size()); |
| |
| while (true) { |
| std::string epNameStr; |
| try { |
| epNameStr = selectEndpoint(excludeServers, currentserver); |
| } catch (const NoAvailableLocatorsException&) { |
| LOGFINE("Locator query failed"); |
| return GF_CACHE_LOCATOR_EXCEPTION; |
| } catch (const Exception&) { |
| LOGFINE("Endpoint selection failed"); |
| return GF_NOTCON; |
| } |
| LOGFINE("Connecting to %s", epNameStr.c_str()); |
| TcrEndpoint* ep = addEP(epNameStr.c_str()); |
| |
| if (currentserver != NULL && |
| epNameStr == currentserver->getEndpointObject()->name()) { |
| LOGDEBUG("Updating existing connection: ", epNameStr.c_str()); |
| conn = const_cast<TcrConnection*>(currentserver); |
| conn->updateCreationTime(); |
| /*{ |
| ACE_Guard< ACE_Recursive_Thread_Mutex > _guard( m_queueLock ); |
| reducePoolSize(1);;//we have not created new connection |
| }*/ |
| break; |
| } else { |
| error = ep->createNewConnection( |
| conn, false, false, |
| DistributedSystem::getSystemProperties()->connectTimeout(), false); |
| } |
| |
| if (conn == NULL || error != GF_NOERR) { |
| LOGFINE( |
| "ThinClientPoolDM::createPoolConnection( ): failed to connect " |
| "to %s", |
| epNameStr.c_str()); |
| excludeServers.insert(ServerLocation(ep->name())); |
| if (conn != NULL) { |
| GF_SAFE_DELETE(conn); |
| } |
| if (ThinClientBaseDM::isFatalError(error)) { |
| // save this error for later to override the |
| // error code to be returned |
| fatalError = error; |
| fatal = true; |
| } |
| if (ThinClientBaseDM::isFatalClientError(error)) { |
| LOGFINE("Connection failed due to fatal client error %d", error); |
| return error; |
| } |
| } |
| } else { |
| ep->setConnected(); |
| ++m_poolSize;  // m_queueLock is held for the whole function |
| if (m_poolSize > min) { |
| getStats().incLoadCondConnects(); |
| } |
| // Update Stats |
| getStats().incPoolConnects(); |
| getStats().setCurPoolConnections(m_poolSize); |
| break; |
| } |
| } |
| m_connSema.release(); |
| // if a fatal error occurred earlier and we don't have |
| // a connection then return this saved error |
| if (fatal && !conn && error != GF_NOERR) { |
| return fatalError; |
| } |
| |
| return error; |
| } |
| |
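| // Waits (subject to the free-connection timeout) for a pooled connection, |
| // recording waiting-connection statistics around the wait. |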
| TcrConnection* ThinClientPoolDM::getConnectionFromQueue( |
| bool timeout, GfErrType* error, std::set<ServerLocation>& excludeServers, |
| bool& maxConnLimit) { |
| int64_t timeoutTime = m_attrs->getFreeConnectionTimeout() / |
| 1000;  // getFreeConnectionTimeout() is in milliseconds; convert to seconds |
| |
| getStats().setCurWaitingConnections(waiters()); |
| getStats().incWaitingConnections(); |
| |
| /*get the start time for connectionWaitTime stat*/ |
| int64 sampleStartNanos = Utils::startStatOpTime(); |
| TcrConnection* mp = |
| getUntil(timeoutTime, error, excludeServers, maxConnLimit); |
| /* Update the total waiting-connection time stat */ |
| Utils::updateStatOpTime( |
| getStats().getStats(), |
| PoolStatType::getInstance()->getTotalWaitingConnTimeId(), |
| sampleStartNanos); |
| return mp; |
| } |
| |
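| // For a pool DM every known endpoint is treated as attached. |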
| bool ThinClientPoolDM::isEndpointAttached(TcrEndpoint* ep) { return true; } |
| |
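| // Sends a request to the specified endpoint, drawing an idle connection |
| // from the pool or creating one (falling back to a temporary, non-pooled |
| // connection). Handles per-user authentication and retries when the |
| // server replies with AuthenticationRequiredException. |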
| GfErrType ThinClientPoolDM::sendRequestToEP(const TcrMessage& request, |
| TcrMessageReply& reply, |
| TcrEndpoint* currentEndpoint) { |
| LOGDEBUG("ThinClientPoolDM::sendRequestToEP()"); |
| int isAuthRequireExcepMaxTry = 2; |
| bool isAuthRequireExcep = true; |
| GfErrType error = GF_NOERR; |
| while (isAuthRequireExcep && isAuthRequireExcepMaxTry >= 0) { |
| isAuthRequireExcep = false; |
| TcrConnection* conn = getFromEP(currentEndpoint); |
| |
| bool isTmpConnectedStatus = false; |
| bool putConnInPool = true; |
| if (conn == NULL) { |
| LOGDEBUG( |
| "ThinClientPoolDM::sendRequestToEP(): got NULL connection from pool, " |
| "creating new connection in the pool."); |
| bool maxConnLimit = false; |
| error = |
| createPoolConnectionToAEndPoint(conn, currentEndpoint, maxConnLimit); |
| if (conn == NULL || error != GF_NOERR) { |
| LOGDEBUG( |
| "ThinClientPoolDM::sendRequestToEP(): couldn't create a pool " |
| "connection, creating a temporary connection."); |
| error = currentEndpoint->createNewConnection( |
| conn, false, false, |
| DistributedSystem::getSystemProperties()->connectTimeout(), false); |
| putConnInPool = false; |
| currentEndpoint->setConnectionStatus(true); |
| } |
| } |
| |
| if (conn == NULL || error != GF_NOERR) { |
| LOGFINE("3Failed to connect to %s", currentEndpoint->name().c_str()); |
| if (conn != NULL) { |
| GF_SAFE_DELETE(conn); |
| } |
| if (putConnInPool) { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard(getPoolLock()); |
| reducePoolSize(1); |
| } |
| currentEndpoint->setConnectionStatus(false); |
| return error; |
| } else if (!putConnInPool && !currentEndpoint->connected()) { |
| isTmpConnectedStatus = true; |
| currentEndpoint->setConnectionStatus(true); |
| } |
| |
| int32_t type = request.getMessageType(); |
| |
| if (!(type == TcrMessage::QUERY || type == TcrMessage::PUTALL || |
| type == TcrMessage::PUT_ALL_WITH_CALLBACK || |
| type == TcrMessage::EXECUTE_FUNCTION || |
| type == TcrMessage::EXECUTE_REGION_FUNCTION || |
| type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP || |
| type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE)) { |
| reply.setTimeout(this->getReadTimeout() / 1000); |
| } |
| |
| reply.setDM(this); |
| UserAttributesPtr ua = NULLPTR; |
| // In multi-user mode, check whether the user is authenticated on this |
| // endpoint, then follow the same process as in sendSyncRequest(). This |
| // applies only to user-initiated operations. |
| LOGDEBUG("ThinClientPoolDM::sendRequestToEP: this->m_isMultiUserMode = %d", |
| this->m_isMultiUserMode); |
| bool isServerException = false; |
| if (TcrMessage::isUserInitiativeOps((request)) && |
| (this->m_isSecurityOn || this->m_isMultiUserMode)) { |
| if (!this->m_isMultiUserMode && !currentEndpoint->isAuthenticated()) { |
| // first authenticate the client on this endpoint |
| error = this->sendUserCredentials(this->getCredentials(currentEndpoint), |
| conn, false, isServerException); |
| } else if (this->m_isMultiUserMode) { |
| ua = TSSUserAttributesWrapper::s_geodeTSSUserAttributes |
| ->getUserAttributes(); |
| if (ua == NULLPTR) { |
| LOGWARN("Attempted operation type %d without credentials", |
| request.getMessageType()); |
| if (conn != NULL) putInQueue(conn, false, request.forTransaction()); |
| return GF_NOT_AUTHORIZED_EXCEPTION; |
| } else { |
| UserConnectionAttributes* uca = |
| ua->getConnectionAttribute(currentEndpoint); |
| |
| if (uca == NULL) { |
| error = this->sendUserCredentials(ua->getCredentials(), conn, false, |
| isServerException); |
| } |
| } |
| } |
| } |
| |
| LOGDEBUG("ThinClientPoolDM::sendRequestToEP after getting creds"); |
| if (error == GF_NOERR && conn != NULL) { |
| error = |
| currentEndpoint->sendRequestConnWithRetry(request, reply, conn, true); |
| } |
| |
| if (isServerException) return error; |
| |
| if (error == GF_NOERR) { |
| int32_t replyMsgType = reply.getMessageType(); |
| if (replyMsgType == TcrMessage::EXCEPTION || |
| replyMsgType == TcrMessage::CQ_EXCEPTION_TYPE || |
| replyMsgType == TcrMessage::CQDATAERROR_MSG_TYPE) { |
| error = ThinClientRegion::handleServerException( |
| "ThinClientPoolDM::sendRequestToEP", reply.getException()); |
| } |
| |
| if (putConnInPool) { |
| put(conn, false); |
| } else { |
| if (isTmpConnectedStatus) currentEndpoint->setConnectionStatus(false); |
| conn->close(); |
| GF_SAFE_DELETE(conn); |
| } |
| } else if (error != GF_NOERR) { |
| currentEndpoint->setConnectionStatus(false); |
| if (putConnInPool) { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard(getPoolLock()); |
| removeEPConnections(1); |
| } |
| } |
| |
| if (error == GF_NOERR || error == GF_CACHESERVER_EXCEPTION || |
| error == GF_AUTHENTICATION_REQUIRED_EXCEPTION) { |
| if ((this->m_isSecurityOn || this->m_isMultiUserMode)) { |
| if (reply.getMessageType() == TcrMessage::EXCEPTION) { |
| if (isAuthRequireException(reply.getException())) { |
| if (!this->m_isMultiUserMode) { |
| currentEndpoint->setAuthenticated(false); |
| } else if (ua != NULLPTR) { |
| ua->unAuthenticateEP(currentEndpoint); |
| } |
| LOGFINEST( |
| "After getting AuthenticationRequiredException trying again."); |
| isAuthRequireExcepMaxTry--; |
| isAuthRequireExcep = true; |
| if (isAuthRequireExcepMaxTry >= 0) error = GF_NOERR; |
| continue; |
| } |
| } |
| } |
| } |
| } |
| LOGDEBUG("ThinClientPoolDM::sendRequestToEP Done."); |
| return error; |
| } |
| |
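| // Formats the server location as "host:port" and delegates to |
| // addEP(const char*). |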
| TcrEndpoint* ThinClientPoolDM::addEP(ServerLocation& serverLoc) { |
| std::string serverName = serverLoc.getServerName(); |
| int port = serverLoc.getPort(); |
| char endpointName[100]; |
| ACE_OS::snprintf(endpointName, 100, "%s:%d", serverName.c_str(), port); |
| |
| return addEP(endpointName); |
| } |
| |
| TcrEndpoint* ThinClientPoolDM::addEP(const char* endpointName) { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_endpointsLock); |
| TcrEndpoint* ep = NULL; |
| |
| std::string fullName = endpointName; |
| // ACE_Map_Manager::find() returns 0 when the key is already bound, so a |
| // non-zero result means this endpoint is not yet known to this pool. |
| if (m_endpoints.find(fullName, ep) != 0) { |
| ep = createEP(fullName.c_str()); |
| LOGFINE("Created new endpoint %s for pool %s", fullName.c_str(), |
| m_poolName.c_str()); |
| if (m_endpoints.bind(fullName, ep) != 0) { |
| LOGERROR("Failed to add endpoint %s to pool %s", fullName.c_str(), |
| m_poolName.c_str()); |
| GF_DEV_ASSERT(false && |
| "ThinClientPoolDM::addEP( ): failed to add endpoint"); |
| } |
| } |
| // Update Server Stats |
| getStats().setServers(static_cast<int32_t>(m_endpoints.current_size())); |
| return ep; |
| } |
| |
| void ThinClientPoolDM::netDown() { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard(getPoolLock()); |
| close(); |
| reset(); |
| } |
| |
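| // Pings every connected endpoint; an endpoint that is no longer connected |
| // after the ping has its pooled and callback connections removed. |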
| void ThinClientPoolDM::pingServerLocal() { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> _guard(getPoolLock()); |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_endpointsLock); |
| for (ACE_Map_Manager<std::string, TcrEndpoint*, |
| ACE_Recursive_Thread_Mutex>::iterator it = |
| m_endpoints.begin(); |
| it != m_endpoints.end(); it++) { |
| if ((*it).int_id_->connected()) { |
| (*it).int_id_->pingServer(this); |
| if (!(*it).int_id_->connected()) { |
| removeEPConnections((*it).int_id_); |
| removeCallbackConnection((*it).int_id_); |
| } |
| } |
| } |
| } |
| |
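| // Thread loop: each release of the semaphore triggers a refresh of the |
| // locator list for this pool's server group, until isRunning goes false. |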
| int ThinClientPoolDM::updateLocatorList(volatile bool& isRunning) { |
| LOGFINE("Starting updateLocatorList thread for pool %s", m_poolName.c_str()); |
| while (isRunning) { |
| m_updateLocatorListSema.acquire(); |
| if (isRunning && !TcrConnectionManager::isNetDown) { |
| ((ThinClientLocatorHelper*)m_locHelper) |
| ->updateLocators(this->getServerGroup()); |
| } |
| } |
| LOGFINE("Ending updateLocatorList thread for pool %s", m_poolName.c_str()); |
| return 0; |
| } |
| |
| int ThinClientPoolDM::pingServer(volatile bool& isRunning) { |
| LOGFINE("Starting ping thread for pool %s", m_poolName.c_str()); |
| while (isRunning) { |
| m_pingSema.acquire(); |
| if (isRunning && !TcrConnectionManager::isNetDown) { |
| pingServerLocal(); |
| // Drain any additional releases so queued ping triggers collapse |
| // into a single ping. |
| while (m_pingSema.tryacquire() != -1) { |
| ; |
| } |
| } |
| } |
| LOGFINE("Ending ping thread for pool %s", m_poolName.c_str()); |
| return 0; |
| } |
| |
| int ThinClientPoolDM::cliCallback(volatile bool& isRunning) { |
| LOGFINE("Starting cliCallback thread for pool %s", m_poolName.c_str()); |
| while (isRunning) { |
| m_cliCallbackSema.acquire(); |
| if (isRunning) { |
| LOGFINE("Clearing Pdx Type Registry"); |
| // this call for csharp client |
| DistributedSystemImpl::CallCliCallBack(); |
| // this call for cpp client |
| PdxTypeRegistry::clear(); |
| while (m_cliCallbackSema.tryacquire() != -1) { |
| ; |
| } |
| } |
| } |
| LOGFINE("Ending cliCallback thread for pool %s", m_poolName.c_str()); |
| return 0; |
| } |
| |
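| // Expiry-task callbacks: each timer tick releases the semaphore that |
| // wakes the corresponding worker thread (ping, locator-list update, or |
| // connection management). |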
| int ThinClientPoolDM::doPing(const ACE_Time_Value&, const void*) { |
| m_pingSema.release(); |
| return 0; |
| } |
| |
| int ThinClientPoolDM::doUpdateLocatorList(const ACE_Time_Value&, const void*) { |
| m_updateLocatorListSema.release(); |
| return 0; |
| } |
| |
| int ThinClientPoolDM::doManageConnections(const ACE_Time_Value&, const void*) { |
| m_connSema.release(); |
| return 0; |
| } |
| |
| void ThinClientPoolDM::releaseThreadLocalConnection() { |
| m_manager->releaseThreadLocalConnection(); |
| } |
| void ThinClientPoolDM::setThreadLocalConnection(TcrConnection* conn) { |
| m_manager->addStickyConnection(conn); |
| } |
| |
| bool ThinClientPoolDM::hasExpired(TcrConnection* conn) { |
| int load = getLoadConditioningInterval(); |
| int idle = getIdleTimeout(); |
| |
| if (load != -1) { |
| if (load < idle || idle == -1) { |
| idle = load; |
| } |
| } |
| |
| // Note: only the load-conditioning interval is consulted here; the |
| // adjusted idle value computed above is not used by this check. |
| return conn->hasExpired(load); |
| } |
| |
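| // A pooled connection is a candidate for deletion when it has outlived |
| // the load-conditioning interval, or has been idle past the idle timeout |
| // while the pool exceeds its minimum size. If the connection's endpoint |
| // hosts the subscription queue, another pooled connection to that |
| // endpoint must be available before this one may be deleted. |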
| bool ThinClientPoolDM::canItBeDeleted(TcrConnection* conn) { |
| int load = getLoadConditioningInterval(); |
| int idle = getIdleTimeout(); |
| int min = getMinConnections(); |
| |
| if (load != -1) { |
| if (load < idle || idle == -1) { |
| idle = load; |
| } |
| } |
| |
| bool hasExpired = conn->hasExpired(load); |
| bool isIdle = conn->isIdle(idle); |
| |
| bool candidateForDeletion = hasExpired || (isIdle && m_poolSize > min); |
| bool canBeDeleted = false; |
| |
| if (conn && candidateForDeletion) { |
| TcrEndpoint* endPt = conn->getEndpointObject(); |
| bool queue = false; |
| { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> poolguard(m_queueLock); // PXR |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guardQueue( |
| endPt->getQueueHostedMutex()); |
| queue = endPt->isQueueHosted(); |
| if (queue) { |
| TcrConnection* connTemp = getFromEP(endPt); |
| if (connTemp) { |
| put(connTemp, false); |
| canBeDeleted = true; |
| } |
| } else { |
| canBeDeleted = true; |
| } |
| } |
| } |
| |
| return canBeDeleted; |
| } |
| |
| bool ThinClientPoolDM::excludeServer(std::string endpoint, |
| std::set<ServerLocation>& excludeServers) { |
| // The server is excluded iff it appears in the exclude set. |
| return excludeServers.find(ServerLocation(endpoint)) != |
| excludeServers.end(); |
| } |
| |
| bool ThinClientPoolDM::excludeConnection( |
| TcrConnection* conn, std::set<ServerLocation>& excludeServers) { |
| return excludeServer(conn->getEndpointObject()->name(), excludeServers); |
| } |
| |
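| // Removes and returns an idle pooled connection to the given endpoint, or |
| // NULL when none is queued. |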
| TcrConnection* ThinClientPoolDM::getFromEP(TcrEndpoint* theEP) { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> _guard(m_queueLock); |
| for (std::deque<TcrConnection*>::iterator itr = m_queue.begin(); |
| itr != m_queue.end(); itr++) { |
| if ((*itr)->getEndpointObject() == theEP) { |
| LOGDEBUG("ThinClientPoolDM::getFromEP got connection"); |
| TcrConnection* retVal = *itr; |
| m_queue.erase(itr); |
| return retVal; |
| } |
| } |
| |
| |
| return NULL; |
| } |
| |
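| // Closes and deletes every pooled connection to the given endpoint, |
| // keeping the remaining connections in the queue, then updates the |
| // pool-size accounting. |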
| void ThinClientPoolDM::removeEPConnections(TcrEndpoint* theEP) { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> _guard(m_queueLock); |
| int32_t size = static_cast<int32_t>(m_queue.size()); |
| int numConn = 0; |
| |
| while (size--) { |
| TcrConnection* curConn = m_queue.back(); |
| m_queue.pop_back(); |
| if (curConn->getEndpointObject() != theEP) { |
| m_queue.push_front(curConn); |
| } else { |
| curConn->close(); |
| GF_SAFE_DELETE(curConn); |
| numConn++; |
| } |
| } |
| |
| removeEPConnections(numConn); |
| } |
| |
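| // getNoGetLock: like getConnectionFromQueue() but, as the name suggests, |
| // without acquiring the outer "get" lock. Pops queued connections under |
| // m_queueLock, discarding those to excluded servers, and falls back to |
| // creating a new pool connection. |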
| TcrConnection* ThinClientPoolDM::getNoGetLock( |
| bool& isClosed, GfErrType* error, std::set<ServerLocation>& excludeServers, |
| bool& maxConnLimit) { |
| TcrConnection* returnT = NULL; |
| { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> _guard(m_queueLock); |
| |
| do { |
| returnT = popFromQueue(isClosed); |
| if (returnT) { |
| if (excludeConnection(returnT, excludeServers)) { |
| returnT->close(); |
| GF_SAFE_DELETE(returnT); |
| removeEPConnections(1, false); |
| } else { |
| break; |
| } |
| } else { |
| break; |
| } |
| } while (!returnT); |
| } |
| |
| if (!returnT) { |
| *error = createPoolConnection(returnT, excludeServers, maxConnLimit); |
| } |
| |
| return returnT; |
| } |
| |
| bool ThinClientPoolDM::exclude(TcrConnection* conn, |
| std::set<ServerLocation>& excludeServers) { |
| return excludeConnection(conn, excludeServers); |
| } |
| |
| void ThinClientPoolDM::incRegionCount() { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> _guard(m_queueLock); |
| |
| if (!m_isDestroyed && !m_destroyPending) { |
| m_numRegions++; |
| } else { |
| throw IllegalStateException("Pool has been destroyed."); |
| } |
| } |
| |
| void ThinClientPoolDM::decRegionCount() { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> _guard(m_queueLock); |
| |
| m_numRegions--; |
| } |
| |
| void ThinClientPoolDM::checkRegions() { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_queueLock); |
| |
| if (m_numRegions > 0) { |
| throw IllegalStateException( |
| "Failed to destroy pool because regions are connected with it."); |
| } |
| |
| m_destroyPending = true; |
| } |
| |
| void ThinClientPoolDM::updateNotificationStats(bool isDeltaSuccess, |
| long timeInNanoSecond) { |
| if (isDeltaSuccess) { |
| getStats().incProcessedDeltaMessages(); |
| getStats().incProcessedDeltaMessagesTime(timeInNanoSecond); |
| } else { |
| getStats().incDeltaMessageFailures(); |
| } |
| } |
| |
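| // Fails the current transaction over to the server that owns the given |
| // connection: pins it as the sticky transaction connection and sends a |
| // TX_FAILOVER message, translating the reply into an error code. |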
| GfErrType ThinClientPoolDM::doFailover(TcrConnection* conn) { |
| m_manager->setStickyConnection(conn, true); |
| TcrMessageTxFailover request; |
| TcrMessageReply reply(true, NULL); |
| |
| GfErrType err = this->sendSyncRequest(request, reply); |
| |
| if (err == GF_NOERR) { |
| switch (reply.getMessageType()) { |
| case TcrMessage::REPLY: { |
| break; |
| } |
| case TcrMessage::EXCEPTION: { |
| const char* exceptionMsg = reply.getException(); |
| err = ThinClientRegion::handleServerException( |
| "CacheTransactionManager::failover", exceptionMsg); |
| break; |
| } |
| case TcrMessage::REQUEST_DATA_ERROR: { |
| err = GF_CACHESERVER_EXCEPTION; |
| break; |
| } |
| default: { |
| LOGERROR("Unknown message type in failover reply %d", |
| reply.getMessageType()); |
| err = GF_MSG; |
| break; |
| } |
| } |
| } |
| |
| return err; |
| } |