blob: 791f13d89ef956ea6a027327436a24cb3acb45a6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "RemoteQueryService.hpp"
#include "CacheImpl.hpp"
#include "RemoteQuery.hpp"
#include "ReadWriteLock.hpp"
#include "CqServiceVsdStats.hpp"
#include "ThinClientPoolDM.hpp"
#include "UserAttributes.hpp"
using namespace apache::geode::client;
/**
 * Constructs the query service on top of a distribution manager.
 *
 * If a pool distribution manager is supplied it is used directly; otherwise a
 * ThinClientCacheDistributionManager is allocated from the cache's connection
 * manager. The service starts out flagged invalid and with no CQ service.
 * The distribution manager is not initialized here.
 *
 * @param cptr   cache implementation; only dereferenced when poolDM is null.
 * @param poolDM pool distribution manager to use, or null.
 */
RemoteQueryService::RemoteQueryService(CacheImpl* cptr,
                                       ThinClientPoolDM* poolDM)
    : m_invalid(true), m_cqService(NULLPTR) {
  if (!poolDM) {
    // No pool given: create a distribution manager of our own.
    m_tccdm =
        new ThinClientCacheDistributionManager(cptr->tcrConnectionManager());
  } else {
    m_tccdm = poolDM;
  }
  LOGFINEST("Initialized m_tccdm");
}
void RemoteQueryService::init() {
TryWriteGuard guard(m_rwLock, m_invalid);
if (m_invalid) {
LOGFINEST("RemoteQueryService::init: initializing TCCDM");
if (dynamic_cast<ThinClientCacheDistributionManager*>(m_tccdm)) {
m_tccdm->init();
}
m_invalid = false;
LOGFINEST("RemoteQueryService::init: done initialization");
}
}
/**
 * Creates a new query object for the given query string.
 *
 * In multi-user mode the query is additionally bound to the proxy cache of
 * the calling thread's user attributes.
 *
 * @param querystring the query text handed to the new RemoteQuery.
 * @return the newly created query.
 * @throws CacheClosedException if the service is flagged invalid.
 */
QueryPtr RemoteQueryService::newQuery(const char* querystring) {
  // Check validity under the read guard BEFORE touching m_tccdm: close()
  // deletes a service-owned distribution manager (GF_SAFE_DELETE) and flags
  // the service invalid, so dereferencing m_tccdm ahead of this check could
  // use a destroyed object once the service has been closed.
  TryReadGuard guard(m_rwLock, m_invalid);
  if (m_invalid) {
    throw CacheClosedException(
        "QueryService::newQuery: Cache has been closed.");
  }

  const bool multiUserMode = m_tccdm->isMultiUserMode();
  LOGDEBUG("RemoteQueryService::newQuery: multiuserMode = %d ", multiUserMode);
  LOGDEBUG("RemoteQueryService: creating a new query: %s", querystring);

  if (!multiUserMode) {
    return QueryPtr(
        new RemoteQuery(querystring, RemoteQueryServicePtr(this), m_tccdm));
  }

  // Multi-user mode: attach the proxy cache of the current thread's user.
  UserAttributesPtr ua =
      TSSUserAttributesWrapper::s_geodeTSSUserAttributes->getUserAttributes();
  return QueryPtr(new RemoteQuery(querystring, RemoteQueryServicePtr(this),
                                  m_tccdm, ua->getProxyCache()));
}
void RemoteQueryService::close() {
LOGFINEST("RemoteQueryService::close: starting close");
TryWriteGuard guard(m_rwLock, m_invalid);
if (m_cqService != NULLPTR) {
LOGFINEST("RemoteQueryService::close: starting CQ service close");
m_cqService->closeCqService();
m_cqService = NULLPTR;
LOGFINEST("RemoteQueryService::close: completed CQ service close");
}
if (dynamic_cast<ThinClientCacheDistributionManager*>(m_tccdm)) {
if (!m_invalid) {
LOGFINEST("RemoteQueryService::close: destroying DM");
m_tccdm->destroy();
}
GF_SAFE_DELETE(m_tccdm);
m_invalid = true;
}
if (!m_CqPoolsConnected.empty()) {
m_CqPoolsConnected.clear();
}
LOGFINEST("RemoteQueryService::close: completed");
}
/**
 * Executes all client CQs on the given endpoint after a failover.
 *
 * @param endpoint the endpoint that was failed over to.
 * @return GF_NOERR when the service is flagged invalid or no CQ service
 *         exists; otherwise the result of the CQ service call.
 */
GfErrType RemoteQueryService::executeAllCqs(TcrEndpoint* endpoint) {
  TryReadGuard guard(m_rwLock, m_invalid);
  if (m_invalid) {
    LOGFINE("QueryService::executeAllCqs(endpoint): Not initialized.");
    return GF_NOERR;
  }

  if (m_cqService != NULLPTR) {
    LOGFINE(
        "RemoteQueryService: execute all cqs after failover to endpoint[%s]",
        endpoint->name().c_str());
    return m_cqService->executeAllClientCqs(endpoint);
  }

  // No CQ service was ever started, so there is nothing to re-execute.
  LOGFINE(
      "RemoteQueryService: no cq to execute after failover to endpoint[%s]",
      endpoint->name().c_str());
  return GF_NOERR;
}
/**
 * Executes all client CQs, forwarding the failover flag to the CQ service.
 * Returns without doing anything when the service is flagged invalid or no
 * CQ service has been started.
 *
 * @param failover passed through to the CQ service unchanged.
 */
void RemoteQueryService::executeAllCqs(bool failover) {
  TryReadGuard guard(m_rwLock, m_invalid);
  if (m_invalid) {
    LOGFINE("QueryService::executeAllCqs: Not initialized.");
    return;
  }

  // A CQ service that was never started is not started by a failover.
  if (m_cqService == NULLPTR) {
    LOGFINE("RemoteQueryService: no cq to execute after failover");
    return;
  }

  LOGFINE("RemoteQueryService: execute all cqs after failover");
  m_cqService->executeAllClientCqs(failover);
}
/**
 * Creates a continuous query without an explicit name. The CQ name is derived
 * from the query text by prefixing it with "_default".
 *
 * @param querystr  the CQ query text.
 * @param cqAttr    attributes for the new CQ.
 * @param isDurable forwarded to the CQ service.
 * @return the CQ created by the CQ service.
 * @throws CacheClosedException if the service is flagged invalid.
 */
CqQueryPtr RemoteQueryService::newCq(const char* querystr,
                                     CqAttributesPtr& cqAttr, bool isDurable) {
  TryReadGuard guard(m_rwLock, m_invalid);
  if (m_invalid) {
    throw CacheClosedException("QueryService::newCq: Cache has been closed.");
  }
  initCqService();

  std::string queryText(querystr);
  // For now the generated name is simply the prefixed query string.
  std::string cqName = std::string("_default") + querystr;
  return m_cqService->newCq(cqName, queryText, cqAttr, isDurable);
}
/**
 * Creates a continuous query with the given name.
 *
 * @param name      name of the new CQ.
 * @param querystr  the CQ query text.
 * @param cqAttr    attributes for the new CQ.
 * @param isDurable forwarded to the CQ service.
 * @return the CQ created by the CQ service.
 * @throws CacheClosedException if the service is flagged invalid.
 */
CqQueryPtr RemoteQueryService::newCq(const char* name, const char* querystr,
                                     CqAttributesPtr& cqAttr, bool isDurable) {
  TryReadGuard guard(m_rwLock, m_invalid);
  if (m_invalid) {
    throw CacheClosedException("QueryService::newCq: Cache has been closed.");
  }
  initCqService();

  std::string cqName(name);
  std::string queryText(querystr);
  return m_cqService->newCq(cqName, queryText, cqAttr, isDurable);
}
/**
 * Closes all CQs held by the CQ service. Returns without doing anything when
 * the service is flagged invalid or no CQ service has been started.
 */
void RemoteQueryService::closeCqs() {
  TryReadGuard guard(m_rwLock, m_invalid);
  if (m_invalid) {
    LOGFINE("QueryService::closeCqs: Cache has been closed.");
    return;
  }

  // Without a CQ service there are no CQs to close.
  if (m_cqService == NULLPTR) {
    return;
  }
  m_cqService->closeAllCqs();
}
/**
 * Asks the CQ service to place all its CQs into the supplied vector. The
 * vector is left untouched when no CQ service has been started.
 *
 * @param vec receives the CQs.
 * @throws CacheClosedException if the service is flagged invalid.
 */
void RemoteQueryService::getCqs(VectorOfCqQuery& vec) {
  TryReadGuard guard(m_rwLock, m_invalid);
  if (m_invalid) {
    throw CacheClosedException("QueryService::getCqs: Cache has been closed.");
  }

  // Without a CQ service there are no CQs to report.
  if (m_cqService == NULLPTR) {
    return;
  }
  m_cqService->getAllCqs(vec);
}
/**
 * Looks up a CQ by name through the CQ service.
 *
 * @param name name of the CQ to look up.
 * @return the CQ service's result, or NULLPTR when no CQ service has been
 *         started.
 * @throws CacheClosedException if the service is flagged invalid.
 */
CqQueryPtr RemoteQueryService::getCq(const char* name) {
  TryReadGuard guard(m_rwLock, m_invalid);
  if (m_invalid) {
    throw CacheClosedException("QueryService::getCq: Cache has been closed.");
  }

  // Without a CQ service no CQ can exist under any name.
  if (m_cqService == NULLPTR) {
    return NULLPTR;
  }
  std::string cqName(name);
  return m_cqService->getCq(cqName);
}
/**
 * Executes all client CQs through the CQ service. Does nothing when no CQ
 * service has been started.
 *
 * @throws CacheClosedException if the service is flagged invalid.
 */
void RemoteQueryService::executeCqs() {
  TryReadGuard guard(m_rwLock, m_invalid);
  if (m_invalid) {
    throw CacheClosedException(
        "QueryService::executeCqs: Cache has been closed.");
  }

  // Without a CQ service there are no CQs to execute.
  if (m_cqService == NULLPTR) {
    return;
  }
  m_cqService->executeAllClientCqs();
}
/**
 * Stops all client CQs through the CQ service. Returns without doing anything
 * when the service is flagged invalid or no CQ service has been started.
 */
void RemoteQueryService::stopCqs() {
  TryReadGuard guard(m_rwLock, m_invalid);
  if (m_invalid) {
    LOGFINE("QueryService::stopCqs: Cache has been closed.");
    return;
  }

  // Without a CQ service there are no CQs to stop.
  if (m_cqService == NULLPTR) {
    return;
  }
  m_cqService->stopAllClientCqs();
}
/**
 * Returns the CQ service statistics.
 *
 * @return the CQ service's statistics object, or a freshly created
 *         CqServiceVsdStats when no CQ service has been started.
 * @throws CacheClosedException if the service is flagged invalid.
 */
CqServiceStatisticsPtr RemoteQueryService::getCqServiceStatistics() {
  TryReadGuard guard(m_rwLock, m_invalid);
  if (m_invalid) {
    throw CacheClosedException(
        "QueryService::getCqServiceStatistics: Cache has been closed.");
  }

  if (m_cqService == NULLPTR) {
    // No CQ service yet: hand back a new statistics object instead.
    return CqServiceStatisticsPtr(new CqServiceVsdStats());
  }
  return m_cqService->getCqServiceStatistics();
}
/**
 * Forwards a notification message to the CQ service.
 *
 * The message is silently dropped when the service is flagged invalid, when
 * no CQ service exists, or when the CQ service's checkAndAcquireLock()
 * returns false.
 *
 * @param msg the notification message handed to the CQ service.
 */
void RemoteQueryService::receiveNotification(TcrMessage* msg) {
// The read guard lives only inside this inner scope, so it has been released
// by the time the notification is dispatched below.
{
TryReadGuard guard(m_rwLock, m_invalid);
if (m_invalid) {
// Open question left by the original author: is this check needed?
return;
}
// If the CQ service has not been started, no CQ exists to notify.
if (m_cqService == NULLPTR) {
return;
}
// NOTE(review): presumably this takes a CQ-service-level lock that keeps the
// service usable after the read guard is dropped - confirm against CqService.
if (!m_cqService->checkAndAcquireLock()) {
return;
}
}
// Dispatched outside the read guard's scope.
m_cqService->receiveNotification(msg);
}
/**
 * Retrieves the durable CQs from the server through the CQ service.
 *
 * @return the CQ service's result, or NULLPTR when no CQ service has been
 *         started.
 * @throws CacheClosedException if the service is flagged invalid.
 */
CacheableArrayListPtr RemoteQueryService::getAllDurableCqsFromServer() {
  TryReadGuard guard(m_rwLock, m_invalid);
  if (m_invalid) {
    throw CacheClosedException(
        "QueryService::getAllDurableCqsFromServer: Cache has been closed.");
  }

  // Without a CQ service there are no CQs to ask the server about.
  if (m_cqService == NULLPTR) {
    return NULLPTR;
  }
  return m_cqService->getAllDurableCqsFromServer();
}
/**
 * Notifies the CQ service's listeners of a connection status change, skipping
 * the notification when the recorded status for the pool already matches.
 *
 * Note that the incoming pool argument is overwritten with this service's own
 * distribution manager (when that is a ThinClientPoolDM) before being used.
 * When it is not a pool DM, the listeners are invoked with an empty pool name
 * and no status is recorded.
 *
 * @param pool      overwritten before use, see above.
 * @param connected the new connection status.
 */
void RemoteQueryService::invokeCqConnectedListeners(ThinClientPoolDM* pool,
                                                    bool connected) {
  if (m_cqService == NULLPTR) {
    return;
  }

  std::string poolName;
  pool = dynamic_cast<ThinClientPoolDM*>(m_tccdm);
  if (pool != NULL) {
    poolName = pool->getName();
    CqPoolsConnected::iterator known = m_CqPoolsConnected.find(poolName);
    const bool statusUnchanged =
        (known != m_CqPoolsConnected.end()) && (known->second == connected);
    if (statusUnchanged) {
      LOGDEBUG("Returning since pools connection status matched.");
      return;
    }
    // Record the new status before the listeners are told about it.
    LOGDEBUG("Inserting since pools connection status did not match.");
    m_CqPoolsConnected[poolName] = connected;
  }

  m_cqService->invokeCqConnectedListeners(poolName, connected);
}