blob: 75f24fe8f844b387e566109b6ff48750b5729a5c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#ifndef GEODE_CQQUERYIMPL_H_
#define GEODE_CQQUERYIMPL_H_
#include <mutex>
#include <string>
#include <geode/AuthenticatedView.hpp>
#include <geode/CqAttributes.hpp>
#include <geode/CqOperation.hpp>
#include <geode/CqQuery.hpp>
#include <geode/CqResults.hpp>
#include <geode/CqState.hpp>
#include <geode/Region.hpp>
#include <geode/internal/geode_globals.hpp>
#include "CqQueryVsdStats.hpp"
#include "CqService.hpp"
/**
* @file
*/
namespace apache {
namespace geode {
namespace client {
/**
* @class CqQueryImpl CqQueryImpl.hpp
*
* Represents the CqQuery object. Implements CqQuery API and CqAttributeMutator.
*
*/
class CqQueryImpl : public CqQuery,
public std::enable_shared_from_this<CqQueryImpl> {
protected:
std::string m_cqName;
std::string m_queryString;
std::shared_ptr<SelectResults> m_cqResults;
private:
std::shared_ptr<Query> m_query;
std::shared_ptr<CqAttributes> m_cqAttributes;
CqAttributesMutator m_cqAttributesMutator;
std::shared_ptr<CqService> m_cqService;
std::string m_serverCqName;
bool m_isDurable;
// Stats counters
std::shared_ptr<CqStatistics> m_stats;
CqState m_cqState;
CqOperation m_cqOperation;
/* CQ Request Type - Start */
// unused
/*
enum {
EXECUTE_REQUEST = 0,
EXECUTE_INITIAL_RESULTS_REQUEST = 1,
STOP_REQUEST = 2,
CLOSE_REQUEST = 3,
REDUNDANT_EXECUTE_REQUEST = 4
} CqRequestType;
*/
/* CQ Request type - End */
/**
* Constructor.
*/
public:
CqQueryImpl(
const std::shared_ptr<CqService>& cqService, const std::string& cqName,
const std::string& queryString,
const std::shared_ptr<CqAttributes>& cqAttributes,
statistics::StatisticsFactory* factory, const bool isDurable = false,
const std::shared_ptr<UserAttributes>& userAttributesPtr = nullptr);
virtual ~CqQueryImpl();
/**
* returns CQ name
*/
const std::string& getName() const override;
/**
* sets the CqName.
*/
void setName(std::string cqName);
/**
* Initializes the CqQuery.
* creates Query object, if its valid adds into repository.
*/
void initCq();
/**
* Closes the Query.
* On Client side, sends the cq close request to server.
* On Server side, takes care of repository cleanup.
* @throws CqException
*/
void close() override;
/**
* Closes the Query.
* On Client side, sends the cq close request to server.
* On Server side, takes care of repository cleanup.
* @param sendRequestToServer true to send the request to server.
* @throws CqException
*/
void close(bool sendRequestToServer);
/**
* Store this CQ in the cqService's cqMap.
* @throws CqException
*/
void addToCqMap();
/**
* Removes the CQ from CQ repository.
* @throws CqException
*/
void removeFromCqMap();
/**
* Returns the QueryString of this CQ.
*/
const std::string& getQueryString() const override;
/**
* Return the query after replacing region names with parameters
* @return the Query for the query string
*/
std::shared_ptr<Query> getQuery() const override;
/**
* @see org.apache.geode.cache.query.CqQuery#getStatistics()
*/
std::shared_ptr<CqStatistics> getStatistics() const override;
CqQueryVsdStats& getVsdStats() {
return *dynamic_cast<CqQueryVsdStats*>(m_stats.get());
}
std::shared_ptr<CqAttributes> getCqAttributes() const override;
std::shared_ptr<Region> getCqBaseRegion();
/**
* Clears the resource used by CQ.
* @throws CqException
*/
void cleanup();
/**
* Returns the cqListeners.
*/
void getCqListeners(std::vector<std::shared_ptr<CqListener>>& cqListener);
/**
* Start or resume executing the query.
*/
void execute() override;
void executeAfterFailover();
/**
* Execute CQ on endpoint after failover
*/
GfErrType execute(TcrEndpoint* endpoint);
/**
* Start or resume executing the query.
* Gets or updates the CQ results and returns them.
*/
std::shared_ptr<CqResults> executeWithInitialResults(
std::chrono::milliseconds timeout) override;
/**
* This is called when the new server comes-up.
* Executes the CQ on the given endpoint.
* @param requestType Tcr Message Type to request
*/
bool executeCq(TcrMessage::MsgType requestType);
/**
* Stop or pause executing the query.
*/
void stop() override;
/**
* Return the state of this query.
* @return STOPPED RUNNING or CLOSED
*/
CqState getState() override;
/**
* Sets the state of the cq.
* Server side method. Called during cq registration time.
*/
void setCqState(CqState state);
CqAttributesMutator getCqAttributesMutator() const override;
/**
* @return Returns the cqOperation.
*/
CqOperation getCqOperation() const;
/**
* @param cqOperation The cqOperation to set.
*/
void setCqOperation(CqOperation cqOperation);
/**
* Update CQ stats
* @param cqEvent object
*/
void updateStats(CqEvent& cqEvent);
/**
* Return true if the CQ is in running state
* @return true if running, false otherwise
*/
bool isRunning() const override;
/**
* Return true if the CQ is in Sstopped state
* @return true if stopped, false otherwise
*/
bool isStopped() const override;
/**
* Return true if the CQ is closed
* @return true if closed, false otherwise
*/
bool isClosed() const override;
/**
* Return true if the CQ is durable
* @return true if durable, false otherwise
*/
bool isDurable() const override;
inline ThinClientBaseDM* getDM() { return m_tccdm; }
private:
void updateStats();
mutable std::recursive_mutex m_mutex;
void sendStopOrClose(TcrMessage::MsgType requestType);
ThinClientBaseDM* m_tccdm;
AuthenticatedView* m_authenticatedView;
};
} // namespace client
} // namespace geode
} // namespace apache
#endif // GEODE_CQQUERYIMPL_H_