blob: 5fed8defc3046251aec2c752b6906415e7645827 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#ifndef GEODE_CQSERVICE_H_
#define GEODE_CQSERVICE_H_
#include <map>
#include <mutex>
#include <string>
#include <ace/Semaphore.h>
#include <geode/CacheableKey.hpp>
#include <geode/CqOperation.hpp>
#include <geode/CqQuery.hpp>
#include <geode/internal/geode_globals.hpp>
#include "CqServiceVsdStats.hpp"
#include "DistributedSystem.hpp"
#include "ErrType.hpp"
#include "NonCopyable.hpp"
#include "Queue.hpp"
#include "TcrMessage.hpp"
#include "util/synchronized_map.hpp"
namespace apache {
namespace geode {
namespace client {
class ThinClientBaseDM;
class TcrEndpoint;
/**
* @class CqService CqService.hpp
*
* Implements the CqService functionality.
*
*/
class APACHE_GEODE_EXPORT CqService
: private NonCopyable,
private NonAssignable,
public std::enable_shared_from_this<CqService> {
private:
ThinClientBaseDM* m_tccdm;
statistics::StatisticsFactory* m_statisticsFactory;
ACE_Semaphore m_notificationSema;
bool m_running;
synchronized_map<std::unordered_map<std::string, std::shared_ptr<CqQuery>>,
std::recursive_mutex>
m_cqQueryMap;
std::shared_ptr<CqServiceStatistics> m_stats;
inline bool noCq() const { return m_cqQueryMap.empty(); }
public:
typedef std::vector<std::shared_ptr<CqQuery>> query_container_type;
/**
* Constructor.
*/
CqService(ThinClientBaseDM* tccdm,
statistics::StatisticsFactory* statisticsFactory);
~CqService() noexcept;
ThinClientBaseDM* getDM() { return m_tccdm; }
void receiveNotification(TcrMessage* msg);
/**
* Returns the state of the cqService.
*/
bool checkAndAcquireLock();
void updateStats();
CqServiceVsdStats& getCqServiceVsdStats() {
return *dynamic_cast<CqServiceVsdStats*>(m_stats.get());
}
/**
* Constructs a new named continuous query, represented by an instance of
* CqQuery. The CqQuery is not executed, however, until the execute method
* is invoked on the CqQuery. The name of the query will be used
* to identify this query in statistics archival.
*
* @param cqName the String name for this query
* @param queryString the OQL query
* @param cqAttributes the CqAttributes
* @param isDurable true if the CQ is durable
* @return the newly created CqQuery object
* @throws CqExistsException if a CQ by this name already exists on this
* client
* @throws IllegalArgumentException if queryString or cqAttr is null
* @throws IllegalStateException if this method is called from a cache
* server
* @throws QueryInvalidException if there is a syntax error in the query
* @throws CqException if failed to create cq, failure during creating
* managing cq metadata info.
* @throws CqInvalidException if the query doesnot meet the CQ constraints.
* E.g.: Query string should refer only one region, join not supported.
* The query must be a SELECT statement.
* DISTINCT queries are not supported.
* Projections are not supported.
* Only one iterator in the FROM clause is supported, and it must be a
* region path.
* Bind parameters in the query are not supported for the initial
* release.
*
*/
std::shared_ptr<CqQuery> newCq(
const std::string& cqName, const std::string& queryString,
const std::shared_ptr<CqAttributes>& cqAttributes,
const bool isDurable = false);
/**
* Adds the given CQ and cqQuery object into the CQ map.
*/
void addCq(const std::string& cqName, std::shared_ptr<CqQuery>& cq);
/**
* Removes given CQ from the cqMap..
*/
void removeCq(const std::string& cqName);
/**
* Retrieve a CqQuery by name.
* @return the CqQuery or null if not found
*/
std::shared_ptr<CqQuery> getCq(const std::string& cqName);
/**
* Clears the CQ Query Map.
*/
void clearCqQueryMap();
/**
* Retrieve all registered CQs
*/
query_container_type getAllCqs();
/**
* Executes all the cqs on this client.
*/
void executeAllClientCqs(bool afterFailover = false);
/**
* Executes all CQs on the specified endpoint after failover.
*/
GfErrType executeAllClientCqs(TcrEndpoint* endpoint);
/**
* Executes all the given cqs.
*/
void executeCqs(query_container_type& cqs, bool afterFailover = false);
/**
* Executes all the given cqs on the specified endpoint after failover.
*/
GfErrType executeCqs(query_container_type& cqs, TcrEndpoint* endpoint);
/**
* Stops all the cqs
*/
void stopAllClientCqs();
/**
* Stops all the specified cqs.
*/
void stopCqs(query_container_type& cqs);
/**
* Close all CQs executing in this client, and release resources
* associated with executing CQs.
* CqQuerys created by other client are unaffected.
*/
void closeAllCqs();
/**
* Get statistics information for all CQs
* @return the CqServiceStatistics
*/
std::shared_ptr<CqServiceStatistics> getCqServiceStatistics();
/**
* Close the CQ Service after cleanup if any.
*
*/
void closeCqService();
/**
* Cleans up the CqService.
*/
void cleanup();
/*
* Checks if CQ with the given name already exists.
* @param cqName name of the CQ.
* @return true if exists else false.
*/
bool isCqExists(const std::string& cqName);
/**
* Invokes the CqListeners for the given CQs.
* @param cqs list of cqs with the cq operation from the Server.
* @param messageType base operation
* @param key
* @param value
*/
void invokeCqListeners(const std::map<std::string, int>* cqs,
uint32_t messageType,
std::shared_ptr<CacheableKey> key,
std::shared_ptr<Cacheable> value,
std::shared_ptr<CacheableBytes> deltaValue,
std::shared_ptr<EventId> eventId);
/**
* Returns the Operation for the given EnumListenerEvent type.
* @param eventType
* @return Operation
*/
CqOperation getOperation(int eventType);
void closeCqs(query_container_type& cqs);
/**
* Gets all the durable CQs registered by this client.
*
* @return List of names of registered durable CQs, empty list if no durable
* cqs.
*/
std::shared_ptr<CacheableArrayList> getAllDurableCqsFromServer();
void invokeCqConnectedListeners(const std::string& poolName,
const bool connected);
};
} // namespace client
} // namespace geode
} // namespace apache
#endif // GEODE_CQSERVICE_H_