| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include <geode/internal/geode_globals.hpp> |
| #include "ThinClientBaseDM.hpp" |
| #include "ThinClientCacheDistributionManager.hpp" |
| #include "TcrMessage.hpp" |
| #include "TcrEndpoint.hpp" |
| #include <geode/ExceptionTypes.hpp> |
| #include "ReadWriteLock.hpp" |
| #include "ThinClientRegion.hpp" |
| #include "RemoteQueryService.hpp" |
| #include "CacheImpl.hpp" |
| #include <geode/CacheAttributes.hpp> |
| #include <algorithm> |
| |
| namespace apache { |
| namespace geode { |
| namespace client { |
| |
| ThinClientCacheDistributionManager::ThinClientCacheDistributionManager( |
| TcrConnectionManager& connManager) |
| : ThinClientDistributionManager(connManager, nullptr) {} |
| |
| void ThinClientCacheDistributionManager::init() { |
| LOGDEBUG("ThinClientCacheDistributionManager::init"); |
| if (m_connManager.getNumEndPoints() == 0) { |
| throw IllegalStateException("No endpoints defined for query."); |
| } |
| ThinClientBaseDM::init(); |
| } |
| |
| GfErrType ThinClientCacheDistributionManager::sendSyncRequestCq( |
| TcrMessage& request, TcrMessageReply& reply) { |
| preFailoverAction(); |
| |
| reply.setDM(this); |
| |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_endpointsLock); |
| |
| // Return best effort result: If CQ succeeds on ANY server return no-error |
| // even if |
| // other servers might fail since this method is called in non-HA with |
| // failover case. |
| |
| GfErrType err = GF_NOTCON; |
| GfErrType opErr = GF_NOERR; |
| |
| for (std::vector<TcrEndpoint*>::iterator ep = m_endpoints.begin(); |
| ep != m_endpoints.end(); ++ep) { |
| if ((*ep)->connected()) { |
| (*ep)->setDM(this); |
| opErr = sendRequestToEP( |
| request, reply, |
| *ep); // this will go to ThinClientDistributionManager |
| if (opErr == GF_NOERR || |
| (ThinClientBaseDM::isFatalClientError(opErr) && err != GF_NOERR)) { |
| err = opErr; |
| } |
| } |
| } |
| |
| // This should return only either NOERR (takes precedence), NOTCON or a |
| // "client fatal error" such as NotAuthorized. |
| return err; |
| } |
| |
| GfErrType ThinClientCacheDistributionManager::sendSyncRequest( |
| TcrMessage& request, TcrMessageReply& reply, bool attemptFailover, bool) { |
| GfErrType err = GF_NOERR; |
| int32_t type = request.getMessageType(); |
| if (m_connManager.haEnabled() && |
| (type == TcrMessage::EXECUTECQ_MSG_TYPE || |
| type == TcrMessage::STOPCQ_MSG_TYPE || |
| type == TcrMessage::CLOSECQ_MSG_TYPE || |
| type == TcrMessage::CLOSECLIENTCQS_MSG_TYPE || |
| type == TcrMessage::GETCQSTATS_MSG_TYPE || |
| type == TcrMessage::MONITORCQ_MSG_TYPE || |
| type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE || |
| type == TcrMessage::GETDURABLECQS_MSG_TYPE)) { |
| err = m_connManager.sendSyncRequestCq(request, reply); |
| } else if ((type == TcrMessage::EXECUTECQ_MSG_TYPE || |
| type == TcrMessage::STOPCQ_MSG_TYPE || |
| type == TcrMessage::CLOSECQ_MSG_TYPE || |
| type == TcrMessage::CLOSECLIENTCQS_MSG_TYPE || |
| type == TcrMessage::GETCQSTATS_MSG_TYPE || |
| type == TcrMessage::MONITORCQ_MSG_TYPE || |
| type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE || |
| type == TcrMessage::GETDURABLECQS_MSG_TYPE)) { |
| err = sendSyncRequestCq(request, reply); |
| } else { |
| err = ThinClientDistributionManager::sendSyncRequest(request, reply, |
| attemptFailover); |
| } |
| return err; |
| } |
| |
| GfErrType ThinClientCacheDistributionManager::sendRequestToPrimary( |
| TcrMessage& request, TcrMessageReply& reply) { |
| GfErrType err = GF_NOTCON; |
| if (m_connManager.haEnabled()) { |
| err = m_connManager.sendRequestToPrimary(request, reply); |
| } else { |
| // Call sendSyncRequest() with failover enabled. |
| // TODO: Must ensure that active endpoint is correctly tracked so that |
| // request is sent to an endpoint |
| // for which callback connection is present. |
| err = ThinClientDistributionManager::sendSyncRequest(request, reply, true); |
| } |
| return err; |
| } |
| |
| bool ThinClientCacheDistributionManager::preFailoverAction() { |
| if (!m_initDone) { |
| // nothing to be done if not initialized |
| return true; |
| } |
| // take the global endpoint lock so that the global endpoints list |
| // does not change while we are (possibly) adding endpoint to this endpoints |
| // list and incrementing the reference count of endpoint |
| ACE_Guard<ACE_Recursive_Thread_Mutex> guard( |
| m_connManager.getGlobalEndpoints().mutex()); |
| // This method is called at the time of failover to refresh |
| // the list of endpoints. |
| std::vector<TcrEndpoint*> currentGlobalEndpointsList; |
| m_connManager.getAllEndpoints(currentGlobalEndpointsList); |
| |
| // Update local list with new endpoints. |
| std::vector<TcrEndpoint*> newEndpointsList; |
| for (std::vector<TcrEndpoint*>::iterator it = |
| currentGlobalEndpointsList.begin(); |
| it != currentGlobalEndpointsList.end(); ++it) { |
| bool found = false; |
| for (std::vector<TcrEndpoint*>::iterator currIter = m_endpoints.begin(); |
| currIter != m_endpoints.end(); ++currIter) { |
| if (*currIter == *it) { |
| found = true; |
| break; |
| } |
| } |
| if (!found) newEndpointsList.push_back(*it); |
| } |
| |
| for (std::vector<TcrEndpoint*>::iterator it = newEndpointsList.begin(); |
| it != newEndpointsList.end(); ++it) { |
| TcrEndpoint* ep = *it; |
| m_endpoints.push_back(ep); |
| ep->setNumRegions(ep->numRegions() + 1); |
| LOGFINER( |
| "TCCDM: incremented region reference count for endpoint %s " |
| "to %d", |
| ep->name().c_str(), ep->numRegions()); |
| } |
| |
| return true; |
| } |
| bool ThinClientCacheDistributionManager::postFailoverAction( |
| TcrEndpoint* endpoint) { |
| LOGDEBUG("ThinClientCacheDistributionManager : executeAllCqs"); |
| if (m_connManager.haEnabled()) { |
| LOGDEBUG( |
| "ThinClientCacheDistributionManager : executeAllCqs HA: case, done " |
| "else where"); |
| return true; |
| } |
| |
| CacheImpl* cache = m_connManager.getCacheImpl(); |
| |
| if (cache == nullptr) { |
| LOGERROR("Client not initialized for failover"); |
| return false; |
| } |
| try { |
| auto rqsService = std::dynamic_pointer_cast<RemoteQueryService>( |
| cache->getQueryService(true)); |
| rqsService->executeAllCqs(true); |
| } catch (const Exception& excp) { |
| LOGWARN("Failed to recover CQs during failover attempt to endpoint[%s]: %s", |
| endpoint->name().c_str(), excp.what()); |
| return false; |
| } catch (...) { |
| LOGWARN("Failed to recover CQs during failover attempt to endpoint[%s]", |
| endpoint->name().c_str()); |
| return false; |
| } |
| return true; |
| } |
| |
| } // namespace client |
| } // namespace geode |
| } // namespace apache |