blob: 123c7c729d1d939d2c0a67e0f8731c885ebb7246 [file] [log] [blame]
#pragma once
#ifndef GEODE_CQQUERYIMPL_H_
#define GEODE_CQQUERYIMPL_H_
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <gfcpp/gfcpp_globals.hpp>
#include <gfcpp/gf_types.hpp>
#include <gfcpp/CqResults.hpp>
#include <gfcpp/CqQuery.hpp>
#include <gfcpp/CqState.hpp>
#include "CqQueryVsdStats.hpp"
#include "CqService.hpp"
#include <gfcpp/CqOperation.hpp>
#include <gfcpp/CqAttributes.hpp>
#include <gfcpp/Region.hpp>
#include "MapWithLock.hpp"
#include <string>
#include <ace/ACE.h>
#include <ace/Condition_Recursive_Thread_Mutex.h>
#include <ace/Time_Value.h>
#include <ace/Guard_T.h>
#include <ace/Recursive_Thread_Mutex.h>
#include "ProxyCache.hpp"
/**
* @file
*/
namespace apache {
namespace geode {
namespace client {
/**
* @class CqQueryImpl CqQueryImpl.hpp
*
* Represents the CqQuery object. Implements CqQuery API and CqAttributeMutator.
*
*/
class CqQueryImpl : public CqQuery {
protected:
std::string m_cqName;
std::string m_queryString;
SelectResultsPtr m_cqResults;
private:
QueryPtr m_query;
CqAttributesPtr m_cqAttributes;
CqAttributesMutatorPtr m_cqAttributesMutator;
CqServicePtr m_cqService;
std::string m_serverCqName;
bool m_isDurable;
// Stats counters
CqStatisticsPtr m_stats;
CqState::StateType m_cqState;
CqOperation::CqOperationType m_cqOperation;
/* CQ Request Type - Start */
// unused
/*
enum {
EXECUTE_REQUEST = 0,
EXECUTE_INITIAL_RESULTS_REQUEST = 1,
STOP_REQUEST = 2,
CLOSE_REQUEST = 3,
REDUNDANT_EXECUTE_REQUEST = 4
} CqRequestType;
*/
/* CQ Request type - End */
/**
* Constructor.
*/
public:
CqQueryImpl(CqServicePtr& cqService, std::string& cqName,
std::string& queryString, CqAttributesPtr& cqAttributes,
bool isDurable = false,
UserAttributesPtr userAttributesPtr = NULLPTR);
~CqQueryImpl();
/**
* returns CQ name
*/
const char* getName() const;
/**
* sets the CqName.
*/
void setName(std::string& cqName);
/**
* Initializes the CqQuery.
* creates Query object, if its valid adds into repository.
*/
void initCq();
/**
* Closes the Query.
* On Client side, sends the cq close request to server.
* On Server side, takes care of repository cleanup.
* @throws CqException
*/
void close();
/**
* Closes the Query.
* On Client side, sends the cq close request to server.
* On Server side, takes care of repository cleanup.
* @param sendRequestToServer true to send the request to server.
* @throws CqException
*/
void close(bool sendRequestToServer);
/**
* Store this CQ in the cqService's cqMap.
* @throws CqException
*/
void addToCqMap();
/**
* Removes the CQ from CQ repository.
* @throws CqException
*/
void removeFromCqMap();
/**
* Returns the QueryString of this CQ.
*/
const char* getQueryString() const;
/**
* Return the query after replacing region names with parameters
* @return the Query for the query string
*/
QueryPtr getQuery() const;
/**
* @see org.apache.geode.cache.query.CqQuery#getStatistics()
*/
const CqStatisticsPtr getStatistics() const;
CqQueryVsdStats& getVsdStats() {
return *dynamic_cast<CqQueryVsdStats*>(m_stats.ptr());
}
const CqAttributesPtr getCqAttributes() const;
RegionPtr getCqBaseRegion();
/**
* Clears the resource used by CQ.
* @throws CqException
*/
void cleanup();
/**
* @return Returns the cqListeners.
*/
void getCqListeners(VectorOfCqListener& cqListener);
/**
* Start or resume executing the query.
*/
void execute();
void executeAfterFailover();
/**
* Execute CQ on endpoint after failover
*/
GfErrType execute(TcrEndpoint* endpoint);
/**
* Start or resume executing the query.
* Gets or updates the CQ results and returns them.
*/
CqResultsPtr executeWithInitialResults(uint32_t timeout);
/**
* This is called when the new server comes-up.
* Executes the CQ on the given endpoint.
* @param endpoint
*/
bool executeCq(TcrMessage::MsgType requestType);
/**
* Stop or pause executing the query.
*/
void stop();
/**
* Return the state of this query.
* @return STOPPED RUNNING or CLOSED
*/
CqState::StateType getState();
/**
* Sets the state of the cq.
* Server side method. Called during cq registration time.
*/
void setCqState(CqState::StateType state);
const CqAttributesMutatorPtr getCqAttributesMutator() const;
/**
* @return Returns the cqOperation.
*/
CqOperation::CqOperationType getCqOperation();
/**
* @param cqOperation The cqOperation to set.
*/
void setCqOperation(CqOperation::CqOperationType cqOperation);
/**
* Update CQ stats
* @param cqEvent object
*/
void updateStats(CqEvent& cqEvent);
/**
* Return true if the CQ is in running state
* @return true if running, false otherwise
*/
bool isRunning();
/**
* Return true if the CQ is in Sstopped state
* @return true if stopped, false otherwise
*/
bool isStopped();
/**
* Return true if the CQ is closed
* @return true if closed, false otherwise
*/
bool isClosed();
/**
* Return true if the CQ is durable
* @return true if durable, false otherwise
*/
bool isDurable();
inline ThinClientBaseDM* getDM() { return m_tccdm; }
private:
void updateStats();
ACE_Recursive_Thread_Mutex m_mutex;
void sendStopOrClose(TcrMessage::MsgType requestType);
ThinClientBaseDM* m_tccdm;
ProxyCachePtr m_proxyCache;
};
} // namespace client
} // namespace geode
} // namespace apache
#endif // GEODE_CQQUERYIMPL_H_