| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <thread> |
| #include <chrono> |
| #include <ace/OS.h> |
| |
| #include <geode/SystemProperties.hpp> |
| #include <geode/AuthInitialize.hpp> |
| |
| #include "TcrEndpoint.hpp" |
| #include "ThinClientRegion.hpp" |
| #include "ThinClientPoolHADM.hpp" |
| #include "StackTrace.hpp" |
| #include "CacheImpl.hpp" |
| #include "Utils.hpp" |
| #include "DistributedSystemImpl.hpp" |
| #include "util/exception.hpp" |
| |
| namespace apache { |
| namespace geode { |
| namespace client { |
| |
| const char* TcrEndpoint::NC_Notification = "NC Notification"; |
| |
| TcrEndpoint::TcrEndpoint(const std::string& name, CacheImpl* cacheImpl, |
| ACE_Semaphore& failoverSema, |
| ACE_Semaphore& cleanupSema, |
| ACE_Semaphore& redundancySema, ThinClientBaseDM* DM, |
| bool isMultiUserMode) |
| : m_needToConnectInLock(false), |
| m_connectLockCond(m_connectLock), |
| m_maxConnections(cacheImpl->getDistributedSystem() |
| .getSystemProperties() |
| .connectionPoolSize()), |
| m_notifyConnection(nullptr), |
| m_notifyReceiver(nullptr), |
| m_numRegionListener(0), |
| m_isQueueHosted(false), |
| m_cacheImpl(cacheImpl), |
| m_uniqueId(0), |
| m_isAuthenticated(false), |
| m_msgSent(false), |
| m_pingSent(false), |
| m_numberOfTimesFailed(0), |
| m_isMultiUserMode(isMultiUserMode), |
| m_name(name), |
| m_connected(false), |
| m_isActiveEndpoint(false), |
| m_numRegions(0), |
| m_pingTimeouts(0), |
| m_notifyCount(0), |
| m_failoverSema(failoverSema), |
| m_cleanupSema(cleanupSema), |
| m_notificationCleanupSema(0), |
| m_redundancySema(redundancySema), |
| m_dupCount(0), |
| m_serverQueueStatus(NON_REDUNDANT_SERVER), |
| m_isServerQueueStatusSet(false), |
| m_queueSize(0), |
| m_baseDM(DM), |
| m_noOfConnRefs(0), |
| m_distributedMemId(0) { |
| /* |
| m_name = Utils::convertHostToCanonicalForm(m_name.c_str() ); |
| */ |
| } |
| |
| TcrEndpoint::~TcrEndpoint() { |
| m_connected = false; |
| m_isActiveEndpoint = false; |
| closeConnections(); |
| { |
| // force close the notification channel -- see bug #295 |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock); |
| if (m_numRegionListener > 0) { |
| LOGFINE( |
| "Connection to %s still has references " |
| "to subscription channel while closing", |
| m_name.c_str()); |
| // fail in dev build to track #295 better in regressions |
| GF_DEV_ASSERT(m_numRegionListener == 0); |
| |
| m_numRegionListener = 0; |
| closeNotification(); |
| } |
| } |
| while (m_notifyCount > 0) { |
| LOGDEBUG("TcrEndpoint::~TcrEndpoint(): reducing notify count at %d", |
| m_notifyCount); |
| m_notificationCleanupSema.acquire(); |
| m_notifyCount--; |
| } |
| LOGFINE("Connection to %s deleted", m_name.c_str()); |
| } |
| |
| inline bool TcrEndpoint::needtoTakeConnectLock() { |
| #ifdef __linux |
| if (m_cacheImpl->getDistributedSystem() |
| .getSystemProperties() |
| .connectWaitTimeout() > std::chrono::seconds::zero()) { |
| return m_needToConnectInLock; // once pipe or other socket error will take |
| // lock to connect. |
| } |
| return false; // once pipe or other socket error will take lock to connect. |
| #else |
| return false; |
| #endif |
| } |
| |
| GfErrType TcrEndpoint::createNewConnectionWL( |
| TcrConnection*& newConn, bool isClientNotification, bool isSecondary, |
| std::chrono::microseconds connectTimeout) { |
| LOGFINE("TcrEndpoint::createNewConnectionWL"); |
| auto connectWaitTimeout = m_cacheImpl->getDistributedSystem() |
| .getSystemProperties() |
| .connectWaitTimeout(); |
| ACE_Time_Value interval(connectWaitTimeout); |
| ACE_Time_Value stopAt(ACE_OS::gettimeofday()); |
| stopAt += interval; |
| bool connCreated = false; |
| |
| while (ACE_OS::gettimeofday() < stopAt) { |
| int32_t ret = m_connectLock.acquire(&stopAt); |
| |
| LOGFINE( |
| "TcrEndpoint::createNewConnectionWL ret = %d interval = %ld error =%s", |
| ret, interval.get_msec(), ACE_OS::strerror(ACE_OS::last_error())); |
| |
| if (ret != -1) { // got lock |
| try { |
| LOGFINE("TcrEndpoint::createNewConnectionWL got lock"); |
| newConn = |
| new TcrConnection(m_cacheImpl->tcrConnectionManager(), m_connected); |
| newConn->InitTcrConnection(this, m_name.c_str(), m_ports, |
| isClientNotification, isSecondary, |
| connectTimeout); |
| |
| connCreated = true; // to break while loop |
| |
| m_needToConnectInLock = false; // no need to take lock |
| |
| m_connectLock.release(); |
| LOGFINE("New Connection Created"); |
| break; |
| } catch (const TimeoutException&) { |
| LOGINFO("Timeout1 in handshake with endpoint[%s]", m_name.c_str()); |
| m_connectLock.release(); |
| // throw te; |
| return GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA; |
| } catch (std::exception& ex) { |
| m_connectLock.release(); |
| LOGWARN("Failed1 in handshake with endpoint[%s]: %s", m_name.c_str(), |
| ex.what()); |
| // throw ex; |
| return GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA; |
| } catch (...) { |
| LOGWARN("Unknown1 failure in handshake with endpoint[%s]", |
| m_name.c_str()); |
| m_connectLock.release(); |
| // throw; |
| return GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA; |
| } |
| } |
| } |
| |
| if (!connCreated) { |
| LOGFINE("TcrEndpoint::createNewConnectionWL timeout"); |
| // throwException(TimeoutException("Thread is hanged in connect call")); |
| return GF_CLIENT_WAIT_TIMEOUT; |
| } |
| |
| return GF_NOERR; |
| } |
| |
| GfErrType TcrEndpoint::createNewConnection( |
| TcrConnection*& newConn, bool isClientNotification, bool isSecondary, |
| std::chrono::microseconds connectTimeout, int32_t timeoutRetries, |
| bool sendUpdateNotification, bool appThreadRequest) { |
| LOGFINE( |
| "TcrEndpoint::createNewConnection: connectTimeout =%d " |
| "m_needToConnectInLock=%d appThreadRequest =%d", |
| connectTimeout.count(), m_needToConnectInLock, appThreadRequest); |
| GfErrType err = GF_NOERR; |
| newConn = nullptr; |
| while (timeoutRetries-- >= 0) { |
| try { |
| if (newConn == nullptr) { |
| if (!needtoTakeConnectLock() || !appThreadRequest) { |
| newConn = new TcrConnection(m_cacheImpl->tcrConnectionManager(), |
| m_connected); |
| bool authenticate = newConn->InitTcrConnection( |
| this, m_name.c_str(), m_ports, isClientNotification, isSecondary, |
| connectTimeout); |
| if (authenticate) { |
| authenticateEndpoint(newConn); |
| } |
| } else { |
| err = createNewConnectionWL(newConn, isClientNotification, |
| isSecondary, connectTimeout); |
| if (err == GF_CLIENT_WAIT_TIMEOUT || |
| err == GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA) { |
| break; |
| } |
| } |
| // m_connected = true; |
| } |
| if (!isClientNotification && sendUpdateNotification) { |
| bool notificationStarted; |
| { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock); |
| notificationStarted = (m_numRegionListener > 0) || m_isQueueHosted; |
| } |
| if (notificationStarted) { |
| LOGFINE("Sending update notification message to endpoint %s", |
| m_name.c_str()); |
| TcrMessageUpdateClientNotification updateNotificationMsg( |
| new DataOutput(newConn->getConnectionManager() |
| .getCacheImpl() |
| ->createDataOutput()), |
| static_cast<int32_t>(newConn->getPort())); |
| newConn->send(updateNotificationMsg.getMsgData(), |
| updateNotificationMsg.getMsgLength()); |
| } |
| } |
| err = GF_NOERR; |
| break; |
| } catch (const TimeoutException&) { |
| LOGINFO("Timeout in handshake with endpoint[%s]", m_name.c_str()); |
| err = GF_TIMOUT; |
| m_needToConnectInLock = true; // while creating the connection |
| std::this_thread::sleep_for(std::chrono::milliseconds(50)); |
| } catch (const GeodeIOException& ex) { |
| LOGINFO("IO error[%d] in handshake with endpoint[%s]: %s", |
| ACE_OS::last_error(), m_name.c_str(), ex.what()); |
| err = GF_IOERR; |
| m_needToConnectInLock = true; // while creating the connection |
| break; |
| } catch (const AuthenticationFailedException& ex) { |
| LOGWARN("Authentication failed in handshake with endpoint[%s]: %s", |
| m_name.c_str(), ex.what()); |
| err = GF_AUTHENTICATION_FAILED_EXCEPTION; |
| break; |
| } catch (const AuthenticationRequiredException& ex) { |
| LOGWARN("Authentication required in handshake with endpoint[%s]: %s", |
| m_name.c_str(), ex.what()); |
| err = GF_AUTHENTICATION_REQUIRED_EXCEPTION; |
| break; |
| } catch (const CacheServerException& ex) { |
| LOGWARN("Exception in handshake on server[%s]: %s", m_name.c_str(), |
| ex.what()); |
| err = GF_CACHESERVER_EXCEPTION; |
| break; |
| } catch (const Exception& ex) { |
| LOGWARN("Failed in handshake with endpoint[%s]: %s", m_name.c_str(), |
| ex.what()); |
| err = GF_MSG; |
| break; |
| } catch (std::exception& ex) { |
| LOGWARN("Failed in handshake with endpoint[%s]: %s", m_name.c_str(), |
| ex.what()); |
| err = GF_MSG; |
| break; |
| } catch (...) { |
| LOGWARN("Unknown failure in handshake with endpoint[%s]", m_name.c_str()); |
| err = GF_MSG; |
| break; |
| } |
| } |
| if (err != GF_NOERR && newConn != nullptr) { |
| _GEODE_SAFE_DELETE(newConn); |
| } |
| return err; |
| } |
| |
| void TcrEndpoint::authenticateEndpoint(TcrConnection*& conn) { |
| LOGDEBUG( |
| "TcrEndpoint::authenticateEndpoint m_isAuthenticated = %d " |
| "this->m_baseDM = %d", |
| m_isAuthenticated, m_baseDM); |
| if (!m_isAuthenticated && m_baseDM) { |
| this->setConnected(); |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_endpointAuthenticationLock); |
| GfErrType err = GF_NOERR; |
| auto creds = getCredentials(); |
| |
| if (creds != nullptr) { |
| LOGDEBUG("TcrEndpoint::authenticateEndpoint got creds from app = %d", |
| creds->getSize()); |
| } else { |
| LOGDEBUG("TcrEndpoint::authenticateEndpoint no creds from app "); |
| } |
| |
| TcrMessageUserCredential request( |
| new DataOutput(m_cacheImpl->createDataOutput()), creds, m_baseDM); |
| |
| LOGDEBUG("request is created"); |
| TcrMessageReply reply(true, this->m_baseDM); |
| // err = this->sendRequestToEP(request, reply, ( *it ).int_id_); |
| err = this->sendRequestConnWithRetry(request, reply, conn); |
| LOGDEBUG("authenticateEndpoint error = %d", err); |
| if (err == GF_NOERR) { |
| // put the object into local region |
| switch (reply.getMessageType()) { |
| case TcrMessage::RESPONSE: { |
| // nothing to be done; |
| break; |
| } |
| case TcrMessage::EXCEPTION: { |
| err = ThinClientRegion::handleServerException("AuthException", |
| reply.getException()); |
| break; |
| } |
| default: { |
| LOGERROR("Unknown message type %d while sending credentials", |
| reply.getMessageType()); |
| err = GF_MSG; |
| break; |
| } |
| } |
| } |
| // throw exception if it is not authenticated |
| GfErrTypeToException("TcrEndpoint::authenticateEndpoint", err); |
| |
| m_isAuthenticated = true; |
| } |
| } |
| std::shared_ptr<Properties> TcrEndpoint::getCredentials() { |
| const auto& distributedSystem = m_cacheImpl->getDistributedSystem(); |
| const auto& tmpSecurityProperties = |
| distributedSystem.getSystemProperties().getSecurityProperties(); |
| |
| if (const auto& authInitialize = m_cacheImpl->getAuthInitialize()) { |
| LOGFINER( |
| "Acquired handle to AuthInitialize plugin, " |
| "getting credentials for %s", |
| m_name.c_str()); |
| const auto& tmpAuthIniSecurityProperties = |
| authInitialize->getCredentials(tmpSecurityProperties, m_name.c_str()); |
| LOGFINER("Done getting credentials"); |
| return tmpAuthIniSecurityProperties; |
| } |
| return nullptr; |
| } |
| |
| ServerQueueStatus TcrEndpoint::getFreshServerQueueStatus( |
| int32_t& queueSize, bool addToQueue, TcrConnection*& statusConn) { |
| GfErrType err = GF_NOERR; |
| TcrConnection* newConn; |
| ServerQueueStatus status = NON_REDUNDANT_SERVER; |
| |
| err = createNewConnection(newConn, false, false, |
| m_cacheImpl->getDistributedSystem() |
| .getSystemProperties() |
| .connectTimeout()); |
| if (err == GF_NOERR) { |
| status = newConn->getServerQueueStatus(queueSize); |
| |
| if (status == REDUNDANT_SERVER || status == PRIMARY_SERVER) { |
| if (addToQueue) { |
| m_opConnections.put(newConn, true); |
| } else { |
| statusConn = newConn; |
| } |
| m_connected = true; |
| return status; |
| } else { |
| // remove port from ports list (which is sent to server in notification |
| // handshake). |
| closeConnection(newConn); |
| return status; |
| } |
| } |
| |
| return status; |
| } |
| |
| GfErrType TcrEndpoint::registerDM(bool clientNotification, bool isSecondary, |
| bool isActiveEndpoint, |
| ThinClientBaseDM* distMgr) { |
| // Pre-conditions: |
| // 1. If this is a secondary server then clientNotification must be true |
| GF_DEV_ASSERT(!isSecondary || clientNotification); |
| |
| bool connected = false; |
| GfErrType err = GF_NOERR; |
| |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_connectionLock); |
| // Three cases here: |
| // 1. m_connected is false, m_isActiveEndpoint is false and then |
| // if isActiveEndpoint is true, then create 'max' connections |
| // 2. m_connected is false, m_isActiveEndpoint is false and then |
| // if isActiveEndpoint is false, then create just one connection |
| // to ping the server |
| // 3. m_connected is true, m_isActiveEndpoint is false (i.e. server was |
| // previously not an active endpoint) then if isSecondary is false then |
| // create 'max-1' connections else do nothing |
| m_opConnections.reset(); |
| if (m_maxConnections <= 0) { |
| connected = true; |
| } else if (!m_isActiveEndpoint) { |
| int maxConnections = 0; |
| if (isActiveEndpoint) { |
| if (m_connected) { |
| maxConnections = m_maxConnections - 1; |
| } else { |
| maxConnections = m_maxConnections; |
| } |
| } else if (!m_connected) { |
| maxConnections = 1; |
| } |
| if (maxConnections > 0) { |
| LOGINFO("Starting Handshake with %s%s", |
| (isSecondary ? "secondary server " |
| : (isActiveEndpoint ? "" : "primary server ")), |
| m_name.c_str()); |
| for (int connNum = 0; connNum < maxConnections; ++connNum) { |
| TcrConnection* newConn; |
| if ((err = createNewConnection(newConn, false, false, |
| m_cacheImpl->getDistributedSystem() |
| .getSystemProperties() |
| .connectTimeout(), |
| 0, m_connected)) != GF_NOERR) { |
| m_connected = false; |
| m_isActiveEndpoint = false; |
| closeConnections(); |
| return err; |
| } |
| m_opConnections.put(newConn, true); |
| } |
| LOGINFO("Handshake with %s%s success", |
| (isSecondary ? "secondary server " |
| : (isActiveEndpoint ? "" : "primary server ")), |
| m_name.c_str()); |
| m_connected = true; |
| m_isActiveEndpoint = isActiveEndpoint; |
| } |
| } |
| |
| if (m_connected || connected) { |
| if (clientNotification) { |
| if (distMgr != nullptr) { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guardDistMgrs(m_distMgrsLock); |
| m_distMgrs.push_back(distMgr); |
| } |
| LOGFINEST( |
| "Registering subscription " |
| "channel for endpoint %s", |
| m_name.c_str()); |
| // setup notification channel for the first region |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock); |
| if (m_numRegionListener == 0) { |
| if ((err = createNewConnection(m_notifyConnection, true, isSecondary, |
| m_cacheImpl->getDistributedSystem() |
| .getSystemProperties() |
| .connectTimeout() * |
| 3, |
| 0)) != GF_NOERR) { |
| m_connected = false; |
| m_isActiveEndpoint = false; |
| closeConnections(); |
| LOGWARN("Failed to start subscription channel for endpoint %s", |
| m_name.c_str()); |
| return err; |
| } |
| m_notifyReceiver = new Task<TcrEndpoint>( |
| this, &TcrEndpoint::receiveNotification, NC_Notification); |
| m_notifyReceiver->start(); |
| } |
| ++m_numRegionListener; |
| LOGFINEST("Incremented notification region count for endpoint %s to %d", |
| m_name.c_str(), m_numRegionListener); |
| m_connected = true; |
| } |
| } |
| |
| // Post-conditions: |
| // 1. The endpoint should be marked as active, only if m_connected is true |
| // 2. If this is not an active endpoint and it is connected then only one |
| // connection + notify channel |
| GF_DEV_ASSERT(!m_isActiveEndpoint || m_connected); |
| #if GF_DEVEL_ASSERTS == 1 |
| int numConnections = m_opConnections.size(); |
| if (!m_isActiveEndpoint && !isActiveEndpoint && m_connected && |
| (numConnections != 1 || m_numRegionListener <= 0 || |
| m_notifyReceiver == nullptr)) { |
| LOGWARN( |
| "Inactive connected endpoint does not have exactly one " |
| "connection. Number of connections: %d, number of region listeners: " |
| "%d", |
| numConnections, m_numRegionListener); |
| } |
| #endif |
| |
| return err; |
| } |
| |
| void TcrEndpoint::unregisterDM(bool clientNotification, |
| ThinClientBaseDM* distMgr, bool) { |
| if (clientNotification) { |
| LOGFINEST( |
| "Closing subscription " |
| "channel for endpoint %s", |
| m_name.c_str()); |
| // close notification channel if there is no region |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock); |
| if (m_numRegionListener > 0 && --m_numRegionListener == 0) { |
| closeNotification(); |
| } |
| LOGFINEST("Decremented subscription region count for endpoint %s to %d", |
| m_name.c_str(), m_numRegionListener); |
| if (distMgr != nullptr) { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guardDistMgrs(m_distMgrsLock); |
| m_distMgrs.remove(distMgr); |
| } |
| LOGFINEST("Done unsubscribe for endpoint %s", m_name.c_str()); |
| } |
| } |
| |
| void TcrEndpoint::pingServer(ThinClientPoolDM* poolDM) { |
| LOGDEBUG("Sending ping message to endpoint %s", m_name.c_str()); |
| if (!m_connected || m_noOfConnRefs == 0) { |
| LOGFINER("Skipping ping task for disconnected endpoint %s", m_name.c_str()); |
| return; |
| } |
| |
| if (!m_msgSent && !m_pingSent) { |
| TcrMessagePing* pingMsg = TcrMessage::getPingMessage(m_cacheImpl); |
| TcrMessageReply reply(true, nullptr); |
| LOGFINEST("Sending ping message to endpoint %s", m_name.c_str()); |
| GfErrType error; |
| if (poolDM != nullptr) { |
| error = poolDM->sendRequestToEP(*pingMsg, reply, this); |
| } else { |
| error = send(*pingMsg, reply); |
| } |
| LOGFINEST("Sent ping message to endpoint %s with error code %d%s", |
| m_name.c_str(), error, error == GF_NOERR ? " (no error)" : ""); |
| if (error == GF_NOERR) { |
| m_pingSent = true; |
| } |
| if (error == GF_TIMOUT && m_pingTimeouts < 2) { |
| ++m_pingTimeouts; |
| } else { |
| m_pingTimeouts = 0; |
| // Only call setConnectionStatus if the status has changed (non thread |
| // safe check) |
| // This is to avoid blocking the ping thread if notification channel takes |
| // a long time to |
| // complete causing the server to drop the client in the midst of |
| // connection establishment. |
| bool connected = (error == GF_NOERR) |
| ? (reply.getMessageType() == TcrMessage::REPLY) |
| : false; |
| if (m_connected != connected) { |
| setConnectionStatus(connected); |
| } |
| } |
| LOGFINEST("Completed sending ping message to endpoint %s", m_name.c_str()); |
| } else { |
| m_msgSent = false; |
| m_pingSent = false; |
| } |
| } |
| |
| bool TcrEndpoint::checkDupAndAdd(std::shared_ptr<EventId> eventid) { |
| return m_cacheImpl->tcrConnectionManager().checkDupAndAdd(eventid); |
| } |
| |
| int TcrEndpoint::receiveNotification(volatile bool& isRunning) { |
| LOGFINE("Started subscription channel for endpoint %s", m_name.c_str()); |
| while (isRunning) { |
| TcrMessageReply* msg = nullptr; |
| try { |
| size_t dataLen; |
| ConnErrType opErr = CONN_NOERR; |
| auto data = m_notifyConnection->receive(&dataLen, &opErr, |
| std::chrono::seconds(5)); |
| |
| if (opErr == CONN_IOERR) { |
| // Endpoint is disconnected, this exception is expected |
| LOGFINER( |
| "IO exception while receiving subscription event for endpoint %d", |
| opErr); |
| if (isRunning) { |
| setConnectionStatus(false); |
| // close notification channel |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock); |
| if (m_numRegionListener > 0) { |
| m_numRegionListener = 0; |
| closeNotification(); |
| } |
| } |
| break; |
| } |
| |
| if (data) { |
| msg = new TcrMessageReply(true, m_baseDM); |
| msg->initCqMap(); |
| msg->setData(data, static_cast<int32_t>(dataLen), |
| this->getDistributedMemberID(), |
| *(m_cacheImpl->getSerializationRegistry()), |
| *(m_cacheImpl->getMemberListForVersionStamp())); |
| handleNotificationStats(static_cast<int64_t>(dataLen)); |
| LOGDEBUG("receive notification %d", msg->getMessageType()); |
| |
| if (!isRunning) { |
| _GEODE_SAFE_DELETE(msg); |
| break; |
| } |
| |
| if (msg->getMessageType() == TcrMessage::SERVER_TO_CLIENT_PING) { |
| LOGFINE("Received ping from server subscription channel."); |
| } |
| |
| // ignore some message types like REGISTER_INSTANTIATORS |
| if (msg->shouldIgnore()) { |
| _GEODE_SAFE_DELETE(msg); |
| continue; |
| } |
| |
| bool isMarker = (msg->getMessageType() == TcrMessage::CLIENT_MARKER); |
| if (!msg->hasCqPart()) { |
| if (msg->getMessageType() != TcrMessage::CLIENT_MARKER) { |
| const std::string& regionFullPath1 = msg->getRegionName(); |
| auto region1 = m_cacheImpl->getRegion(regionFullPath1); |
| |
| if (region1 != nullptr && |
| !static_cast<ThinClientRegion*>(region1.get()) |
| ->getDistMgr() |
| ->isEndpointAttached(this)) { |
| // drop event before even processing the eventid for duplicate |
| // checking |
| LOGFINER("Endpoint %s dropping event for region %s", |
| m_name.c_str(), regionFullPath1.c_str()); |
| _GEODE_SAFE_DELETE(msg); |
| continue; |
| } |
| } |
| } |
| |
| if (!checkDupAndAdd(msg->getEventId())) { |
| m_dupCount++; |
| if (m_dupCount % 100 == 1) { |
| LOGFINE("Dropped %dst duplicate notification message", m_dupCount); |
| } |
| _GEODE_SAFE_DELETE(msg); |
| continue; |
| } |
| |
| if (isMarker) { |
| LOGFINE("Got a marker message on endpont %s", m_name.c_str()); |
| m_cacheImpl->processMarker(); |
| processMarker(); |
| _GEODE_SAFE_DELETE(msg); |
| } else { |
| if (!msg->hasCqPart()) // || msg->isInterestListPassed()) |
| { |
| const std::string& regionFullPath = msg->getRegionName(); |
| auto region = m_cacheImpl->getRegion(regionFullPath); |
| |
| if (region != nullptr) { |
| static_cast<ThinClientRegion*>(region.get()) |
| ->receiveNotification(msg); |
| } else { |
| LOGWARN( |
| "Notification for region %s that does not exist in " |
| "client cacheImpl.", |
| regionFullPath.c_str()); |
| } |
| } else { |
| LOGDEBUG("receive cq notification %d", msg->getMessageType()); |
| auto queryService = getQueryService(); |
| if (queryService != nullptr) { |
| static_cast<RemoteQueryService*>(queryService.get()) |
| ->receiveNotification(msg); |
| } |
| } |
| } |
| } |
| } catch (const TimeoutException&) { |
| // If there is no notification, this exception is expected |
| // But this is valid only when *no* data has been received |
| // otherwise if data has been read then TcrConnection will throw |
| // a GeodeIOException which will cause the channel to close. |
| LOGDEBUG( |
| "receiveNotification timed out: no data received from " |
| "endpoint %s", |
| m_name.c_str()); |
| } catch (const GeodeIOException& e) { |
| // Endpoint is disconnected, this exception is expected |
| LOGFINER( |
| "IO exception while receiving subscription event for endpoint %s: %s", |
| m_name.c_str(), e.what()); |
| if (m_connected) { |
| setConnectionStatus(false); |
| // close notification channel |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock); |
| if (m_numRegionListener > 0) { |
| m_numRegionListener = 0; |
| closeNotification(); |
| } |
| } |
| break; |
| } catch (const Exception& ex) { |
| _GEODE_SAFE_DELETE(msg); |
| LOGERROR( |
| "Exception while receiving subscription event for endpoint %s:: %s: " |
| "%s", |
| m_name.c_str(), ex.getName().c_str(), ex.what()); |
| } catch (...) { |
| _GEODE_SAFE_DELETE(msg); |
| LOGERROR( |
| "Unexpected exception while " |
| "receiving subscription event from endpoint %s", |
| m_name.c_str()); |
| } |
| } |
| LOGFINE("Ended subscription channel for endpoint %s", m_name.c_str()); |
| return 0; |
| } |
| |
| inline bool TcrEndpoint::compareTransactionIds(int32_t reqTransId, |
| int32_t replyTransId, |
| std::string& failReason, |
| TcrConnection* conn) { |
| LOGDEBUG("TcrEndpoint::compareTransactionIds requested id = %d ,replied = %d", |
| reqTransId, replyTransId); |
| if (replyTransId != reqTransId) { |
| LOGERROR( |
| "Transaction ids do not match on endpoint %s for " |
| "send operation: %d, %d. Possible serialization mismatch", |
| m_name.c_str(), reqTransId, replyTransId); |
| closeConnection(conn); |
| failReason = "mismatch of transaction IDs in operation"; |
| return false; |
| } |
| return true; |
| } |
| |
| inline bool TcrEndpoint::handleIOException(const std::string& message, |
| TcrConnection*& conn, bool) { |
| int32_t lastError = ACE_OS::last_error(); |
| if (lastError == ECONNRESET || lastError == EPIPE) { |
| _GEODE_SAFE_DELETE(conn); |
| } else { |
| closeConnection(conn); |
| } |
| LOGFINE( |
| "IO error during send for endpoint %s " |
| "[errno: %d: %s]: %s", |
| m_name.c_str(), lastError, ACE_OS::strerror(lastError), message.c_str()); |
| // EAGAIN =11, EWOULDBLOCK = 10035L, EPIPE = 32, ECONNRESET =10054L(An |
| // existing connection was forcibly closed by the remote host.) |
| if (!(lastError == EAGAIN || lastError == EWOULDBLOCK /*|| |
| lastError == ECONNRESET */ |
| /*|| lastError == EPIPE*/)) { |
| // break from enclosing loop without retries |
| // something wrong try connect in lock |
| m_needToConnectInLock = true; |
| return false; |
| } |
| std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
| return true; |
| } |
| |
| GfErrType TcrEndpoint::sendRequestConn(const TcrMessage& request, |
| TcrMessageReply& reply, |
| TcrConnection* conn, |
| std::string& failReason) { |
| int32_t type = request.getMessageType(); |
| GfErrType error = GF_NOERR; |
| |
| LOGFINER("Sending request type %d to endpoint [%s] via connection [%p]", type, |
| m_name.c_str(), conn); |
| // TcrMessage * req = const_cast<TcrMessage *>(&request); |
| LOGDEBUG("TcrEndpoint::sendRequestConn = %d", m_baseDM); |
| if (m_baseDM != nullptr) m_baseDM->beforeSendingRequest(request, conn); |
| if (((type == TcrMessage::EXECUTE_FUNCTION || |
| type == TcrMessage::EXECUTE_REGION_FUNCTION) && |
| (request.hasResult() & 2))) { |
| sendRequestForChunkedResponse(request, reply, conn); |
| } else if (type == TcrMessage::REGISTER_INTEREST_LIST || |
| type == TcrMessage::REGISTER_INTEREST || |
| type == TcrMessage::QUERY || |
| type == TcrMessage::QUERY_WITH_PARAMETERS || |
| type == TcrMessage::GET_ALL_70 || |
| type == TcrMessage::GET_ALL_WITH_CALLBACK || |
| type == TcrMessage::PUTALL || |
| type == TcrMessage::PUT_ALL_WITH_CALLBACK || |
| type == TcrMessage::REMOVE_ALL || |
| ((type == TcrMessage::EXECUTE_FUNCTION || |
| type == TcrMessage::EXECUTE_REGION_FUNCTION) && |
| (request.hasResult() & 2)) || |
| type == |
| TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP || // This is |
| // kept |
| // aside as |
| // server |
| // always |
| // sends |
| // chunked |
| // response. |
| type == TcrMessage::EXECUTECQ_MSG_TYPE || |
| type == TcrMessage::STOPCQ_MSG_TYPE || |
| type == TcrMessage::CLOSECQ_MSG_TYPE || |
| type == TcrMessage::KEY_SET || |
| type == TcrMessage::CLOSECLIENTCQS_MSG_TYPE || |
| type == TcrMessage::GETCQSTATS_MSG_TYPE || |
| type == TcrMessage::MONITORCQ_MSG_TYPE || |
| type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE || |
| type == TcrMessage::GETDURABLECQS_MSG_TYPE) { |
| sendRequestForChunkedResponse(request, reply, conn); |
| LOGDEBUG("sendRequestConn: calling sendRequestForChunkedResponse DONE"); |
| } else { |
| // Chk request type to request if so request.getCallBackArg flag & setCall |
| // back arg flag to true, and in response chk for this flag. |
| if (request.getMessageType() == TcrMessage::REQUEST) { |
| if (request.isCallBackArguement()) { |
| reply.setCallBackArguement(true); |
| } |
| } |
| size_t dataLen; |
| LOGDEBUG("sendRequestConn: calling sendRequest"); |
| auto data = conn->sendRequest(request.getMsgData(), request.getMsgLength(), |
| &dataLen, request.getTimeout(), |
| reply.getTimeout(), request.getMessageType()); |
| reply.setMessageTypeRequest(type); |
| reply.setData( |
| data, static_cast<int32_t>(dataLen), this->getDistributedMemberID(), |
| *(m_cacheImpl->getSerializationRegistry()), |
| *(m_cacheImpl |
| ->getMemberListForVersionStamp())); // memory is released by |
| // TcrMessage setData(). |
| } |
| |
| // reset idle timeout of the connection for pool connection manager |
| if (type != TcrMessage::PING) { |
| conn->touch(); |
| } |
| |
| if (reply.getMessageType() == TcrMessage::INVALID) { |
| if (type == TcrMessage::EXECUTE_FUNCTION || |
| type == TcrMessage::EXECUTE_REGION_FUNCTION || |
| type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP) { |
| ChunkedFunctionExecutionResponse* resultCollector = |
| dynamic_cast<ChunkedFunctionExecutionResponse*>( |
| reply.getChunkedResultHandler()); |
| if (resultCollector->getResult() == false) { |
| LOGDEBUG("TcrEndpoint::send: function execution, no response desired"); |
| // m_opConnections.put( conn, false ); |
| // return GF_NOERR; |
| error = GF_NOERR; |
| } |
| } else { |
| // Treat INVALID messages like IO exceptions |
| error = GF_IOERR; |
| } |
| } |
| // do we need to consider case where compareTransactionIds return true? |
| // I think we will not have issue here |
| else if (!compareTransactionIds(request.getTransId(), reply.getTransId(), |
| failReason, conn)) { |
| error = GF_NOTCON; |
| } |
| if (error == GF_NOERR) { |
| if (m_baseDM != nullptr) |
| m_baseDM->afterSendingRequest(request, reply, conn); |
| } |
| |
| return error; |
| } |
| |
| bool TcrEndpoint::isMultiUserMode() { |
| LOGDEBUG("TcrEndpoint::isMultiUserMode %d", m_isMultiUserMode); |
| return m_isMultiUserMode; |
| } |
| |
| GfErrType TcrEndpoint::sendRequestWithRetry( |
| const TcrMessage& request, TcrMessageReply& reply, TcrConnection*& conn, |
| bool& epFailure, std::string& failReason, int maxSendRetries, |
| bool useEPPool, std::chrono::microseconds requestedTimeout, |
| bool isBgThread) { |
| GfErrType error = GF_NOTCON; |
| bool createNewConn = false; |
| // int32_t type = request.getMessageType(); |
| int sendRetryCount = 0; |
| |
| // Retry on the following send errors: |
| // Timeout: 1 retry |
| // EAGAIN, ECONNRESET, EWOULDBLOCK: 1 retry |
| // Connection pool is empty (too many threads or no connections available): 1 |
| // retry |
| |
| do { |
| if (sendRetryCount > 0) { |
| // this is a retry. set the retry bit in the early Ack |
| (const_cast<TcrMessage&>(request)).updateHeaderForRetry(); |
| } |
| |
| auto timeout = requestedTimeout; |
| epFailure = false; |
| if (useEPPool) { |
| if (m_maxConnections == 0) { |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_connectionLock); |
| if (m_maxConnections == 0) { |
| LOGFINE( |
| "Creating a new connection when connection-pool-size system " |
| "property set to 0"); |
| if ((error = createNewConnection(conn, false, false, |
| m_cacheImpl->getDistributedSystem() |
| .getSystemProperties() |
| .connectTimeout())) != |
| GF_NOERR) { |
| epFailure = true; |
| continue; |
| } |
| m_maxConnections = 1; |
| } |
| } |
| } |
| LOGDEBUG("TcrEndpoint::send() getting a connection for endpoint %s", |
| m_name.c_str()); |
| if (createNewConn) { |
| createNewConn = false; |
| if (!m_connected) { |
| return GF_NOTCON; |
| } else if ((error = |
| createNewConnection(conn, false, false, |
| m_cacheImpl->getDistributedSystem() |
| .getSystemProperties() |
| .connectTimeout(), |
| 0, true)) != GF_NOERR) { |
| epFailure = true; |
| continue; |
| } |
| } else if (conn == nullptr && useEPPool) { |
| LOGFINER( |
| "sendRequestWithRetry:: looking for connection in queue timeout = " |
| "%d ", |
| timeout.count()); |
| // max wait time to get a connection |
| conn = m_opConnections.getUntil(timeout); |
| } |
| if (!m_connected) { |
| return GF_NOTCON; |
| } |
| if (conn != nullptr) { |
| LOGDEBUG("TcrEndpoint::send() obtained a connection for endpoint %s", |
| m_name.c_str()); |
| int reqTransId = request.getTransId(); |
| |
| try { |
| LOGDEBUG("Calling sendRequestConn"); |
| error = sendRequestConn(request, reply, conn, failReason); |
| if (error == GF_IOERR) { |
| epFailure = true; |
| failReason = "received INVALID reply from server"; |
| if (!handleIOException(failReason, conn, isBgThread)) { |
| break; |
| } |
| createNewConn = true; |
| } else if (error == GF_NOTCON) { |
| epFailure = true; |
| createNewConn = true; |
| } else { |
| if (useEPPool) { |
| m_opConnections.put(conn, false); |
| } |
| return GF_NOERR; |
| } |
| } catch (const TimeoutException&) { |
| error = GF_TIMOUT; |
| LOGFINE( |
| "Send timed out for endpoint %s. " |
| "Message txid = %d", |
| m_name.c_str(), reqTransId); |
| closeFailedConnection(conn); |
| /* |
| if ( !(m_poolHADM && m_poolHADM->getThreadLocalConnections()) ){ //close |
| connection only when not a sticky connection. |
| closeConnection( conn ); |
| }*/ |
| std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
| int32_t type = request.getMessageType(); |
| epFailure = (type != TcrMessage::QUERY && type != TcrMessage::PUTALL && |
| type != TcrMessage::PUT_ALL_WITH_CALLBACK && |
| type != TcrMessage::EXECUTE_FUNCTION && |
| type != TcrMessage::EXECUTE_REGION_FUNCTION && |
| type != TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP && |
| type != TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE); |
| |
| // epFailure = true; |
| failReason = "timed out waiting for endpoint"; |
| createNewConn = true; |
| } catch (const GeodeIOException& ex) { |
| error = GF_IOERR; |
| epFailure = true; |
| failReason = "IO error for endpoint"; |
| if (!handleIOException(ex.what(), conn, |
| isBgThread)) { // change here |
| break; |
| } |
| createNewConn = true; |
| } catch (const Exception& ex) { |
| failReason = ex.getName(); |
| failReason.append(": "); |
| failReason.append(ex.what()); |
| LOGWARN("Error during send for endpoint %s due to %s", m_name.c_str(), |
| failReason.c_str()); |
| if (compareTransactionIds(reqTransId, reply.getTransId(), failReason, |
| conn)) { |
| if (Log::warningEnabled()) { |
| LOGWARN("Stack trace: %s", ex.getStackTrace().c_str()); |
| } |
| error = GF_MSG; |
| if (useEPPool) { |
| m_opConnections.put(conn, false); |
| } else { |
| // we are here its better to close the connection as |
| // "compareTransactionIds" |
| // will not close the connection |
| closeConnection(conn); |
| } |
| break; |
| } else { |
| error = GF_NOTCON; |
| epFailure = true; |
| createNewConn = true; |
| } |
| } catch (...) { |
| failReason = "unexpected exception"; |
| LOGERROR( |
| "Unexpected exception while sending request to " |
| "endpoint %s", |
| m_name.c_str()); |
| if (compareTransactionIds(reqTransId, reply.getTransId(), failReason, |
| conn)) { |
| error = GF_MSG; |
| if (useEPPool) { |
| m_opConnections.put(conn, false); |
| } else { |
| // we are here its better to close the connection as |
| // "compareTransactionIds" |
| // will not close the connection |
| closeConnection(conn); |
| } |
| break; |
| } else { |
| error = GF_NOTCON; |
| epFailure = true; |
| createNewConn = true; |
| } |
| } |
| } else { |
| if (useEPPool) { |
| epFailure = true; |
| failReason = "server connection could not be obtained"; |
| if (timeout <= std::chrono::microseconds::zero()) { |
| error = GF_TIMOUT; |
| LOGWARN( |
| "No connection available for %ld seconds " |
| "for endpoint %s.", |
| requestedTimeout.count(), m_name.c_str()); |
| } else { |
| error = GF_NOTCON; |
| LOGFINE( |
| "Returning without connection with %d seconds remaining " |
| "for endpoint %s.", |
| timeout.count(), m_name.c_str()); |
| } |
| } else { |
| LOGERROR("Unexpected failure while sending request to server."); |
| GF_DEV_ASSERT("Bug in TcrEndpoint::sendRequestWithRetry()?" ? false |
| : true); |
| } |
| } |
| } while (++sendRetryCount <= maxSendRetries); |
| return error; |
| } // namespace client |
| |
| void TcrEndpoint::setRetry(const TcrMessage& request, int& maxSendRetries) { |
| int32_t type = request.getMessageType(); |
| if (type == TcrMessage::QUERY || type == TcrMessage::QUERY_WITH_PARAMETERS || |
| type == TcrMessage::PUTALL || type == TcrMessage::PUT_ALL_WITH_CALLBACK || |
| type == TcrMessage::EXECUTE_FUNCTION || |
| type == TcrMessage::EXECUTE_REGION_FUNCTION || |
| type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP || |
| type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE) { |
| maxSendRetries = 0; |
| } |
| } |
| |
| GfErrType TcrEndpoint::send(const TcrMessage& request, TcrMessageReply& reply) { |
| GfErrType error = GF_NOTCON; |
| |
| int maxSendRetries = 1; |
| setRetry(request, maxSendRetries); |
| |
| TcrConnection* conn = nullptr; |
| bool epFailure; |
| std::string failReason; |
| // TODO: remove sendRetryCount as parameter. |
| error = sendRequestWithRetry(request, reply, conn, epFailure, failReason, |
| maxSendRetries, true, reply.getTimeout()); |
| |
| if (error == GF_NOERR) { |
| m_msgSent = true; |
| } |
| |
| if (error != GF_NOERR && epFailure) { |
| LOGFINE("Send Giving up for endpoint %s; reason: %s.", m_name.c_str(), |
| failReason.c_str()); |
| setConnectionStatus(false); |
| } |
| |
| // Postconditions: |
| #if GF_DEVEL_ASSERTS == 1 |
| int opConnectionsSize = m_opConnections.size(); |
| if (!m_isActiveEndpoint && (opConnectionsSize > 1)) { |
| LOGWARN("Connections size = %d, expected maximum %d", opConnectionsSize, 1); |
| } else if (opConnectionsSize > m_maxConnections) { |
| LOGWARN("Connections size = %d, expected maximum %d", opConnectionsSize, |
| m_maxConnections); |
| } |
| #endif |
| |
| return error; |
| } |
| |
| GfErrType TcrEndpoint::sendRequestConnWithRetry(const TcrMessage& request, |
| TcrMessageReply& reply, |
| TcrConnection*& conn, |
| bool isBgThread) { |
| GfErrType error = GF_NOTCON; |
| |
| int maxSendRetries = 1; |
| setRetry(request, maxSendRetries); |
| |
| // Retry on the following send errors: |
| // Timeout: 1 retry |
| // EAGAIN, ECONNRESET, EWOULDBLOCK: 1 retry |
| // Connection pool is empty (too many threads or no connections available): 1 |
| // retry |
| bool epFailure; |
| std::string failReason; |
| LOGFINE("sendRequestConnWithRetry:: maxSendRetries = %d ", maxSendRetries); |
| error = sendRequestWithRetry(request, reply, conn, epFailure, failReason, |
| maxSendRetries, false, reply.getTimeout(), |
| isBgThread); |
| if (error == GF_NOERR) { |
| m_msgSent = true; |
| } |
| |
| if (error != GF_NOERR && epFailure) { |
| LOGFINE("sendRequestConnWithRetry: Giving up for endpoint %s; reason: %s.", |
| m_name.c_str(), failReason.c_str()); |
| setConnectionStatus(false); |
| } |
| |
| return error; |
| } |
| |
| void TcrEndpoint::setConnectionStatus(bool status) { |
| // : Store the original value of m_isActiveEndpoint. |
| // This is to try make failover more resilient for the case when |
| // a foreground operation thread is connecting to an endpoint while |
| // the notification thread is disconnecting from the same, or vice versa. |
| // By comparing the original value with the new value we know if |
| // someone else has changed the status in that duration, and skip |
| // the change if that is the case. |
| // Same logic applies for the ping thread. |
| // Try something like (after the 2.5 patch release): |
| // bool wasActive = m_isActiveEndpoint; |
| // Then after taking the lock: |
| // If ( !wasActive && isActiveEndpoint ) { return; } |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_connectionLock); |
| if (m_connected != status) { |
| bool connected = m_connected; |
| m_connected = status; |
| if (connected) { |
| m_numberOfTimesFailed += 1; |
| m_isAuthenticated = false; |
| // disconnected |
| LOGFINE("Disconnecting from endpoint %s", m_name.c_str()); |
| closeConnections(); |
| m_isActiveEndpoint = false; |
| LOGFINE("Disconnected from endpoint %s", m_name.c_str()); |
| triggerRedundancyThread(); |
| } |
| } |
| } |
| |
| void TcrEndpoint::triggerRedundancyThread() { |
| m_failoverSema.release(); |
| m_redundancySema.release(); |
| } |
| |
| void TcrEndpoint::closeConnection(TcrConnection*& conn) { |
| conn->close(); |
| m_ports.erase(conn->getPort()); |
| _GEODE_SAFE_DELETE(conn); |
| } |
| |
| void TcrEndpoint::closeConnections() { |
| m_opConnections.close(); |
| m_ports.clear(); |
| m_maxConnections = m_cacheImpl->getDistributedSystem() |
| .getSystemProperties() |
| .connectionPoolSize(); |
| } |
| |
| /* |
| void TcrEndpoint::sendNotificationCloseMsg() |
| { |
| if (m_notifyConnection != nullptr) { |
| m_notifyReceiver->stop(); |
| m_notifyConnection->close(); |
| } |
| } |
| */ |
| |
| void TcrEndpoint::closeNotification() { |
| LOGFINEST("Closing subscription channel for endpoint %s", m_name.c_str()); |
| m_notifyConnection->close(); |
| m_notifyReceiver->stopNoblock(); |
| TcrConnectionManager& tccm = m_cacheImpl->tcrConnectionManager(); |
| tccm.addNotificationForDeletion(m_notifyReceiver, m_notifyConnection, |
| m_notificationCleanupSema); |
| m_notifyCount++; |
| m_cleanupSema.release(); |
| m_isQueueHosted = false; |
| LOGFINEST( |
| "Added susbcription channel for deletion and " |
| "released cleanup semaphore for endpoint %s", |
| m_name.c_str()); |
| } |
| |
| void TcrEndpoint::stopNoBlock() { |
| if (m_notifyReceiver != nullptr) { |
| m_notifyConnection->close(); |
| m_notifyReceiver->stopNoblock(); |
| } |
| } |
| |
| void TcrEndpoint::stopNotifyReceiverAndCleanup() { |
| LOGFINER("Stopping subscription receiver and cleaning up"); |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock); |
| |
| if (m_notifyReceiver != nullptr) { |
| LOGFINER("Waiting for notification thread..."); |
| // m_notifyReceiver->stopNoblock(); |
| m_notifyReceiver->wait(); |
| bool found = false; |
| for (std::list<Task<TcrEndpoint>*>::iterator it = |
| m_notifyReceiverList.begin(); |
| it != m_notifyReceiverList.end(); it++) { |
| if (*it == m_notifyReceiver) { |
| found = true; |
| break; |
| } |
| } |
| |
| if (!found) { |
| _GEODE_SAFE_DELETE(m_notifyReceiver); |
| _GEODE_SAFE_DELETE(m_notifyConnection); |
| } |
| } |
| |
| m_numRegionListener = 0; |
| |
| if (m_notifyReceiverList.size() > 0) { |
| LOGFINER("TcrEndpoint::stopNotifyReceiverAndCleanup: notifylist size = %d", |
| m_notifyReceiverList.size()); |
| for (std::list<Task<TcrEndpoint>*>::iterator it = |
| m_notifyReceiverList.begin(); |
| it != m_notifyReceiverList.end(); it++) { |
| LOGFINER( |
| "TcrEndpoint::stopNotifyReceiverAndCleanup: deleting old notify " |
| "recievers."); |
| _GEODE_SAFE_DELETE(*it); |
| } |
| } |
| |
| if (m_notifyConnectionList.size() > 0) { |
| LOGFINER("TcrEndpoint::stopNotifyReceiverAndCleanup: notifylist size = %d", |
| m_notifyConnectionList.size()); |
| for (std::list<TcrConnection*>::iterator it = |
| m_notifyConnectionList.begin(); |
| it != m_notifyConnectionList.end(); it++) { |
| LOGFINER( |
| "TcrEndpoint::stopNotifyReceiverAndCleanup: deleting old notify " |
| "connections."); |
| _GEODE_SAFE_DELETE(*it); |
| } |
| } |
| } |
| |
| void TcrEndpoint::setServerQueueStatus(ServerQueueStatus queueStatus, |
| int32_t queueSize) { |
| if (!m_isServerQueueStatusSet) { |
| m_isServerQueueStatusSet = true; |
| m_serverQueueStatus = queueStatus; |
| m_queueSize = queueSize; |
| } |
| } |
| |
| bool TcrEndpoint::isQueueHosted() { return m_isQueueHosted; } |
| |
| void TcrEndpoint::processMarker() { |
| m_cacheImpl->tcrConnectionManager().processMarker(); |
| } |
| |
| std::shared_ptr<QueryService> TcrEndpoint::getQueryService() { |
| return m_cacheImpl->getQueryService(true); |
| } |
| |
| void TcrEndpoint::sendRequestForChunkedResponse(const TcrMessage& request, |
| TcrMessageReply& reply, |
| TcrConnection* conn) { |
| conn->sendRequestForChunkedResponse(request, request.getMsgLength(), reply); |
| } |
| |
| void TcrEndpoint::closeFailedConnection(TcrConnection*& conn) { |
| closeConnection(conn); |
| } |
| |
| void TcrEndpoint::handleNotificationStats(int64_t) {} |
| |
| } // namespace client |
| } // namespace geode |
| } // namespace apache |