| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "RemoteQueryService.hpp" |
| |
| #include "CacheImpl.hpp" |
| #include "CqServiceVsdStats.hpp" |
| #include "ReadWriteLock.hpp" |
| #include "RemoteQuery.hpp" |
| #include "ThinClientPoolDM.hpp" |
| #include "UserAttributes.hpp" |
| #include "statistics/StatisticsManager.hpp" |
| |
| namespace apache { |
| namespace geode { |
| namespace client { |
| |
| RemoteQueryService::RemoteQueryService(CacheImpl* cache, |
| ThinClientPoolDM* poolDM) |
| : m_invalid(true), |
| m_cqService(nullptr), |
| m_statisticsFactory( |
| cache->getStatisticsManager().getStatisticsFactory()) { |
| if (poolDM) { |
| m_tccdm = poolDM; |
| } else { |
| m_tccdm = |
| new ThinClientCacheDistributionManager(cache->tcrConnectionManager()); |
| } |
| LOGFINEST("Initialized m_tccdm"); |
| } |
| |
| void RemoteQueryService::init() { |
| TryWriteGuard guard(m_rwLock, m_invalid); |
| |
| if (m_invalid) { |
| LOGFINEST("RemoteQueryService::init: initializing TCCDM"); |
| if (dynamic_cast<ThinClientCacheDistributionManager*>(m_tccdm)) { |
| m_tccdm->init(); |
| } |
| m_invalid = false; |
| LOGFINEST("RemoteQueryService::init: done initialization"); |
| } |
| } |
| |
| std::shared_ptr<Query> RemoteQueryService::newQuery(std::string querystring) { |
| LOGDEBUG("RemoteQueryService::newQuery: multiuserMode = %d ", |
| m_tccdm->isMultiUserMode()); |
| if (!m_tccdm->isMultiUserMode()) { |
| TryReadGuard guard(m_rwLock, m_invalid); |
| |
| if (m_invalid) { |
| throw CacheClosedException( |
| "QueryService::newQuery: Cache has been closed."); |
| } |
| |
| LOGDEBUG("RemoteQueryService: creating a new query: " + querystring); |
| return std::shared_ptr<Query>( |
| new RemoteQuery(querystring, shared_from_this(), m_tccdm)); |
| } else { |
| TryReadGuard guard(m_rwLock, m_invalid); |
| |
| if (m_invalid) { |
| throw CacheClosedException( |
| "QueryService::newQuery: Cache has been closed."); |
| } |
| |
| LOGDEBUG("RemoteQueryService: creating a new query: " + querystring); |
| return std::shared_ptr<Query>(new RemoteQuery( |
| querystring, shared_from_this(), m_tccdm, |
| UserAttributes::threadLocalUserAttributes->getAuthenticatedView())); |
| } |
| } |
| |
| void RemoteQueryService::close() { |
| LOGFINEST("RemoteQueryService::close: starting close"); |
| TryWriteGuard guard(m_rwLock, m_invalid); |
| |
| if (m_cqService != nullptr) { |
| LOGFINEST("RemoteQueryService::close: starting CQ service close"); |
| m_cqService->closeCqService(); |
| m_cqService = nullptr; |
| LOGFINEST("RemoteQueryService::close: completed CQ service close"); |
| } |
| |
| if (dynamic_cast<ThinClientCacheDistributionManager*>(m_tccdm)) { |
| if (!m_invalid) { |
| LOGFINEST("RemoteQueryService::close: destroying DM"); |
| m_tccdm->destroy(); |
| } |
| _GEODE_SAFE_DELETE(m_tccdm); |
| m_invalid = true; |
| } |
| |
| if (!m_CqPoolsConnected.empty()) { |
| m_CqPoolsConnected.clear(); |
| } |
| |
| LOGFINEST("RemoteQueryService::close: completed"); |
| } |
| |
| /** |
| * execute all cqs on the endpoint after failover |
| */ |
| GfErrType RemoteQueryService::executeAllCqs(TcrEndpoint* endpoint) { |
| TryReadGuard guard(m_rwLock, m_invalid); |
| |
| if (m_invalid) { |
| LOGFINE("QueryService::executeAllCqs(endpoint): Not initialized."); |
| return GF_NOERR; |
| } |
| |
| if (m_cqService == nullptr) { |
| LOGFINE("RemoteQueryService: no cq to execute after failover to endpoint[" + |
| endpoint->name() + "]"); |
| return GF_NOERR; |
| } else { |
| LOGFINE("RemoteQueryService: execute all cqs after failover to endpoint[" + |
| endpoint->name() + "]"); |
| return m_cqService->executeAllClientCqs(endpoint); |
| } |
| } |
| |
| void RemoteQueryService::executeAllCqs(bool failover) { |
| TryReadGuard guard(m_rwLock, m_invalid); |
| |
| if (m_invalid) { |
| LOGFINE("QueryService::executeAllCqs: Not initialized."); |
| return; |
| } |
| |
| /*if cq has not been started, then failover will not start it.*/ |
| if (m_cqService != nullptr) { |
| LOGFINE("RemoteQueryService: execute all cqs after failover"); |
| m_cqService->executeAllClientCqs(failover); |
| } else { |
| LOGFINE("RemoteQueryService: no cq to execute after failover"); |
| } |
| } |
| |
| std::shared_ptr<CqQuery> RemoteQueryService::newCq( |
| std::string querystr, const std::shared_ptr<CqAttributes>& cqAttr, |
| bool isDurable) { |
| TryReadGuard guard(m_rwLock, m_invalid); |
| |
| if (m_invalid) { |
| throw CacheClosedException("QueryService::newCq: Cache has been closed."); |
| } |
| initCqService(); |
| // use query string as name for now |
| std::string name("_default"); |
| name += querystr; |
| return m_cqService->newCq(name, querystr, cqAttr, isDurable); |
| } |
| |
| std::shared_ptr<CqQuery> RemoteQueryService::newCq( |
| std::string name, std::string querystr, |
| const std::shared_ptr<CqAttributes>& cqAttr, bool isDurable) { |
| TryReadGuard guard(m_rwLock, m_invalid); |
| |
| if (m_invalid) { |
| throw CacheClosedException("QueryService::newCq: Cache has been closed."); |
| } |
| |
| initCqService(); |
| return m_cqService->newCq(name, querystr, cqAttr, isDurable); |
| } |
| |
| void RemoteQueryService::closeCqs() { |
| TryReadGuard guard(m_rwLock, m_invalid); |
| |
| if (m_invalid) { |
| LOGFINE("QueryService::closeCqs: Cache has been closed."); |
| return; |
| } |
| |
| // If cqService has not started, then no cq exists |
| if (m_cqService != nullptr) { |
| m_cqService->closeAllCqs(); |
| } |
| } |
| |
| CqService::query_container_type RemoteQueryService::getCqs() const { |
| TryReadGuard guard(m_rwLock, m_invalid); |
| |
| if (m_invalid) { |
| throw CacheClosedException("QueryService::getCqs: Cache has been closed."); |
| } |
| |
| // If cqService has not started, then no cq exists |
| CqService::query_container_type vec; |
| if (m_cqService) { |
| vec = m_cqService->getAllCqs(); |
| } |
| |
| return vec; |
| } |
| |
| std::shared_ptr<CqQuery> RemoteQueryService::getCq( |
| const std::string& name) const { |
| TryReadGuard guard(m_rwLock, m_invalid); |
| |
| if (m_invalid) { |
| throw CacheClosedException("QueryService::getCq: Cache has been closed."); |
| } |
| |
| // If cqService has not started, then no cq exists |
| if (m_cqService) { |
| return m_cqService->getCq(name); |
| } |
| |
| return nullptr; |
| } |
| |
| void RemoteQueryService::executeCqs() { |
| TryReadGuard guard(m_rwLock, m_invalid); |
| |
| if (m_invalid) { |
| throw CacheClosedException( |
| "QueryService::executeCqs: Cache has been closed."); |
| } |
| |
| // If cqService has not started, then no cq exists |
| if (m_cqService != nullptr) { |
| m_cqService->executeAllClientCqs(); |
| } |
| } |
| |
| void RemoteQueryService::stopCqs() { |
| TryReadGuard guard(m_rwLock, m_invalid); |
| |
| if (m_invalid) { |
| LOGFINE("QueryService::stopCqs: Cache has been closed."); |
| return; |
| } |
| |
| // If cqService has not started, then no cq exists |
| if (m_cqService != nullptr) { |
| m_cqService->stopAllClientCqs(); |
| } |
| } |
| |
| std::shared_ptr<CqServiceStatistics> |
| RemoteQueryService::getCqServiceStatistics() const { |
| TryReadGuard guard(m_rwLock, m_invalid); |
| |
| if (m_invalid) { |
| throw CacheClosedException( |
| "QueryService::getCqServiceStatistics: Cache has been closed."); |
| } |
| |
| // If cqService has not started, then no cq exists |
| if (m_cqService) { |
| return m_cqService->getCqServiceStatistics(); |
| } |
| |
| return nullptr; |
| } |
| |
| void RemoteQueryService::receiveNotification(TcrMessage* msg) { |
| { |
| TryReadGuard guard(m_rwLock, m_invalid); |
| |
| if (m_invalid) { |
| // do we need this check? |
| return; |
| } |
| /*if cq has not been started, then no cq exists */ |
| if (!m_cqService) { |
| return; |
| } |
| |
| if (!m_cqService->checkAndAcquireLock()) { |
| return; |
| } |
| } |
| |
| m_cqService->receiveNotification(msg); |
| } |
| |
| std::shared_ptr<CacheableArrayList> |
| RemoteQueryService::getAllDurableCqsFromServer() const { |
| TryReadGuard guard(m_rwLock, m_invalid); |
| |
| if (m_invalid) { |
| throw CacheClosedException( |
| "QueryService::getAllDurableCqsFromServer: Cache has been closed."); |
| } |
| |
| // If cqService has not started, then no cq exists |
| if (m_cqService) { |
| return m_cqService->getAllDurableCqsFromServer(); |
| } |
| |
| return nullptr; |
| } |
| |
| void RemoteQueryService::invokeCqConnectedListeners(ThinClientPoolDM* pool, |
| bool connected) { |
| if (!m_cqService) { |
| return; |
| } |
| |
| std::string poolName; |
| pool = dynamic_cast<ThinClientPoolDM*>(m_tccdm); |
| if (pool != nullptr) { |
| poolName = pool->getName(); |
| CqPoolsConnected::iterator itr = m_CqPoolsConnected.find(poolName); |
| if (itr != m_CqPoolsConnected.end() && itr->second == connected) { |
| LOGDEBUG("Returning since pools connection status matched."); |
| return; |
| } else { |
| LOGDEBUG("Inserting since pools connection status did not match."); |
| m_CqPoolsConnected[poolName] = connected; |
| } |
| } |
| m_cqService->invokeCqConnectedListeners(poolName, connected); |
| } |
| |
| } // namespace client |
| } // namespace geode |
| } // namespace apache |