blob: 381179c58c481c55906eb3076fcc237d95c045a6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ProxyRemoteQueryService.hpp"
#include <geode/PoolManager.hpp>
#include "CqQueryImpl.hpp"
#include "ThinClientPoolDM.hpp"
namespace apache {
namespace geode {
namespace client {
/**
 * Builds a query-service proxy bound to the given authenticated view.
 *
 * The raw pointer is stored as-is in m_authenticatedView; no null check is
 * performed here, and every other method dereferences it.
 *
 * @param cptr the authenticated view this proxy operates on behalf of.
 */
ProxyRemoteQueryService::ProxyRemoteQueryService(AuthenticatedView* cptr)
: m_authenticatedView(cptr) {}
/**
 * Creates a query for the given query string on behalf of the user bound to
 * this proxy's authenticated view.
 *
 * The pool attached to the user's attributes must still be the very instance
 * registered under its name with the cache's pool manager, and must not be
 * destroyed. When that holds, the request is forwarded to that pool's query
 * service while a GuardUserAttributes for the view is in scope.
 *
 * @param querystring the query text handed to the pool's query service.
 * @return the Query produced by the pool's query service.
 * @throws IllegalStateException if the authenticated view is closed, or if
 *         the user's pool is no longer registered or has been destroyed.
 */
std::shared_ptr<Query> ProxyRemoteQueryService::newQuery(
    std::string querystring) {
  if (m_authenticatedView->isClosed()) {
    throw IllegalStateException("UserCache has been closed.");
  }

  const auto attachedPool = m_authenticatedView->m_userAttributes->getPool();
  const auto registeredPool =
      m_authenticatedView->m_cacheImpl->getCache()->getPoolManager().find(
          attachedPool->getName());

  // Only proceed when the name lookup yields the same, still-live pool
  // instance the user is attached to.
  const bool poolUsable = registeredPool != nullptr &&
                          registeredPool.get() == attachedPool.get() &&
                          !registeredPool->isDestroyed();
  if (poolUsable) {
    GuardUserAttributes gua(m_authenticatedView);
    const auto poolDM =
        std::static_pointer_cast<ThinClientPoolDM>(registeredPool);
    if (!poolDM->isDestroyed()) {
      return poolDM->getQueryServiceWithoutCheck()->newQuery(querystring);
    }
  }

  throw IllegalStateException("Pool has been closed.");
}
/**
 * Rejects an operation that is not available through this proxy.
 *
 * @param operationName name of the rejected operation; it becomes the first
 *        word of the exception message.
 * @throws UnsupportedOperationException always.
 */
void ProxyRemoteQueryService::unSupportedException(
    const std::string& operationName) {
  // The literal starts with a space so the operation name does not run
  // straight into the rest of the sentence (e.g. "getCq()operation ...").
  throw UnsupportedOperationException(operationName +
                                      " operation is not supported when pool "
                                      "is in multiuser authentication mode.");
}
/**
 * Creates a continuous query (without an explicit name) on behalf of the user
 * bound to this proxy's authenticated view and records it in this proxy's CQ
 * list.
 *
 * The pool attached to the user's attributes must still be the very instance
 * registered under its name with the cache's pool manager, and must not be
 * destroyed. The creation is forwarded to that pool's query service while a
 * GuardUserAttributes for the view is in scope.
 *
 * @param querystr  the CQ query text.
 * @param cqAttr    the CQ attributes forwarded to the pool's query service.
 * @param isDurable durability flag forwarded to the pool's query service.
 * @return the CqQuery produced by the pool's query service.
 * @throws IllegalStateException if the authenticated view is closed, or if
 *         the user's pool is no longer registered or has been destroyed.
 */
std::shared_ptr<CqQuery> ProxyRemoteQueryService::newCq(
    std::string querystr, const std::shared_ptr<CqAttributes>& cqAttr,
    bool isDurable) {
  if (m_authenticatedView->isClosed()) {
    throw IllegalStateException("Logical Cache has been closed.");
  }

  const auto attachedPool = m_authenticatedView->m_userAttributes->getPool();
  const auto registeredPool =
      m_authenticatedView->m_cacheImpl->getCache()->getPoolManager().find(
          attachedPool->getName());

  // Only proceed when the name lookup yields the same, still-live pool
  // instance the user is attached to.
  const bool poolUsable = registeredPool != nullptr &&
                          registeredPool.get() == attachedPool.get() &&
                          !registeredPool->isDestroyed();
  if (poolUsable) {
    GuardUserAttributes gua(m_authenticatedView);
    const auto poolDM =
        std::static_pointer_cast<ThinClientPoolDM>(registeredPool);
    if (!poolDM->isDestroyed()) {
      auto createdCq = poolDM->getQueryServiceWithoutCheck()->newCq(
          querystr, cqAttr, isDurable);
      // Track the CQ so the bulk close/execute/stop operations reach it.
      addCqQuery(createdCq);
      return createdCq;
    }
  }

  throw IllegalStateException("Pool has been closed.");
}
/**
 * Appends the given CQ to this proxy's list of tracked CQs.
 *
 * The append happens while m_cqQueryListLock is held.
 *
 * @param cqQuery the CQ to remember.
 */
void ProxyRemoteQueryService::addCqQuery(
const std::shared_ptr<CqQuery>& cqQuery) {
ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_cqQueryListLock);
m_cqQueries.push_back(cqQuery);
}
/**
 * Creates a named continuous query on behalf of the user bound to this
 * proxy's authenticated view and records it in this proxy's CQ list.
 *
 * The pool attached to the user's attributes must still be the very instance
 * registered under its name with the cache's pool manager, and must not be
 * destroyed. The creation is forwarded to that pool's query service while a
 * GuardUserAttributes for the view is in scope.
 *
 * @param name      the CQ name.
 * @param querystr  the CQ query text.
 * @param cqAttr    the CQ attributes forwarded to the pool's query service.
 * @param isDurable durability flag forwarded to the pool's query service.
 * @return the CqQuery produced by the pool's query service.
 * @throws IllegalStateException if the authenticated view is closed, or if
 *         the user's pool is no longer registered or has been destroyed.
 */
std::shared_ptr<CqQuery> ProxyRemoteQueryService::newCq(
    std::string name, std::string querystr,
    const std::shared_ptr<CqAttributes>& cqAttr, bool isDurable) {
  if (m_authenticatedView->isClosed()) {
    throw IllegalStateException("Logical Cache has been closed.");
  }

  const auto attachedPool = m_authenticatedView->m_userAttributes->getPool();
  const auto registeredPool =
      m_authenticatedView->m_cacheImpl->getCache()->getPoolManager().find(
          attachedPool->getName());

  // Only proceed when the name lookup yields the same, still-live pool
  // instance the user is attached to.
  const bool poolUsable = registeredPool != nullptr &&
                          registeredPool.get() == attachedPool.get() &&
                          !registeredPool->isDestroyed();
  if (poolUsable) {
    GuardUserAttributes gua(m_authenticatedView);
    const auto poolDM =
        std::static_pointer_cast<ThinClientPoolDM>(registeredPool);
    if (!poolDM->isDestroyed()) {
      auto createdCq = poolDM->getQueryServiceWithoutCheck()->newCq(
          name, querystr, cqAttr, isDurable);
      // Track the CQ so the bulk close/execute/stop operations reach it.
      addCqQuery(createdCq);
      return createdCq;
    }
  }

  throw IllegalStateException("Pool has been closed.");
}
/**
 * Closes all CQs tracked by this proxy; equivalent to closeCqs(false).
 */
void ProxyRemoteQueryService::closeCqs() { closeCqs(false); }
void ProxyRemoteQueryService::closeCqs(bool keepAlive) {
ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_cqQueryListLock);
for (auto&& q : m_cqQueries) {
try {
if (!(q->isDurable() && keepAlive)) {
q->close();
} else {
// need to just cleanup client side data structure
auto&& cqImpl = std::static_pointer_cast<CqQueryImpl>(q);
cqImpl->close(false);
}
} catch (QueryException& qe) {
LOGFINE("Failed to close the CQ, CqName : " + q->getName() +
" Error : " + qe.getMessage());
} catch (CqClosedException& cce) {
LOGFINE("Failed to close the CQ, CqName : " + q->getName() +
" Error : " + cce.getMessage());
}
}
}
/**
 * Returns the CQs tracked by this proxy.
 *
 * The list is returned by value, and the return expression is evaluated while
 * m_cqQueryListLock is held.
 *
 * @return the container of tracked CQs.
 */
QueryService::query_container_type ProxyRemoteQueryService::getCqs() const {
ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_cqQueryListLock);
return m_cqQueries;
}
/**
 * Looks up a continuous query by name on behalf of the user bound to this
 * proxy's authenticated view.
 *
 * The pool attached to the user's attributes must still be the very instance
 * registered under its name with the cache's pool manager, and must not be
 * destroyed. The lookup is forwarded to that pool's query service while a
 * GuardUserAttributes for the view is in scope.
 *
 * @param name the CQ name to look up.
 * @return whatever the pool's query service returns for that name.
 * @throws IllegalStateException if the authenticated view is closed, or if
 *         the user's pool is no longer registered or has been destroyed.
 */
std::shared_ptr<CqQuery> ProxyRemoteQueryService::getCq(
    const std::string& name) const {
  if (m_authenticatedView->isClosed()) {
    throw IllegalStateException("Logical Cache has been closed.");
  }

  const auto attachedPool = m_authenticatedView->m_userAttributes->getPool();
  const auto registeredPool =
      m_authenticatedView->m_cacheImpl->getCache()->getPoolManager().find(
          attachedPool->getName());

  // Only proceed when the name lookup yields the same, still-live pool
  // instance the user is attached to.
  const bool poolUsable = registeredPool != nullptr &&
                          registeredPool.get() == attachedPool.get() &&
                          !registeredPool->isDestroyed();
  if (poolUsable) {
    GuardUserAttributes gua(m_authenticatedView);
    const auto poolDM =
        std::static_pointer_cast<ThinClientPoolDM>(registeredPool);
    if (!poolDM->isDestroyed()) {
      return poolDM->getQueryServiceWithoutCheck()->getCq(name);
    }
  }

  throw IllegalStateException("Pool has been closed.");
}
void ProxyRemoteQueryService::executeCqs() {
ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_cqQueryListLock);
for (auto&& q : m_cqQueries) {
try {
q->execute();
} catch (QueryException& qe) {
LOGFINE("Failed to execute the CQ, CqName : " + q->getName() +
" Error : " + qe.getMessage());
} catch (CqClosedException& cce) {
LOGFINE("Failed to execute the CQ, CqName : " + q->getName() +
" Error : " + cce.getMessage());
}
}
}
void ProxyRemoteQueryService::stopCqs() {
ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_cqQueryListLock);
for (auto&& q : m_cqQueries) {
try {
q->stop();
} catch (QueryException& qe) {
LOGFINE("Failed to stop the CQ, CqName : " + q->getName() +
" Error : " + qe.getMessage());
} catch (CqClosedException& cce) {
LOGFINE("Failed to stop the CQ, CqName : " + q->getName() +
" Error : " + cce.getMessage());
}
}
}
/**
 * Not supported by this proxy.
 *
 * Unconditionally delegates to unSupportedException(), which throws, so no
 * value is ever returned and control never reaches the end of the function.
 *
 * @throws UnsupportedOperationException always.
 */
std::shared_ptr<CqServiceStatistics>
ProxyRemoteQueryService::getCqServiceStatistics() const {
unSupportedException("getCqServiceStatistics()");
}
/**
 * Not supported by this proxy.
 *
 * Unconditionally delegates to unSupportedException(), which throws, so no
 * value is ever returned and control never reaches the end of the function.
 *
 * @throws UnsupportedOperationException always.
 */
std::shared_ptr<CacheableArrayList>
ProxyRemoteQueryService::getAllDurableCqsFromServer() const {
unSupportedException("getAllDurableCqsFromServer()");
}
} // namespace client
} // namespace geode
} // namespace apache