blob: a1fef0bb22deead8d5fd8aff48e5fe923d4f078f [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
#include "../VectorT.hpp"
#include "CqQueryImpl.hpp"
#include "../CqAttributesFactory.hpp"
#include "CqAttributesMutatorImpl.hpp"
#include "../Log.hpp"
#include "ResultSetImpl.hpp"
#include "StructSetImpl.hpp"
#include "../ExceptionTypes.hpp"
#include "ThinClientRegion.hpp"
#include "ReadWriteLock.hpp"
#include "ThinClientRegion.hpp"
using namespace gemfire;
CqQueryImpl::CqQueryImpl(CqServicePtr& cqService, std::string& cqName, std::string& queryString, CqAttributesPtr& cqAttributes, bool isDurable, UserAttributesPtr userAttributesPtr) :
m_stats(new CqQueryVsdStats(m_cqName.c_str())),
m_cqName(cqName),
m_queryString(queryString),
m_cqService(cqService),
m_cqState(CqState::STOPPED), //Initial state is stopped
m_serverCqName(cqName), // On Client Side serverCqName and cqName will be same.
m_tccdm(m_cqService->getDM()),
m_isDurable(isDurable),
/* adongre
* CID 28930: Uninitialized scalar field (UNINIT_CTOR)
*/
m_cqOperation(CqOperation::OP_TYPE_INVALID)
{
CqAttributesFactory cqAf(cqAttributes);
m_cqAttributes = cqAf.create();
m_cqAttributesMutator = new CqAttributesMutatorImpl(CqAttributesPtr(m_cqAttributes));
if (userAttributesPtr != NULLPTR)
m_proxyCache = userAttributesPtr->getProxyCache();
else
m_proxyCache = NULLPTR;
}
CqQueryImpl::~CqQueryImpl()
{
}
/**
* returns CQ name
*/
void CqQueryImpl::updateStats()
{
m_cqService->updateStats();
}
const char* CqQueryImpl::getName() const{
return m_cqName.c_str();
}
/**
* sets the CqName.
*/
void CqQueryImpl::setName(std::string& cqName) {
m_cqName = m_serverCqName = cqName;
}
/**
* Initializes the CqQuery.
* creates Query object, if its valid adds into repository.
*/
void CqQueryImpl::initCq() {
addToCqMap();
// Initialize the VSD statistics
// Update statistics with CQ creation.
CqServiceVsdStats & stats = m_cqService->getCqServiceVsdStats();
//stats.incNumCqsStopped();
stats.incNumCqsCreated();
//stats.incNumCqsOnClient();
updateStats();
}
/**
* Closes the Query.
* On Client side, sends the cq close request to server.
* On Server side, takes care of repository cleanup.
* @throws CqException
*/
void CqQueryImpl::close(){
close(true);
}
/**
* Closes the Query.
* On Client side, sends the cq close request to server.
* @param sendRequestToServer true to send the request to server.
* @throws CqException
*/
void CqQueryImpl::close(bool sendRequestToServer) {
// Check if the cq is already closed.
if (isClosed()) {
//throw new CqClosedException("CQ is already closed, CqName : " + this.cqName);
LOGFINE("CQ is already closed, CqName : %s" , m_cqName.c_str());
return;
}
GuardUserAttribures gua;
if(m_proxyCache != NULLPTR)
{
gua.setProxyCache(m_proxyCache);
}
LOGFINE("Started closing CQ CqName : %s", m_cqName.c_str());
//bool isClosed = false;
// Stat update.
CqServiceVsdStats & stats = m_cqService->getCqServiceVsdStats();
/*
if (isRunning()) {
stats.decNumCqsActive();
}
else if (isStopped()) {
stats.decNumCqsStopped();
}
*/
setCqState(CqState::CLOSING);
if( sendRequestToServer == true) {
try {
sendStopOrClose(TcrMessage::CLOSECQ_MSG_TYPE);
} catch (...) {
// ignore and move on
}
}
// Set the state to close, and update stats
setCqState(CqState::CLOSED);
stats.incNumCqsClosed();
// Invoke close on Listeners if any.
if (m_cqAttributes != NULLPTR) {
VectorOfCqListener cqListeners ;
m_cqAttributes->getCqListeners(cqListeners);
if (!cqListeners.empty()) {
LOGFINE("Invoking CqListeners close() api for the CQ, CqName : %s Number of CqListeners : %d" , m_cqName.c_str() , cqListeners.length());
for (int32_t lCnt=0; lCnt < cqListeners.length(); lCnt++) {
try {
cqListeners[lCnt]->close();
// Handle client side exceptions.
} catch (Exception& ex) {
LOGWARN("Exception occoured in the CqListener of the CQ, CqName : %sError : %s" , m_cqName.c_str() , ex.getMessage());
}
}
}
}
removeFromCqMap();
updateStats();
LOGFINE("Successfully closed the CQ. %s" , m_cqName.c_str());
}
/**
* Store this CQ in the cqService's cqMap.
* @throws CqException
*/
void CqQueryImpl::addToCqMap() {
// Add CQ to the CQ repository
try {
LOGFINE("Adding to CQ Repository. CqName : %s Server CqName : %s", m_cqName.c_str(), m_serverCqName.c_str());
CqQueryPtr cq(this);
m_cqService->addCq(m_cqName, cq);
} catch (Exception& ex){
std::string errMsg = "Failed to store Continuous Query in the repository. CqName: " + m_cqName + ex.getMessage();
LOGERROR(errMsg.c_str());
throw CqException (errMsg.c_str());
}
LOGFINE("Stored CQ in the CQ repository. %s", m_cqName.c_str());
}
/**
* Removes the CQ from CQ repository.
* @throws CqException
*/
void CqQueryImpl::removeFromCqMap() {
try {
m_cqService->removeCq(m_cqName);
} catch (Exception& ex){
std::string errMsg = "Failed to remove Continuous Query From the repository. CqName: " + m_cqName + " Error : " + ex.getMessage();
LOGERROR(errMsg.c_str());
throw CqException (errMsg.c_str());
}
LOGFINE("Removed CQ from the CQ repository. CQ Name: %s", m_cqName.c_str());
}
/**
* Returns the QueryString of this CQ.
*/
const char * CqQueryImpl::getQueryString() const{
return m_queryString.c_str();
}
/**
* Return the query
* @return the Query for the query string
*/
QueryPtr CqQueryImpl::getQuery() const{
return m_query;
}
/**
* @see com.gemstone.gemfire.cache.query.CqQuery#getStatistics()
*/
const CqStatisticsPtr CqQueryImpl::getStatistics() const {
return m_stats;
}
const CqAttributesPtr CqQueryImpl::getCqAttributes() const {
return m_cqAttributes;
}
/**
* Clears the resource used by CQ.
* @throws CqException
*/
void CqQueryImpl::cleanup() {
removeFromCqMap();
}
/**
* @return Returns the cqListeners.
*/
void CqQueryImpl::getCqListeners(VectorOfCqListener& cqListener) {
m_cqAttributes->getCqListeners(cqListener);
}
GfErrType CqQueryImpl::execute(TcrEndpoint * endpoint)
{
ACE_Guard<ACE_Recursive_Thread_Mutex> _guard( m_mutex );
if (m_cqState != CqState::RUNNING) {
return GF_NOERR;
}
GuardUserAttribures gua;
if(m_proxyCache != NULLPTR)
{
gua.setProxyCache(m_proxyCache);
}
LOGFINE("Executing CQ [%s]", m_cqName.c_str());
TcrMessage request(TcrMessage::EXECUTECQ_MSG_TYPE, m_cqName, m_queryString,
CqState::RUNNING, isDurable(), m_tccdm);
TcrMessage reply(true, m_tccdm);
GfErrType err = GF_NOERR;
err = m_tccdm->sendRequestToEP(request, reply, endpoint);
if ( err != GF_NOERR ) {
//GfErrTypeToException("CqQuery::execute(endpoint)", err);
return err;
}
if (reply.getMessageType() == TcrMessage::EXCEPTION ||
reply.getMessageType() == TcrMessage::CQDATAERROR_MSG_TYPE ||
reply.getMessageType() == TcrMessage::CQ_EXCEPTION_TYPE) {
err = ThinClientRegion::handleServerException("CqQuery::execute(endpoint)",
reply.getException());
/*
if (err == GF_CACHESERVER_EXCEPTION) {
throw CqQueryException("CqQuery::execute(endpoint): exception at the server side: ",
reply.getException());
}
else {
GfErrTypeToException("CqQuery::execute(endpoint)", err);
}
*/
}
return err;
}
void CqQueryImpl::executeAfterFailover()
{
ACE_Guard<ACE_Recursive_Thread_Mutex> _guard( m_mutex );
if (m_cqState != CqState::RUNNING) {
return;
}
executeCq(TcrMessage::EXECUTECQ_MSG_TYPE);
}
void CqQueryImpl::execute()
{
GuardUserAttribures gua;
if(m_proxyCache != NULLPTR)
{
gua.setProxyCache(m_proxyCache);
}
ACE_Guard<ACE_Recursive_Thread_Mutex> guardRedundancy( *( m_tccdm->getRedundancyLock( ) ) );
ACE_Guard<ACE_Recursive_Thread_Mutex> _guard( m_mutex );
if(m_cqState == CqState::RUNNING)
{
throw IllegalStateException("CqQuery::execute: cq is already running");
}
executeCq(TcrMessage::EXECUTECQ_MSG_TYPE);
}
//for EXECUTE_REQUEST or REDUNDANT_EXECUTE_REQUEST
bool CqQueryImpl::executeCq(TcrMessage::MsgType requestType)
{
GuardUserAttribures gua;
if(m_proxyCache != NULLPTR)
{
gua.setProxyCache(m_proxyCache);
}
LOGDEBUG("CqQueryImpl::executeCq");
TcrMessage msg(requestType, m_cqName, m_queryString, CqState::RUNNING, isDurable(), m_tccdm);
TcrMessage reply(true, m_tccdm);
GfErrType err = GF_NOERR;
err = m_tccdm->sendSyncRequest(msg, reply);
if ( err != GF_NOERR ) {
GfErrTypeToException("CqQuery::executeCq:", err);
}
if (reply.getMessageType() == TcrMessage::EXCEPTION ||
reply.getMessageType() == TcrMessage::CQDATAERROR_MSG_TYPE ||
reply.getMessageType() == TcrMessage::CQ_EXCEPTION_TYPE) {
err = ThinClientRegion::handleServerException("CqQuery::executeCq",
reply.getException());
if (err == GF_CACHESERVER_EXCEPTION) {
throw CqQueryException("CqQuery::executeCq: exception at the server side: ",
reply.getException());
}
else {
GfErrTypeToException("CqQuery::executeCq", err);
}
}
ACE_Guard<ACE_Recursive_Thread_Mutex> _guard( m_mutex );
m_cqState = CqState::RUNNING;
updateStats();
return true;
}
//for EXECUTE_INITIAL_RESULTS_REQUEST :
CqResultsPtr CqQueryImpl::executeWithInitialResults(uint32_t timeout)
{
GuardUserAttribures gua;
if(m_proxyCache != NULLPTR)
{
gua.setProxyCache(m_proxyCache);
}
ACE_Guard<ACE_Recursive_Thread_Mutex> guardRedundancy( *( m_tccdm->getRedundancyLock( ) ) );
ACE_Guard<ACE_Recursive_Thread_Mutex> _guard( m_mutex );
if(m_cqState == CqState::RUNNING)
{
throw IllegalStateException("CqQuery::executeWithInitialResults: cq is already running");
}
//QueryResult values;
TcrMessage msg(TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE, m_cqName,
m_queryString, CqState::RUNNING, isDurable(), m_tccdm);
TcrMessage reply(true, m_tccdm);
ChunkedQueryResponse* resultCollector = (new ChunkedQueryResponse(reply));
reply.setChunkedResultHandler(static_cast<TcrChunkedResult *>(resultCollector));
reply.setTimeout(timeout);
GfErrType err = GF_NOERR;
err = m_tccdm->sendSyncRequest(msg, reply);
if ( err != GF_NOERR ) {
LOGDEBUG("CqQueryImpl::executeCqWithInitialResults errorred!!!!");
GfErrTypeToException("CqQuery::executeCqWithInitialResults:", err);
}
if (reply.getMessageType() == TcrMessage::EXCEPTION ||
reply.getMessageType() == TcrMessage::CQDATAERROR_MSG_TYPE ||
reply.getMessageType() == TcrMessage::CQ_EXCEPTION_TYPE) {
err = ThinClientRegion::handleServerException("CqQuery::executeCqWithInitialResults",
reply.getException());
if (err == GF_CACHESERVER_EXCEPTION) {
throw CqQueryException("CqQuery::executeWithInitialResults: exception "
"at the server side: ", reply.getException());
}
else {
GfErrTypeToException("CqQuery::executeWithInitialResults", err);
}
}
m_cqState = CqState::RUNNING;
updateStats();
CqResultsPtr sr;
CacheableVectorPtr values = resultCollector->getQueryResults();
const std::vector<CacheableStringPtr>& fieldNameVec =
resultCollector->getStructFieldNames();
int32_t sizeOfFieldNamesVec = static_cast<int32_t> (fieldNameVec.size());
if (sizeOfFieldNamesVec == 0) {
LOGFINEST("Query::execute: creating ResultSet for query: %s",
m_queryString.c_str());
sr = new ResultSetImpl(values);
}
else {
if (values->size() % fieldNameVec.size() != 0) {
throw MessageException("Query::execute: Number of values coming "
"from server has to be exactly divisible by field count");
}
else {
LOGFINEST("Query::execute: creating StructSet for query: %s",
m_queryString.c_str());
sr = new StructSetImpl(values, fieldNameVec);
}
}
delete resultCollector;
return sr;
}
/**
* Stop or pause executing the query.
*/
void CqQueryImpl::stop() {
if (isClosed()) {
throw CqClosedException(("CQ is closed, CqName : " + m_cqName).c_str());
}
GuardUserAttribures gua;
if(m_proxyCache != NULLPTR)
{
gua.setProxyCache(m_proxyCache);
}
if (!(isRunning())) {
throw IllegalStateException(("CQ is not in running state, stop CQ does not apply, CqName : " + m_cqName).c_str());
}
sendStopOrClose(TcrMessage::STOPCQ_MSG_TYPE);
/*
CqServiceVsdStats & stats = m_cqService->getCqServiceVsdStats();
stats.decNumCqsActive();
*/
setCqState(CqState::STOPPED);
//stats.incNumCqsStopped();
updateStats();
}
void CqQueryImpl::sendStopOrClose(TcrMessage::MsgType requestType)
{
TcrMessage msg(requestType, m_cqName, -1, m_tccdm);
TcrMessage reply(true, m_tccdm);
GfErrType err = GF_NOERR;
err = m_tccdm->sendSyncRequest(msg, reply);
if ( err != GF_NOERR ) {
GfErrTypeToException("CqQuery::stop/close:", err);
}
if (reply.getMessageType() == TcrMessage::EXCEPTION ||
reply.getMessageType() == TcrMessage::CQDATAERROR_MSG_TYPE ||
reply.getMessageType() == TcrMessage::CQ_EXCEPTION_TYPE) {
err = ThinClientRegion::handleServerException("CqQuery::stop/close",
reply.getException());
if (err == GF_CACHESERVER_EXCEPTION) {
throw CqQueryException("CqQuery::stop/close: exception at the server side: ",
reply.getException());
}
else {
GfErrTypeToException("CqQuery::stop/close", err);
}
}
}
/**
* Return the state of this query.
* @return STOPPED RUNNING or CLOSED
*/
CqState::StateType CqQueryImpl::getState() {
return m_cqState;
}
/**
* Sets the state of the cq.
* Server side method. Called during cq registration time.
*/
void CqQueryImpl::setCqState(CqState::StateType state) {
if (isClosed()) {
throw CqClosedException(("CQ is closed, CqName : " + m_cqName).c_str());
}
ACE_Guard<ACE_Recursive_Thread_Mutex> _guard( m_mutex );
m_cqState = state;
}
const CqAttributesMutatorPtr CqQueryImpl::getCqAttributesMutator() const {
return m_cqAttributesMutator ;
}
/**
* @return Returns the cqOperation.
*/
CqOperation::CqOperationType CqQueryImpl::getCqOperation() {
return m_cqOperation;
}
/**
* @param cqOperation The cqOperation to set.
*/
void CqQueryImpl::setCqOperation(CqOperation::CqOperationType cqOperation) {
m_cqOperation = cqOperation;
}
/**
* Update CQ stats
* @param cqEvent object
*/
void CqQueryImpl::updateStats(CqEvent& cqEvent) {
CqQueryVsdStats* stats = dynamic_cast<CqQueryVsdStats*>(m_stats.ptr());
stats->incNumEvents();
switch (cqEvent.getQueryOperation())
{
case CqOperation::OP_TYPE_CREATE:
stats->incNumInserts();
break;
case CqOperation::OP_TYPE_UPDATE:
stats->incNumUpdates();
break;
case CqOperation::OP_TYPE_DESTROY:
stats->incNumDeletes();
break;
default:
break;
}
}
/**
* Return true if the CQ is in running state
* @return true if running, false otherwise
*/
bool CqQueryImpl::isRunning() {
ACE_Guard<ACE_Recursive_Thread_Mutex> _guard( m_mutex );
return m_cqState==CqState::RUNNING;
}
/**
* Return true if the CQ is in Sstopped state
* @return true if stopped, false otherwise
*/
bool CqQueryImpl::isStopped() {
ACE_Guard<ACE_Recursive_Thread_Mutex> _guard( m_mutex );
return m_cqState==CqState::STOPPED || (m_proxyCache != NULLPTR && m_proxyCache->isClosed());
}
/**
* Return true if the CQ is closed
* @return true if closed, false otherwise
*/
bool CqQueryImpl::isClosed() {
ACE_Guard<ACE_Recursive_Thread_Mutex> _guard( m_mutex );
return m_cqState==CqState::CLOSED || (m_proxyCache != NULLPTR && m_proxyCache->isClosed());
}
/**
* Return true if the CQ is durable
* @return true if durable, false otherwise
*/
bool CqQueryImpl::isDurable() {
return m_isDurable;
}