| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "CqService.hpp" |
| #include "ReadWriteLock.hpp" |
| #include <gfcpp/DistributedSystem.hpp> |
| #include <gfcpp/SystemProperties.hpp> |
| #include <gfcpp/ExceptionTypes.hpp> |
| #include "CqQueryImpl.hpp" |
| #include "CqEventImpl.hpp" |
| #include <gfcpp/CqServiceStatistics.hpp> |
| #include "ThinClientPoolDM.hpp" |
| #include <gfcpp/CqStatusListener.hpp> |
| using namespace apache::geode::client; |
| |
| CqService::CqService(ThinClientBaseDM* tccdm) |
| : m_tccdm(tccdm), m_notificationSema(1), m_stats(new CqServiceVsdStats()) { |
| m_cqQueryMap = new MapOfCqQueryWithLock(); |
| m_running = true; |
| LOGDEBUG("CqService Started"); |
| } |
| CqService::~CqService() { |
| if (m_cqQueryMap != NULL) delete m_cqQueryMap; |
| LOGDEBUG("CqService Destroyed"); |
| } |
| |
| void CqService::updateStats() { |
| CqServiceVsdStats* stats = dynamic_cast<CqServiceVsdStats*>(m_stats.ptr()); |
| |
| stats->setNumCqsActive(0); |
| stats->setNumCqsStopped(0); |
| |
| MapOfRegionGuard guard(m_cqQueryMap->mutex()); |
| |
| stats->setNumCqsOnClient(static_cast<uint32_t>(m_cqQueryMap->current_size())); |
| |
| if (m_cqQueryMap->current_size() == 0) return; |
| |
| for (MapOfCqQueryWithLock::iterator q = m_cqQueryMap->begin(); |
| q != m_cqQueryMap->end(); ++q) { |
| CqQueryPtr cquery = ((*q).int_id_); |
| switch (cquery->getState()) { |
| case CqState::RUNNING: |
| stats->incNumCqsActive(); |
| break; |
| case CqState::STOPPED: |
| stats->incNumCqsStopped(); |
| break; |
| default: |
| break; |
| } |
| } |
| } |
| |
| bool CqService::checkAndAcquireLock() { |
| if (m_running) { |
| m_notificationSema.acquire(); |
| if (m_running == false) { |
| m_notificationSema.release(); |
| return false; |
| } |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| CqQueryPtr CqService::newCq(std::string& cqName, std::string& queryString, |
| CqAttributesPtr& cqAttributes, bool isDurable) { |
| if (queryString.empty()) { |
| throw IllegalArgumentException("Null queryString is passed. "); |
| } else if (cqAttributes == NULLPTR) { |
| throw IllegalArgumentException("Null cqAttribute is passed. "); |
| } |
| |
| // Check if the subscription is enabled on the pool |
| ThinClientPoolDM* pool = dynamic_cast<ThinClientPoolDM*>(m_tccdm); |
| if (pool != NULL && !pool->getSubscriptionEnabled()) { |
| LOGERROR( |
| "Cannot create CQ because subscription is not enabled on the pool."); |
| throw IllegalStateException( |
| "Cannot create CQ because subscription is not enabled on the pool."); |
| } |
| |
| // check for durable client |
| if (isDurable) { |
| SystemProperties* sysProps = DistributedSystem::getSystemProperties(); |
| const char* durableID = |
| (sysProps != NULL) ? sysProps->durableClientId() : NULL; |
| if (durableID == NULL || strlen(durableID) == 0) { |
| LOGERROR("Cannot create durable CQ because client is not durable."); |
| throw IllegalStateException( |
| "Cannot create durable CQ because client is not durable."); |
| } |
| } |
| |
| // Check if the given cq already exists. |
| if (!cqName.empty() && isCqExists(cqName)) { |
| throw CqExistsException( |
| ("CQ with the given name already exists. CqName : " + cqName).c_str()); |
| } |
| |
| UserAttributesPtr ua; |
| ua = NULLPTR; |
| if (m_tccdm != NULL && m_tccdm->isMultiUserMode()) { |
| ua = |
| TSSUserAttributesWrapper::s_geodeTSSUserAttributes->getUserAttributes(); |
| } |
| |
| CqServicePtr cqs(this); |
| CqQueryImpl* cQuery = |
| new CqQueryImpl(cqs, cqName, queryString, cqAttributes, isDurable, ua); |
| cQuery->initCq(); |
| CqQueryPtr ptr(cQuery); |
| return ptr; |
| } |
| |
| /** |
| * Adds the given CQ and cqQuery object into the CQ map. |
| */ |
| void CqService::addCq(std::string& cqName, CqQueryPtr& cq) { |
| try { |
| MapOfRegionGuard guard(m_cqQueryMap->mutex()); |
| CqQueryPtr tmp; |
| if (0 == m_cqQueryMap->find(cqName, tmp)) { |
| throw CqExistsException("CQ with given name already exists. "); |
| } |
| m_cqQueryMap->bind(cqName, cq); |
| } catch (Exception& e) { |
| throw e; |
| } |
| } |
| |
| /** |
| * Removes given CQ from the cqMap.. |
| */ |
| void CqService::removeCq(std::string& cqName) { |
| try { |
| MapOfRegionGuard guard(m_cqQueryMap->mutex()); |
| m_cqQueryMap->unbind(cqName); |
| } catch (Exception& e) { |
| throw e; |
| } |
| } |
| |
| /** |
| * Retrieve a CqQuery by name. |
| * @return the CqQuery or null if not found |
| */ |
| CqQueryPtr CqService::getCq(std::string& cqName) { |
| MapOfRegionGuard guard(m_cqQueryMap->mutex()); |
| CqQueryPtr tmp; |
| if (0 != m_cqQueryMap->find(cqName, tmp)) { |
| LOGWARN("Failed to get the specified CQ: %s", cqName.c_str()); |
| } else { |
| return tmp; |
| } |
| return NULLPTR; |
| } |
| |
| /** |
| * Clears the CQ Query Map. |
| */ |
| void CqService::clearCqQueryMap() { |
| Log::fine("Cleaning clearCqQueryMap."); |
| try { |
| MapOfRegionGuard guard(m_cqQueryMap->mutex()); |
| m_cqQueryMap->unbind_all(); |
| } catch (Exception& e) { |
| throw e; |
| } |
| } |
| |
| /** |
| * Retrieve all registered CQs |
| */ |
| void CqService::getAllCqs(VectorOfCqQuery& cqVec) { |
| cqVec.clear(); |
| MapOfRegionGuard guard(m_cqQueryMap->mutex()); |
| if (m_cqQueryMap->current_size() == 0) return; |
| cqVec.reserve(static_cast<int32_t>(m_cqQueryMap->current_size())); |
| for (MapOfCqQueryWithLock::iterator q = m_cqQueryMap->begin(); |
| q != m_cqQueryMap->end(); ++q) { |
| cqVec.push_back((*q).int_id_); |
| } |
| } |
| |
| /** |
| * Executes all the cqs on this client. |
| */ |
| void CqService::executeAllClientCqs(bool afterFailover) { |
| // ACE_Guard< ACE_Recursive_Thread_Mutex > _guard( m_mutex ); |
| VectorOfCqQuery cqVec; |
| getAllCqs(cqVec); |
| // MapOfRegionGuard guard( m_cqQueryMap->mutex() ); |
| executeCqs(cqVec, afterFailover); |
| } |
| |
| /** |
| * Executes all CQs on the specified endpoint after failover. |
| */ |
| GfErrType CqService::executeAllClientCqs(TcrEndpoint* endpoint) { |
| VectorOfCqQuery cqVec; |
| getAllCqs(cqVec); |
| return executeCqs(cqVec, endpoint); |
| } |
| |
| /** |
| * Executes all the given cqs on the specified endpoint after failover. |
| */ |
| GfErrType CqService::executeCqs(VectorOfCqQuery& cqs, TcrEndpoint* endpoint) { |
| if (cqs.empty()) { |
| return GF_NOERR; |
| } |
| |
| GfErrType err = GF_NOERR; |
| GfErrType opErr = GF_NOERR; |
| |
| for (int32_t cqCnt = 0; cqCnt < cqs.length(); cqCnt++) { |
| CqQueryPtr cq = cqs[cqCnt]; |
| CqQueryImpl* cQueryImpl = static_cast<CqQueryImpl*>(cq.ptr()); |
| if (!cq->isClosed() && cq->isRunning()) { |
| opErr = cQueryImpl->execute(endpoint); |
| if (err == GF_NOERR) { |
| err = opErr; |
| } |
| } |
| } |
| return err; |
| } |
| |
| /** |
| * Executes all the given cqs. |
| */ |
| void CqService::executeCqs(VectorOfCqQuery& cqs, bool afterFailover) { |
| if (cqs.empty()) { |
| return; |
| } |
| std::string cqName; |
| for (int32_t cqCnt = 0; cqCnt < cqs.length(); cqCnt++) { |
| CqQueryPtr cq = cqs[cqCnt]; |
| CqQueryImpl* cQueryImpl = dynamic_cast<CqQueryImpl*>(cq.ptr()); |
| if (!cq->isClosed() && |
| (cq->isStopped() || (cq->isRunning() && afterFailover))) { |
| try { |
| cqName = cq->getName(); |
| if (afterFailover) { |
| cQueryImpl->executeAfterFailover(); |
| } else { |
| cq->execute(); |
| } |
| } catch (QueryException& qe) { |
| LOGFINE("%s", |
| ("Failed to execute the CQ, CqName : " + cqName + " Error : " + |
| qe.getMessage()) |
| .c_str()); |
| } catch (CqClosedException& cce) { |
| LOGFINE(("Failed to execute the CQ, CqName : " + cqName + " Error : " + |
| cce.getMessage()) |
| .c_str()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Stops all the cqs |
| */ |
| void CqService::stopAllClientCqs() { |
| VectorOfCqQuery cqVec; |
| getAllCqs(cqVec); |
| // MapOfRegionGuard guard( m_cqQueryMap->mutex() ); |
| stopCqs(cqVec); |
| } |
| |
| /** |
| * Stops all the specified cqs. |
| */ |
| void CqService::stopCqs(VectorOfCqQuery& cqs) { |
| if (cqs.empty()) { |
| return; |
| } |
| |
| std::string cqName; |
| for (int32_t cqCnt = 0; cqCnt < cqs.length(); cqCnt++) { |
| CqQueryPtr cq = cqs[cqCnt]; |
| if (!cq->isClosed() && cq->isRunning()) { |
| try { |
| cqName = cq->getName(); |
| cq->stop(); |
| } catch (QueryException& qe) { |
| Log::fine(("Failed to stop the CQ, CqName : " + cqName + " Error : " + |
| qe.getMessage()) |
| .c_str()); |
| } catch (CqClosedException& cce) { |
| Log::fine(("Failed to stop the CQ, CqName : " + cqName + " Error : " + |
| cce.getMessage()) |
| .c_str()); |
| } |
| } |
| } |
| } |
| |
| void CqService::closeCqs(VectorOfCqQuery& cqs) { |
| LOGDEBUG("closeCqs() TcrMessage::isKeepAlive() = %d ", |
| TcrMessage::isKeepAlive()); |
| if (!cqs.empty()) { |
| std::string cqName; |
| for (int32_t cqCnt = 0; cqCnt < cqs.length(); cqCnt++) { |
| try { |
| CqQueryImpl* cq = dynamic_cast<CqQueryImpl*>(cqs[cqCnt].ptr()); |
| cqName = cq->getName(); |
| LOGDEBUG("closeCqs() cqname = %s isDurable = %d ", cqName.c_str(), |
| cq->isDurable()); |
| if (!(cq->isDurable() && TcrMessage::isKeepAlive())) { |
| cq->close(true); |
| } else { |
| cq->close(false); |
| } |
| } catch (QueryException& qe) { |
| Log::fine(("Failed to close the CQ, CqName : " + cqName + " Error : " + |
| qe.getMessage()) |
| .c_str()); |
| } catch (CqClosedException& cce) { |
| Log::fine(("Failed to close the CQ, CqName : " + cqName + " Error : " + |
| cce.getMessage()) |
| .c_str()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Get statistics information for all CQs |
| * @return the CqServiceStatistics |
| */ |
| CqServiceStatisticsPtr CqService::getCqServiceStatistics() { return m_stats; } |
| |
| /** |
| * Close the CQ Service after cleanup if any. |
| * |
| */ |
| void CqService::closeCqService() { |
| if (m_running) { |
| m_running = false; |
| m_notificationSema.acquire(); |
| cleanup(); |
| m_notificationSema.release(); |
| } |
| } |
| void CqService::closeAllCqs() { |
| Log::fine("closeAllCqs()"); |
| VectorOfCqQuery cqVec; |
| getAllCqs(cqVec); |
| Log::fine("closeAllCqs() 1"); |
| MapOfRegionGuard guard(m_cqQueryMap->mutex()); |
| Log::fine("closeAllCqs() 2"); |
| closeCqs(cqVec); |
| } |
| |
| /** |
| * Cleans up the CqService. |
| */ |
| void CqService::cleanup() { |
| Log::fine("Cleaning up CqService."); |
| |
| // Close All the CQs. |
| // Need to take care when Clients are still connected... |
| closeAllCqs(); |
| |
| // Clear cqQueryMap. |
| clearCqQueryMap(); |
| } |
| |
| /* |
| * Checks if CQ with the given name already exists. |
| * @param cqName name of the CQ. |
| * @return true if exists else false. |
| */ |
| bool CqService::isCqExists(std::string& cqName) { |
| bool status = false; |
| try { |
| MapOfRegionGuard guard(m_cqQueryMap->mutex()); |
| CqQueryPtr tmp; |
| status = (0 == m_cqQueryMap->find(cqName, tmp)); |
| } catch (Exception& ex) { |
| LOGFINE("Exception (%s) in isCQExists, ignored ", |
| ex.getMessage()); // Ignore. |
| } |
| return status; |
| } |
| void CqService::receiveNotification(TcrMessage* msg) { |
| invokeCqListeners(msg->getCqs(), msg->getMessageTypeForCq(), msg->getKey(), |
| msg->getValue(), msg->getDeltaBytes(), msg->getEventId()); |
| GF_SAFE_DELETE(msg); |
| m_notificationSema.release(); |
| } |
| |
| /** |
| * Invokes the CqListeners for the given CQs. |
| * @param cqs list of cqs with the cq operation from the Server. |
| * @param messageType base operation |
| * @param key |
| * @param value |
| */ |
| void CqService::invokeCqListeners(const std::map<std::string, int>* cqs, |
| uint32_t messageType, CacheableKeyPtr key, |
| CacheablePtr value, |
| CacheableBytesPtr deltaValue, |
| EventIdPtr eventId) { |
| LOGDEBUG("CqService::invokeCqListeners"); |
| CqQueryPtr cQuery; |
| CqQueryImpl* cQueryImpl; |
| for (std::map<std::string, int>::const_iterator iter = cqs->begin(); |
| iter != cqs->end(); ++iter) { |
| std::string cqName = iter->first; |
| cQuery = getCq(cqName); |
| cQueryImpl = dynamic_cast<CqQueryImpl*>(cQuery.ptr()); |
| if (cQueryImpl == NULL || !cQueryImpl->isRunning()) { |
| LOGFINE("Unable to invoke CqListener, %s, CqName: %s", |
| (cQueryImpl == NULL) ? "CQ not found" : "CQ is Not running", |
| cqName.c_str()); |
| continue; |
| } |
| |
| int cqOp = iter->second; |
| |
| // If Region destroy event, close the cq. |
| if (cqOp == TcrMessage::DESTROY_REGION) { |
| // The close will also invoke the listeners close(). |
| try { |
| cQueryImpl->close(false); |
| } catch (Exception& ex) { |
| // handle? |
| LOGFINE("Exception while invoking CQ listeners: %s", ex.getMessage()); |
| } |
| continue; |
| } |
| |
| // Construct CqEvent. |
| CqEventImpl* cqEvent = |
| new CqEventImpl(cQuery, getOperation(messageType), getOperation(cqOp), |
| key, value, m_tccdm, deltaValue, eventId); |
| |
| // Update statistics |
| cQueryImpl->updateStats(*cqEvent); |
| |
| // invoke CQ Listeners. |
| VectorOfCqListener cqListeners; |
| cQueryImpl->getCqAttributes()->getCqListeners(cqListeners); |
| /* |
| Log::fine(("Invoking CqListeners for the CQ, CqName : " + cqName + |
| " , Number of Listeners : " + cqListeners.length() + " cqEvent : " + |
| cqEvent); |
| */ |
| |
| for (int32_t lCnt = 0; lCnt < cqListeners.length(); lCnt++) { |
| try { |
| // Check if the listener is not null, it could have been changed/reset |
| // by the CqAttributeMutator. |
| if (cqListeners[lCnt] != NULLPTR) { |
| if (cqEvent->getError() == true) { |
| cqListeners[lCnt]->onError(*cqEvent); |
| } else { |
| cqListeners[lCnt]->onEvent(*cqEvent); |
| } |
| } |
| // Handle client side exceptions. |
| } catch (Exception& ex) { |
| LOGWARN(("Exception in the CqListener of the CQ named " + cqName + |
| ", error: " + ex.getMessage()) |
| .c_str()); |
| } |
| } |
| delete cqEvent; |
| } |
| } |
| |
| void CqService::invokeCqConnectedListeners(std::string poolName, |
| bool connected) { |
| CqQueryPtr cQuery; |
| CqQueryImpl* cQueryImpl; |
| VectorOfCqQuery vec; |
| getAllCqs(vec); |
| for (int32_t i = 0; i < vec.size(); i++) { |
| std::string cqName = vec.at(i)->getName(); |
| cQuery = getCq(cqName); |
| cQueryImpl = dynamic_cast<CqQueryImpl*>(cQuery.ptr()); |
| if (cQueryImpl == NULL || !cQueryImpl->isRunning()) { |
| LOGFINE("Unable to invoke CqStatusListener, %s, CqName: %s", |
| (cQueryImpl == NULL) ? "CQ not found" : "CQ is Not running", |
| cqName.c_str()); |
| continue; |
| } |
| |
| // Check cq pool to determine if the pool matches, if not continue. |
| ThinClientPoolDM* poolDM = |
| dynamic_cast<ThinClientPoolDM*>(cQueryImpl->getDM()); |
| if (poolDM != NULL) { |
| std::string pName = poolDM->getName(); |
| if (pName.compare(poolName) != 0) { |
| continue; |
| } |
| } |
| |
| // invoke CQ Listeners. |
| VectorOfCqListener cqListeners; |
| cQueryImpl->getCqAttributes()->getCqListeners(cqListeners); |
| for (int32_t lCnt = 0; lCnt < cqListeners.length(); lCnt++) { |
| try { |
| // Check if the listener is not null, it could have been changed/reset |
| // by the CqAttributeMutator. |
| CqStatusListenerPtr statusLstr = NULLPTR; |
| try { |
| statusLstr = dynCast<CqStatusListenerPtr>(cqListeners[lCnt]); |
| } catch (const ClassCastException&) { |
| // ignore |
| } |
| if (statusLstr != NULLPTR) { |
| if (connected) { |
| statusLstr->onCqConnected(); |
| } else { |
| statusLstr->onCqDisconnected(); |
| } |
| } |
| // Handle client side exceptions. |
| } catch (Exception& ex) { |
| LOGWARN(("Exception in the CqStatusListener of the CQ named " + cqName + |
| ", error: " + ex.getMessage()) |
| .c_str()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Returns the Operation for the given EnumListenerEvent type. |
| * @param eventType |
| * @return Operation |
| */ |
| CqOperation::CqOperationType CqService::getOperation(int eventType) { |
| CqOperation::CqOperationType op = CqOperation::OP_TYPE_INVALID; |
| switch (eventType) { |
| case TcrMessage::LOCAL_CREATE: |
| op = CqOperation::OP_TYPE_CREATE; |
| break; |
| |
| case TcrMessage::LOCAL_UPDATE: |
| op = CqOperation::OP_TYPE_UPDATE; |
| break; |
| |
| case TcrMessage::LOCAL_DESTROY: |
| op = CqOperation::OP_TYPE_DESTROY; |
| break; |
| |
| case TcrMessage::LOCAL_INVALIDATE: |
| op = CqOperation::OP_TYPE_INVALIDATE; |
| break; |
| |
| case TcrMessage::CLEAR_REGION: |
| op = CqOperation::OP_TYPE_REGION_CLEAR; |
| break; |
| |
| // case TcrMessage::INVALIDATE_REGION : |
| // op = CqOperation::OP_TYPE_REGION_INVALIDATE; |
| // break; |
| } |
| return op; |
| } |
| |
| /** |
| * Gets all the durable CQs registered by this client. |
| * |
| * @return List of names of registered durable CQs, empty list if no durable |
| * cqs. |
| */ |
| CacheableArrayListPtr CqService::getAllDurableCqsFromServer() { |
| TcrMessageGetDurableCqs msg(m_tccdm); |
| TcrMessageReply reply(true, m_tccdm); |
| |
| // intialize the chunked response hadler for durable cqs list |
| ChunkedDurableCQListResponse* resultCollector = |
| new ChunkedDurableCQListResponse(reply); |
| reply.setChunkedResultHandler( |
| static_cast<TcrChunkedResult*>(resultCollector)); |
| reply.setTimeout(DEFAULT_QUERY_RESPONSE_TIMEOUT); |
| |
| GfErrType err = GF_NOERR; |
| err = m_tccdm->sendSyncRequest(msg, reply); |
| if (err != GF_NOERR) { |
| LOGDEBUG("CqService::getAllDurableCqsFromServer!!!!"); |
| GfErrTypeToException("CqService::getAllDurableCqsFromServer:", err); |
| } |
| if (reply.getMessageType() == TcrMessage::EXCEPTION || |
| reply.getMessageType() == TcrMessage::GET_DURABLE_CQS_DATA_ERROR) { |
| err = ThinClientRegion::handleServerException( |
| "CqService::getAllDurableCqsFromServer", reply.getException()); |
| if (err == GF_CACHESERVER_EXCEPTION) { |
| throw CqQueryException( |
| "CqService::getAllDurableCqsFromServer: exception " |
| "at the server side: ", |
| reply.getException()); |
| } else { |
| GfErrTypeToException("CqService::getAllDurableCqsFromServer", err); |
| } |
| } |
| |
| CacheableArrayListPtr tmpRes = resultCollector->getResults(); |
| delete resultCollector; |
| return tmpRes; |
| } |