blob: fa63e32a51acb8a133560d6fd5ac85944dec9d9b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.cache.query.cq.internal;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.logging.log4j.Logger;
import org.apache.geode.InvalidDeltaException;
import org.apache.geode.StatisticsFactory;
import org.apache.geode.SystemFailure;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.CacheLoaderException;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.RegionEvent;
import org.apache.geode.cache.TimeoutException;
import org.apache.geode.cache.client.Pool;
import org.apache.geode.cache.client.internal.GetEventValueOp;
import org.apache.geode.cache.client.internal.InternalPool;
import org.apache.geode.cache.client.internal.QueueManager;
import org.apache.geode.cache.client.internal.UserAttributes;
import org.apache.geode.cache.query.CqAttributes;
import org.apache.geode.cache.query.CqClosedException;
import org.apache.geode.cache.query.CqException;
import org.apache.geode.cache.query.CqExistsException;
import org.apache.geode.cache.query.CqListener;
import org.apache.geode.cache.query.CqQuery;
import org.apache.geode.cache.query.CqServiceStatistics;
import org.apache.geode.cache.query.CqStatusListener;
import org.apache.geode.cache.query.QueryException;
import org.apache.geode.cache.query.QueryInvalidException;
import org.apache.geode.cache.query.RegionNotFoundException;
import org.apache.geode.cache.query.SelectResults;
import org.apache.geode.cache.query.cq.internal.ops.ServerCQProxyImpl;
import org.apache.geode.cache.query.internal.CompiledSelect;
import org.apache.geode.cache.query.internal.CqQueryVsdStats;
import org.apache.geode.cache.query.internal.CqStateImpl;
import org.apache.geode.cache.query.internal.DefaultQuery;
import org.apache.geode.cache.query.internal.ExecutionContext;
import org.apache.geode.cache.query.internal.cq.ClientCQ;
import org.apache.geode.cache.query.internal.cq.CqService;
import org.apache.geode.cache.query.internal.cq.InternalCqQuery;
import org.apache.geode.cache.query.internal.cq.ServerCQ;
import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
import org.apache.geode.internal.cache.EntryEventImpl;
import org.apache.geode.internal.cache.EventID;
import org.apache.geode.internal.cache.FilterProfile;
import org.apache.geode.internal.cache.FilterRoutingInfo;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.tier.MessageType;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.Part;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.util.JavaWorkarounds;
/**
* Implements the CqService functionality.
*
* @since GemFire 5.5
*/
public class CqServiceImpl implements CqService {
private static final Logger logger = LogService.getLogger();
private static final Integer MESSAGE_TYPE_LOCAL_CREATE = MessageType.LOCAL_CREATE;
private static final Integer MESSAGE_TYPE_LOCAL_UPDATE = MessageType.LOCAL_UPDATE;
private static final Integer MESSAGE_TYPE_LOCAL_DESTROY = MessageType.LOCAL_DESTROY;
private static final Integer MESSAGE_TYPE_EXCEPTION = MessageType.EXCEPTION;
/**
* System property to evaluate the query even though the initial results are not required when cq
* is executed using the execute() method.
*/
public static boolean EXECUTE_QUERY_DURING_INIT = Boolean.valueOf(System
.getProperty(DistributionConfig.GEMFIRE_PREFIX + "cq.EXECUTE_QUERY_DURING_INIT", "true"));
private static final String CQ_NAME_PREFIX = "GfCq";
private final InternalCache cache;
/**
* Manages cq pools to determine if a status of connect or disconnect needs to be sent out
*/
private final HashMap<String, Boolean> cqPoolsConnected = new HashMap<>();
/**
* Manages CQ objects. uses serverCqName as key and CqQueryImpl as value
*
* GuardedBy cqQueryMapLock
*/
private volatile HashMap<String, CqQueryImpl> cqQueryMap = new HashMap<>();
private final Object cqQueryMapLock = new Object();
private volatile boolean isRunning = false;
/**
* Used by client when multiuser-authentication is true.
*/
private final HashMap<String, UserAttributes> cqNameToUserAttributesMap = new HashMap<>();
// Map to manage the similar CQs (having same query - performance optimization).
// With query as key and Set of CQs as values.
private final ConcurrentHashMap matchingCqMap;
// CQ Service statistics
private final CqServiceStatisticsImpl cqServiceStats;
private final CqServiceVsdStats stats;
// CQ identifier, also used in auto generated CQ names
private volatile long cqId = 1;
/* This is to manage region to CQs map, client side book keeping. */
private HashMap<String, ArrayList<String>> baseRegionToCqNameMap = new HashMap<>();
/**
* Access and modification to the contents of this map do not necessarily need to be lock
* protected. This is just used to optimize construction of a server side cq name. Missing values
* in this cache will mean a look up for a specific proxy id and cq name will miss and reconstruct
* the string before adding it back to the cache
*/
private static final ConcurrentHashMap<String, ConcurrentHashMap<ClientProxyMembershipID, String>> serverCqNameCache =
new ConcurrentHashMap<>();
/**
* Constructor.
*
* @param cache The cache used for the service
*/
public CqServiceImpl(final InternalCache cache) {
if (cache == null) {
throw new IllegalStateException("cache is null");
}
cache.getCancelCriterion().checkCancelInProgress(null);
this.cache = cache;
// Initialize the Map which maintains the matching cqs.
this.matchingCqMap = new ConcurrentHashMap<String, HashSet<String>>();
// Initialize the VSD statistics
StatisticsFactory factory = this.cache.getDistributedSystem();
this.stats = new CqServiceVsdStats(factory);
this.cqServiceStats = new CqServiceStatisticsImpl(this);
}
/**
* Returns the cache associated with the cqService.
*/
public Cache getCache() {
return this.cache;
}
public InternalCache getInternalCache() {
return this.cache;
}
public CqServiceVsdStats stats() {
return this.stats;
}
@Override
public synchronized ClientCQ newCq(String cqName, String queryString, CqAttributes cqAttributes,
InternalPool pool, boolean isDurable)
throws QueryInvalidException, CqExistsException, CqException {
if (queryString == null) {
throw new IllegalArgumentException(
String.format("Null argument %s", "queryString"));
} else if (cqAttributes == null) {
throw new IllegalArgumentException(
String.format("Null argument %s", "cqAttribute"));
}
if (isServer()) {
throw new IllegalStateException(
"client side newCq() method invocation on server.");
}
// Check if the given cq already exists.
if (cqName != null && isCqExists(cqName)) {
throw new CqExistsException(
String.format("CQ with the given name already exists. CqName : %s",
cqName));
}
ServerCQProxyImpl serverProxy = pool == null ? null : new ServerCQProxyImpl(pool);
ClientCQImpl cQuery =
new ClientCQImpl(this, cqName, queryString, cqAttributes, serverProxy, isDurable);
cQuery.updateCqCreateStats();
// cQuery.initCq();
// Check if query is valid.
cQuery.validateCq();
// Add cq into meta region.
// Check if Name needs to be generated.
if (cqName == null) {
// in the case of cqname internally generated, the CqExistsException needs
// to be taken care internally.
while (true) {
cQuery.setName(generateCqName());
try {
addToCqMap(cQuery);
} catch (CqExistsException ex) {
if (logger.isDebugEnabled()) {
logger.debug("Got CqExistsException while intializing cq : {} Error : {}",
cQuery.getName(), ex.getMessage());
}
continue;
}
break;
}
} else {
addToCqMap(cQuery);
}
this.addToBaseRegionToCqNameMap(cQuery.getBaseRegionName(), cQuery.getServerCqName());
return cQuery;
}
/**
* Executes the given CqQuery, if the CqQuery for that name is not there it registers the one and
* executes. This is called on the Server.
*
* @param manageEmptyRegions whether to update the 6.1 emptyRegions map held in the CCN
* @param regionDataPolicy the data policy of the region associated with the query. This is only
* needed if manageEmptyRegions is true.
* @param emptyRegionsMap map of empty regions.
* @throws IllegalStateException if this is called at client side.
*/
@Override
public synchronized ServerCQ executeCq(String cqName, String queryString, int cqState,
ClientProxyMembershipID clientProxyId, CacheClientNotifier ccn, boolean isDurable,
boolean manageEmptyRegions, int regionDataPolicy, Map emptyRegionsMap)
throws CqException, RegionNotFoundException, CqClosedException {
if (!isServer()) {
throw new IllegalStateException(
String.format("Server side executeCq method is called on client. CqName : %s",
cqName));
}
String serverCqName = constructServerCqName(cqName, clientProxyId);
ServerCQImpl cQuery;
// If this CQ is not yet registered in Server, register CQ.
if (!isCqExists(serverCqName)) {
cQuery = new ServerCQImpl(this, cqName, queryString, isDurable,
constructServerCqName(cqName, clientProxyId));
try {
cQuery.registerCq(clientProxyId, ccn, cqState);
if (manageEmptyRegions) { // new in 6.1
if (emptyRegionsMap != null && emptyRegionsMap.containsKey(cQuery.getBaseRegionName())) {
regionDataPolicy = 0;
}
CacheClientProxy proxy = getCacheClientProxy(clientProxyId, ccn);
ccn.updateMapOfEmptyRegions(
proxy.getRegionsWithEmptyDataPolicy(),
cQuery.getBaseRegionName(), regionDataPolicy);
}
} catch (CqException cqe) {
logger.info("Exception while registering CQ on server. CqName : {}",
cQuery.getName());
throw cqe;
}
} else {
cQuery = (ServerCQImpl) getCq(serverCqName);
resumeCQ(cqState, cQuery);
}
if (logger.isDebugEnabled()) {
logger.debug("Successfully created CQ on the server. CqName : {}", cQuery.getName());
}
return cQuery;
}
public CacheClientProxy getCacheClientProxy(ClientProxyMembershipID clientProxyId,
CacheClientNotifier ccn) throws CqException {
CacheClientProxy proxy = ccn.getClientProxy(clientProxyId, true);
if (proxy == null) {
throw new CqException("No Cache Client Proxy found while executing CQ.");
}
return proxy;
}
@Override
public void resumeCQ(int cqState, ServerCQ cQuery) {
// Initialize the state of CQ.
if (((CqStateImpl) cQuery.getState()).getState() != cqState) {
cQuery.setCqState(cqState);
// addToCqEventKeysMap(cQuery);
// Send state change info to peers.
cQuery.getCqBaseRegion().getFilterProfile().setCqState(cQuery);
}
// If we are going to set the state to running, we need to check to see if it matches any other
// cq
if (cqState == CqStateImpl.RUNNING) {
// Add to the matchedCqMap.
addToMatchingCqMap((CqQueryImpl) cQuery);
}
}
/**
* Adds the given CQ and cqQuery object into the CQ map.
*/
void addToCqMap(CqQueryImpl cq) throws CqExistsException, CqException {
// On server side cqName will be server side cqName.
String sCqName = cq.getServerCqName();
if (logger.isDebugEnabled()) {
logger.debug("Adding to CQ Repository. CqName : {} ServerCqName : {}", cq.getName(), sCqName);
}
HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
if (cqMap.containsKey(sCqName)) {
throw new CqExistsException(
String.format("A CQ with the given name %s already exists.",
sCqName));
}
synchronized (cqQueryMapLock) {
HashMap<String, CqQueryImpl> tmpCqQueryMap = new HashMap<>(cqQueryMap);
try {
tmpCqQueryMap.put(sCqName, cq);
} catch (Exception ex) {
String errMsg =
"Failed to store Continuous Query in the repository. CqName: %s %s";
Object[] errMsgArgs = new Object[] {sCqName, ex.getLocalizedMessage()};
String s = String.format(errMsg, errMsgArgs);
logger.error(s);
throw new CqException(s, ex);
}
UserAttributes attributes = UserAttributes.userAttributes.get();
if (attributes != null) {
this.cqNameToUserAttributesMap.put(cq.getName(), attributes);
}
cqQueryMap = tmpCqQueryMap;
}
}
/**
* Removes given CQ from the cqMap..
*/
void removeCq(String cqName) {
// On server side cqName will be server side cqName.
synchronized (cqQueryMapLock) {
HashMap<String, CqQueryImpl> tmpCqQueryMap = new HashMap<>(cqQueryMap);
tmpCqQueryMap.remove(cqName);
this.cqNameToUserAttributesMap.remove(cqName);
cqQueryMap = tmpCqQueryMap;
}
}
@Override
public CqQuery getClientCqFromServer(ClientProxyMembershipID clientProxyId, String clientCqName) {
// On server side cqName will be server side cqName.
HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
return cqMap.get(this.constructServerCqName(clientCqName, clientProxyId));
}
@Override
public InternalCqQuery getCq(String cqName) {
// On server side cqName will be server side cqName.
return cqQueryMap.get(cqName);
}
@Override
public Collection<? extends InternalCqQuery> getAllCqs() {
return cqQueryMap.values();
}
@Override
public Collection<? extends InternalCqQuery> getAllCqs(final String regionName)
throws CqException {
if (regionName == null) {
throw new IllegalArgumentException(
String.format("Null argument %s", "regionName"));
}
String[] cqNames;
synchronized (this.baseRegionToCqNameMap) {
ArrayList<String> cqs = this.baseRegionToCqNameMap.get(regionName);
if (cqs == null) {
return null;
}
cqNames = new String[cqs.size()];
cqs.toArray(cqNames);
}
ArrayList<InternalCqQuery> cQueryList = new ArrayList<>();
for (int cqCnt = 0; cqCnt < cqNames.length; cqCnt++) {
InternalCqQuery cq = getCq(cqNames[cqCnt]);
if (cq != null) {
cQueryList.add(cq);
}
}
return cQueryList;
}
@Override
public synchronized void executeAllClientCqs() throws CqException {
executeCqs(this.getAllCqs());
}
@Override
public synchronized void executeAllRegionCqs(final String regionName) throws CqException {
executeCqs(getAllCqs(regionName));
}
@Override
public synchronized void executeCqs(Collection<? extends InternalCqQuery> cqs)
throws CqException {
if (cqs == null) {
return;
}
String cqName = null;
for (InternalCqQuery internalCq : cqs) {
CqQuery cq = internalCq;
if (!cq.isClosed() && cq.isStopped()) {
try {
cqName = cq.getName();
cq.execute();
} catch (QueryException | CqClosedException e) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to execute the CQ, CqName : {} Error : {}", cqName,
e.getMessage());
}
}
}
}
}
@Override
public synchronized void stopAllClientCqs() throws CqException {
stopCqs(this.getAllCqs());
}
@Override
public synchronized void stopAllRegionCqs(final String regionName) throws CqException {
stopCqs(this.getAllCqs(regionName));
}
@Override
public synchronized void stopCqs(Collection<? extends InternalCqQuery> cqs) throws CqException {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
if (cqs == null) {
logger.debug("CqService.stopCqs cqs : null");
} else {
logger.debug("CqService.stopCqs cqs : ({} queries)", cqs.size());
}
}
if (cqs == null) {
return;
}
String cqName = null;
for (InternalCqQuery internalCqQuery : cqs) {
CqQuery cq = internalCqQuery;
if (!cq.isClosed() && cq.isRunning()) {
try {
cqName = cq.getName();
cq.stop();
} catch (QueryException | CqClosedException e) {
if (isDebugEnabled) {
logger.debug("Failed to stop the CQ, CqName : {} Error : {}", cqName, e.getMessage());
}
}
}
}
}
@Override
public void closeCqs(final String regionName) throws CqException {
Collection<? extends InternalCqQuery> cqs = this.getAllCqs(regionName);
if (cqs != null) {
String cqName = null;
for (InternalCqQuery cq : cqs) {
try {
cqName = cq.getName();
if (isServer()) {
// invoked on the server
cq.close(false);
} else {
// TODO: grid: if regionName has a pool check its keepAlive
boolean keepAlive = this.cache.keepDurableSubscriptionsAlive();
if (cq.isDurable() && keepAlive) {
logger.warn("Not sending CQ close to the server as it is a durable CQ");
cq.close(false);
} else {
cq.close(true);
}
}
} catch (QueryException | CqClosedException e) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to close the CQ, CqName : {} Error : {}", cqName, e.getMessage());
}
}
}
}
}
/**
* Called directly on server side.
*/
@Override
public void stopCq(String cqName, ClientProxyMembershipID clientId) throws CqException {
String serverCqName = cqName;
if (clientId != null) {
serverCqName = this.constructServerCqName(cqName, clientId);
removeFromCacheForServerToConstructedCQName(cqName, clientId);
}
ServerCQImpl cQuery = null;
String errMsg = null;
Exception ex = null;
try {
HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
if (!cqMap.containsKey(serverCqName)) {
/*
* gregp 052808: We should silently fail here instead of throwing error. This is to deal
* with races in recovery
*/
return;
}
cQuery = (ServerCQImpl) getCq(serverCqName);
} catch (CacheLoaderException e1) {
errMsg = "CQ not found in the cq meta region, CqName: %s";
ex = e1;
} catch (TimeoutException e2) {
errMsg = "Timeout while trying to get CQ from meta region, CqName: %s";
ex = e2;
} finally {
if (ex != null) {
String s = String.format(errMsg, cqName);
if (logger.isDebugEnabled()) {
logger.debug(s);
}
throw new CqException(s, ex);
}
}
try {
if (!cQuery.isStopped()) {
cQuery.stop();
}
} catch (CqClosedException cce) {
throw new CqException(cce.getMessage());
} finally {
// If this CQ is stopped, disable caching event keys for this CQ.
// this.removeCQFromCaching(cQuery.getServerCqName());
this.removeFromMatchingCqMap(cQuery);
}
// Send stop message to peers.
cQuery.getCqBaseRegion().getFilterProfile().stopCq(cQuery);
}
@Override
public void closeCq(String cqName, ClientProxyMembershipID clientProxyId) throws CqException {
String serverCqName = cqName;
if (clientProxyId != null) {
serverCqName = this.constructServerCqName(cqName, clientProxyId);
removeFromCacheForServerToConstructedCQName(cqName, clientProxyId);
}
ServerCQImpl cQuery = null;
String errMsg = null;
Exception ex = null;
try {
HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
if (!cqMap.containsKey(serverCqName)) {
/*
* gregp 052808: We should silently fail here instead of throwing error. This is to deal
* with races in recovery
*/
return;
}
cQuery = (ServerCQImpl) cqMap.get(serverCqName);
} catch (CacheLoaderException e1) {
errMsg = "CQ not found in the cq meta region, CqName: %s";
ex = e1;
} catch (TimeoutException e2) {
errMsg = "Timeout while trying to get CQ from meta region, CqName: %s";
ex = e2;
} finally {
if (ex != null) {
String s = String.format(errMsg, cqName);
if (logger.isDebugEnabled()) {
logger.debug(s);
}
throw new CqException(s, ex);
}
}
try {
cQuery.close(false);
// Repository Region.
// If CQ event caching is enabled, remove this CQs event cache reference.
// removeCQFromCaching(serverCqName);
// CqBaseRegion
try {
LocalRegion baseRegion = cQuery.getCqBaseRegion();
if (baseRegion != null && !baseRegion.isDestroyed()) {
// Server specific clean up.
if (isServer()) {
FilterProfile fp = baseRegion.getFilterProfile();
if (fp != null) {
fp.closeCq(cQuery);
}
CacheClientProxy clientProxy =
cQuery.getCacheClientNotifier().getClientProxy(clientProxyId);
clientProxy.decCqCount();
if (clientProxy.hasNoCq()) {
this.stats.decClientsWithCqs();
}
}
}
} catch (Exception e) {
// May be cache is being shutdown
if (logger.isDebugEnabled()) {
logger.debug("Failed to remove CQ from the base region. CqName : {}", cqName);
}
}
if (isServer()) {
removeFromBaseRegionToCqNameMap(cQuery.getRegionName(), serverCqName);
}
LocalRegion baseRegion = cQuery.getCqBaseRegion();
if (baseRegion.getFilterProfile().getCqCount() <= 0) {
if (logger.isDebugEnabled()) {
logger.debug(
"Should update the profile for this partitioned region {} for not requiring old value",
baseRegion);
}
}
} catch (CqClosedException cce) {
throw new CqException(cce.getMessage());
} finally {
this.removeFromMatchingCqMap(cQuery);
}
}
@Override
public void closeAllCqs(boolean clientInitiated) {
closeAllCqs(clientInitiated, getAllCqs());
}
/**
* Close all CQs executing in this VM, and release resources associated with executing CQs.
* CqQuerys created by other VMs are unaffected.
*/
private void closeAllCqs(boolean clientInitiated, Collection<? extends InternalCqQuery> cqs) {
closeAllCqs(clientInitiated, cqs, this.cache.keepDurableSubscriptionsAlive());
}
@Override
public void closeAllCqs(boolean clientInitiated, Collection<? extends InternalCqQuery> cqs,
boolean keepAlive) {
if (cqs != null) {
String cqName = null;
if (logger.isDebugEnabled()) {
logger.debug("Closing all CQs, number of CQ to be closed : {}", cqs.size());
}
for (InternalCqQuery cQuery : cqs) {
try {
cqName = cQuery.getName();
if (isServer()) {
cQuery.close(false);
} else {
if (clientInitiated) {
cQuery.close(true);
} else {
if (!isServer() && cQuery.isDurable() && keepAlive) {
logger.warn("Not sending CQ close to the server as it is a durable CQ");
cQuery.close(false);
} else {
cQuery.close(true);
}
}
}
} catch (QueryException | CqClosedException e) {
if (!isRunning()) {
// Not cache shutdown
logger
.warn("Failed to close CQ %s %s",
cqName, e.getMessage());
}
if (logger.isDebugEnabled()) {
logger.debug(e.getMessage(), e);
}
}
}
}
}
@Override
public CqServiceStatistics getCqStatistics() {
return cqServiceStats;
}
@Override
public void closeClientCqs(ClientProxyMembershipID clientProxyId) throws CqException {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("Closing Client CQs for the client: {}", clientProxyId);
}
List<ServerCQ> cqs = getAllClientCqs(clientProxyId);
for (ServerCQ cq : cqs) {
CqQueryImpl cQuery = (CqQueryImpl) cq;
try {
cQuery.close(false);
} catch (QueryException | CqClosedException e) {
if (isDebugEnabled) {
logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(),
e.getMessage());
}
}
}
}
@Override
public List<ServerCQ> getAllClientCqs(ClientProxyMembershipID clientProxyId) {
Collection<? extends InternalCqQuery> cqs = getAllCqs();
ArrayList<ServerCQ> clientCqs = new ArrayList<>();
for (InternalCqQuery cq : cqs) {
ServerCQImpl cQuery = (ServerCQImpl) cq;
ClientProxyMembershipID id = cQuery.getClientProxyId();
if (id != null && id.equals(clientProxyId)) {
clientCqs.add(cQuery);
}
}
return clientCqs;
}
@Override
public List<String> getAllDurableClientCqs(ClientProxyMembershipID clientProxyId)
throws CqException {
if (clientProxyId == null) {
throw new CqException(
String.format("Unable to retrieve durable CQs for client proxy id %s", clientProxyId));
}
List<ServerCQ> cqs = getAllClientCqs(clientProxyId);
ArrayList<String> durableClientCqs = new ArrayList<>();
for (ServerCQ cq : cqs) {
ServerCQImpl cQuery = (ServerCQImpl) cq;
if (cQuery != null && cQuery.isDurable()) {
ClientProxyMembershipID id = cQuery.getClientProxyId();
if (id != null && id.equals(clientProxyId)) {
durableClientCqs.add(cQuery.getName());
}
}
}
return durableClientCqs;
}
/**
* Server side method. Closes non-durable CQs for the given client proxy id.
*/
@Override
public void closeNonDurableClientCqs(ClientProxyMembershipID clientProxyId) throws CqException {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("Closing Client CQs for the client: {}", clientProxyId);
}
List<ServerCQ> cqs = getAllClientCqs(clientProxyId);
for (ServerCQ cq : cqs) {
ServerCQImpl cQuery = (ServerCQImpl) cq;
try {
if (!cQuery.isDurable()) {
cQuery.close(false);
}
} catch (QueryException | CqClosedException e) {
if (isDebugEnabled) {
logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(),
e.getMessage());
}
}
}
}
/**
* Is the CQ service in a cache server environment
*
* @return true if cache server, false otherwise
*/
public boolean isServer() {
if (this.cache.getCacheServers().isEmpty()) {
return false;
}
return true;
}
/**
* Cleans up the CqService.
*/
@Override
public void close() {
if (logger.isDebugEnabled()) {
logger.debug("Closing CqService. {}", this);
}
// Close All the CQs.
// Need to take care when Clients are still connected...
closeAllCqs(false);
isRunning = false;
}
@Override
public boolean isRunning() {
return this.isRunning;
}
@Override
public void start() {
this.isRunning = true;
}
/**
* @return Returns the serverCqName.
*/
@Override
public String constructServerCqName(String cqName, ClientProxyMembershipID clientProxyId) {
ConcurrentHashMap<ClientProxyMembershipID, String> cache =
JavaWorkarounds.computeIfAbsent(serverCqNameCache, cqName,
key -> new ConcurrentHashMap<>());
String cName = cache.get(clientProxyId);
if (null == cName) {
final StringBuilder sb = new StringBuilder(cqName).append("__");
if (clientProxyId.isDurable()) {
sb.append(clientProxyId.getDurableId());
} else {
sb.append(clientProxyId.getDSMembership());
}
cName = sb.toString();
cache.put(clientProxyId, cName);
}
return cName;
}
private void removeFromCacheForServerToConstructedCQName(final String cqName,
ClientProxyMembershipID clientProxyMembershipID) {
ConcurrentHashMap<ClientProxyMembershipID, String> cache = serverCqNameCache.get(cqName);
if (cache != null) {
cache.remove(clientProxyMembershipID);
if (cache.size() == 0) {
serverCqNameCache.remove(cqName);
}
}
}
/**
* Checks if CQ with the given name already exists.
*
* @param cqName name of the CQ.
*
* @return true if exists else false.
*/
private synchronized boolean isCqExists(String cqName) {
HashMap<String, CqQueryImpl> cqMap = cqQueryMap;
return cqMap.containsKey(cqName);
}
/**
* Generates a name for CQ. Checks if CQ with that name already exists if so generates a new
* cqName.
*/
private synchronized String generateCqName() {
while (true) {
String cqName = CQ_NAME_PREFIX + (cqId++);
if (!isCqExists(cqName)) {
return cqName;
}
}
}
@Override
public void dispatchCqListeners(HashMap<String, Integer> cqs, int messageType, Object key,
Object value, byte[] delta, QueueManager qManager, EventID eventId) {
Object[] fullValue = new Object[1];
Iterator<Map.Entry<String, Integer>> iter = cqs.entrySet().iterator();
String cqName = null;
final boolean isDebugEnabled = logger.isDebugEnabled();
while (iter.hasNext()) {
try {
Map.Entry<String, Integer> entry = iter.next();
cqName = entry.getKey();
ClientCQImpl cQuery = (ClientCQImpl) this.getCq(cqName);
if (cQuery == null || (!cQuery.isRunning() && cQuery.getQueuedEvents() == null)) {
if (isDebugEnabled) {
logger.debug("Unable to invoke CqListener, {}, CqName : {}",
((cQuery == null) ? "CQ not found" : " CQ is Not running"), cqName);
}
continue;
}
Integer cqOp = entry.getValue();
// If Region destroy event, close the cq.
if (cqOp.intValue() == MessageType.DESTROY_REGION) {
// The close will also invoke the listeners close().
try {
cQuery.close(false);
} catch (Exception ex) {
// handle?
}
continue;
}
// Construct CqEvent.
CqEventImpl cqEvent = new CqEventImpl(cQuery, getOperation(messageType), getOperation(cqOp),
key, value, delta, qManager, eventId);
// Update statistics
cQuery.updateStats(cqEvent);
// Check if CQ Event needs to be queued.
if (cQuery.getQueuedEvents() != null) {
synchronized (cQuery.queuedEventsSynchObject) {
// Get latest value.
ConcurrentLinkedQueue<CqEventImpl> queuedEvents = cQuery.getQueuedEvents();
// Check to see, if its not set to null while waiting to get
// Synchronization lock.
if (queuedEvents != null) {
if (isDebugEnabled) {
logger.debug("Queueing event for key: {}", key);
}
cQuery.getVsdStats().incQueuedCqListenerEvents();
queuedEvents.add(cqEvent);
continue;
}
}
}
this.invokeListeners(cqName, cQuery, cqEvent, fullValue);
if (value == null) {
value = fullValue[0];
}
} // outer try
catch (Throwable t) {
logger.warn(String.format("Error processing CqListener for cq: %s", cqName), t);
if (t instanceof VirtualMachineError) {
logger.warn(String.format("VirtualMachineError processing CqListener for cq: %s",
cqName),
t);
return;
}
}
} // iteration.
}
void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent) {
invokeListeners(cqName, cQuery, cqEvent, null);
}
private void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent,
Object[] fullValue) {
if (!cQuery.isRunning() || cQuery.getCqAttributes() == null) {
return;
}
// invoke CQ Listeners.
CqListener[] cqListeners = cQuery.getCqAttributes().getCqListeners();
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("Invoking CQ listeners for {}, number of listeners : {} cqEvent : {}", cqName,
cqListeners.length, cqEvent);
}
for (int lCnt = 0; lCnt < cqListeners.length; lCnt++) {
try {
// Check if the listener is not null, it could have been changed/reset
// by the CqAttributeMutator.
if (cqListeners[lCnt] != null) {
cQuery.getVsdStats().incNumCqListenerInvocations();
try {
if (cqEvent.getThrowable() != null) {
cqListeners[lCnt].onError(cqEvent);
} else {
cqListeners[lCnt].onEvent(cqEvent);
}
} catch (InvalidDeltaException ide) {
if (isDebugEnabled) {
logger.debug("CqService.dispatchCqListeners(): Requesting full value...");
}
Part result = (Part) GetEventValueOp
.executeOnPrimary(cqEvent.getQueueManager().getPool(), cqEvent.getEventID(), null);
Object newVal = result.getObject();
if (result == null || newVal == null) {
if (!cache.getCancelCriterion().isCancelInProgress()) {
Exception ex =
new Exception("Failed to retrieve full value from server for eventID "
+ cqEvent.getEventID());
logger.warn("Exception in the CqListener of the CQ, CqName: {} Error : {}",
new Object[] {cqName, ex.getMessage()});
if (isDebugEnabled) {
logger.debug(ex.getMessage(), ex);
}
}
} else {
this.cache.getCachePerfStats().incDeltaFullValuesRequested();
cqEvent = new CqEventImpl(cQuery, cqEvent.getBaseOperation(),
cqEvent.getQueryOperation(), cqEvent.getKey(), newVal, cqEvent.getDeltaValue(),
cqEvent.getQueueManager(), cqEvent.getEventID());
if (cqEvent.getThrowable() != null) {
cqListeners[lCnt].onError(cqEvent);
} else {
cqListeners[lCnt].onEvent(cqEvent);
}
if (fullValue != null) {
fullValue[0] = newVal;
}
}
}
}
// Handle client side exceptions.
} catch (Exception ex) {
if (!cache.getCancelCriterion().isCancelInProgress()) {
logger.warn("Exception in the CqListener of the CQ, CqName: {} Error : {}",
new Object[] {cqName, ex.getMessage()});
if (isDebugEnabled) {
logger.debug(ex.getMessage(), ex);
}
}
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
logger.warn("Runtime Exception in the CqListener of the CQ, CqName: {} Error : {}",
new Object[] {cqName, t.getLocalizedMessage()});
if (isDebugEnabled) {
logger.debug(t.getMessage(), t);
}
}
}
}
private void invokeCqConnectedListeners(String cqName, ClientCQImpl cQuery, boolean connected) {
if (!cQuery.isRunning() || cQuery.getCqAttributes() == null) {
return;
}
cQuery.setConnected(connected);
// invoke CQ Listeners.
CqListener[] cqListeners = cQuery.getCqAttributes().getCqListeners();
if (logger.isDebugEnabled()) {
logger.debug("Invoking CQ status listeners for {}, number of listeners : {}", cqName,
cqListeners.length);
}
for (int lCnt = 0; lCnt < cqListeners.length; lCnt++) {
try {
if (cqListeners[lCnt] != null) {
if (cqListeners[lCnt] instanceof CqStatusListener) {
CqStatusListener listener = (CqStatusListener) cqListeners[lCnt];
if (connected) {
listener.onCqConnected();
} else {
listener.onCqDisconnected();
}
}
}
// Handle client side exceptions.
} catch (Exception ex) {
if (!cache.getCancelCriterion().isCancelInProgress()) {
logger.warn("Exception in the CqListener of the CQ, CqName: {} Error : {}",
new Object[] {cqName, ex.getMessage()});
if (logger.isDebugEnabled()) {
logger.debug(ex.getMessage(), ex);
}
}
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
SystemFailure.checkFailure();
logger.warn("Runtime Exception in the CqListener of the CQ, CqName: {} Error : {}",
new Object[] {cqName, t.getLocalizedMessage()});
if (logger.isDebugEnabled()) {
logger.debug(t.getMessage(), t);
}
}
}
}
/**
* Returns the Operation for the given EnumListenerEvent type.
*/
private Operation getOperation(int eventType) {
Operation op = null;
switch (eventType) {
case MessageType.LOCAL_CREATE:
op = Operation.CREATE;
break;
case MessageType.LOCAL_UPDATE:
op = Operation.UPDATE;
break;
case MessageType.LOCAL_DESTROY:
op = Operation.DESTROY;
break;
case MessageType.LOCAL_INVALIDATE:
op = Operation.INVALIDATE;
break;
case MessageType.CLEAR_REGION:
op = Operation.REGION_CLEAR;
break;
case MessageType.INVALIDATE_REGION:
op = Operation.REGION_INVALIDATE;
break;
}
return op;
}
@Override
public void processEvents(CacheEvent event, Profile localProfile, Profile[] profiles,
FilterRoutingInfo frInfo) throws CqException {
// Is this a region event or an entry event
if (event instanceof RegionEvent) {
processRegionEvent(event, localProfile, profiles, frInfo);
} else {
// Use the PDX types in serialized form.
Boolean initialPdxReadSerialized = this.cache.getPdxReadSerializedOverride();
this.cache.setPdxReadSerializedOverride(true);
try {
processEntryEvent(event, localProfile, profiles, frInfo);
} finally {
this.cache.setPdxReadSerializedOverride(initialPdxReadSerialized);
}
}
}
private void processRegionEvent(CacheEvent event, Profile localProfile, Profile[] profiles,
FilterRoutingInfo frInfo) throws CqException {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("CQ service processing region event {}", event);
}
Integer cqRegionEvent = generateCqRegionEvent(event);
for (int i = -1; i < profiles.length; i++) {
CacheProfile cf;
if (i < 0) {
cf = (CacheProfile) localProfile;
if (cf == null)
continue;
} else {
cf = (CacheProfile) profiles[i];
}
FilterProfile pf = cf.filterProfile;
if (pf == null || pf.getCqMap().isEmpty()) {
continue;
}
Map cqs = pf.getCqMap();
HashMap<Long, Integer> cqInfo = new HashMap<>();
Iterator cqIter = cqs.entrySet().iterator();
while (cqIter.hasNext()) {
Map.Entry cqEntry = (Map.Entry) cqIter.next();
ServerCQImpl cQuery = (ServerCQImpl) cqEntry.getValue();
if (!event.isOriginRemote() && event.getOperation().isRegionDestroy()
&& !((LocalRegion) event.getRegion()).isUsedForPartitionedRegionBucket()) {
try {
if (isDebugEnabled) {
logger.debug("Closing CQ on region destroy event. CqName : {}", cQuery.getName());
}
cQuery.close(false);
} catch (Exception ex) {
if (isDebugEnabled) {
logger.debug("Failed to Close CQ on region destroy. CqName : {}", cQuery.getName(),
ex);
}
}
}
cqInfo.put(cQuery.getFilterID(), cqRegionEvent);
cQuery.getVsdStats().updateStats(cqRegionEvent);
}
if (pf.isLocalProfile()) {
frInfo.setLocalCqInfo(cqInfo);
} else {
frInfo.setCqRoutingInfo(cf.getDistributedMember(), cqInfo);
}
}
}
private void processEntryEvent(CacheEvent event, Profile localProfile, Profile[] profiles,
FilterRoutingInfo frInfo) throws CqException {
final boolean isDebugEnabled = logger.isDebugEnabled();
HashSet<Object> cqUnfilteredEventsSet_newValue = new HashSet<>();
HashSet<Object> cqUnfilteredEventsSet_oldValue = new HashSet<>();
boolean b_cqResults_newValue;
boolean b_cqResults_oldValue;
boolean queryOldValue;
EntryEvent entryEvent = (EntryEvent) event;
Object eventKey = entryEvent.getKey();
boolean isDupEvent = ((EntryEventImpl) event).isPossibleDuplicate();
// The CQ query needs to be applied when the op is update, destroy
// invalidate and in case when op is create and its an duplicate
// event, the reason for this is when peer sends a duplicate event
// it marks it as create and sends it, so that the receiving node
// applies it (see DR.virtualPut()).
boolean opRequiringQueryOnOldValue = (event.getOperation().isUpdate()
|| event.getOperation().isDestroy() || event.getOperation().isInvalidate()
|| (event.getOperation().isCreate() && isDupEvent));
HashMap<String, Integer> matchedCqs = new HashMap<>();
long executionStartTime;
for (int i = -1; i < profiles.length; i++) {
CacheProfile cf;
if (i < 0) {
cf = (CacheProfile) localProfile;
if (cf == null)
continue;
} else {
cf = (CacheProfile) profiles[i];
}
FilterProfile pf = cf.filterProfile;
if (pf == null || pf.getCqMap().isEmpty()) {
continue;
}
Map cqs = pf.getCqMap();
if (isDebugEnabled) {
logger.debug("Profile for {} processing {} CQs", cf.peerMemberId, cqs.size());
}
if (cqs.isEmpty()) {
continue;
}
// Get new value. If its not retrieved.
if (cqUnfilteredEventsSet_newValue.isEmpty()
&& (event.getOperation().isCreate() || event.getOperation().isUpdate())) {
Object newValue = entryEvent.getNewValue();
if (newValue != null) {
// We have a new value to run the query on
cqUnfilteredEventsSet_newValue.add(newValue);
}
}
HashMap<Long, Integer> cqInfo = new HashMap<>();
Iterator cqIter = cqs.entrySet().iterator();
while (cqIter.hasNext()) {
Map.Entry cqEntry = (Map.Entry) cqIter.next();
ServerCQImpl cQuery = (ServerCQImpl) cqEntry.getValue();
b_cqResults_newValue = false;
b_cqResults_oldValue = false;
queryOldValue = false;
if (cQuery == null) {
continue;
}
String cqName = cQuery.getServerCqName();
Long filterID = cQuery.getFilterID();
if (isDebugEnabled) {
logger.debug("Processing CQ : {} Key: {}", cqName, eventKey);
}
Integer cqEvent = null;
if (matchedCqs.containsKey(cqName)) {
cqEvent = matchedCqs.get(cqName);
if (isDebugEnabled) {
logger.debug("query {} has already been processed and returned {}", cqName, cqEvent);
}
if (cqEvent == null) {
continue;
}
// Update the Cache Results for this CQ.
if (cqEvent.intValue() == MessageType.LOCAL_CREATE
|| cqEvent.intValue() == MessageType.LOCAL_UPDATE) {
cQuery.addToCqResultKeys(eventKey);
} else if (cqEvent.intValue() == MessageType.LOCAL_DESTROY) {
cQuery.markAsDestroyedInCqResultKeys(eventKey);
}
} else {
boolean error = false;
{
try {
synchronized (cQuery) {
// Apply query on new value.
if (!cqUnfilteredEventsSet_newValue.isEmpty()) {
executionStartTime = this.stats.startCqQueryExecution();
b_cqResults_newValue =
evaluateQuery(cQuery, new Object[] {cqUnfilteredEventsSet_newValue});
this.stats.endCqQueryExecution(executionStartTime);
}
}
// In case of Update, destroy and invalidate.
// Apply query on oldValue.
if (opRequiringQueryOnOldValue) {
// Check if CQ Result is cached, if not apply query on old
// value. Currently the CQ Results are not cached for the
// Partitioned Regions. Once this is added remove the check
// with PR region.
if (cQuery.cqResultKeysInitialized) {
b_cqResults_oldValue = cQuery.isPartOfCqResult(eventKey);
// For PR if not found in cache, apply the query on old value.
// Also apply if the query was not executed during cq execute
if ((cQuery.isPR || !CqServiceImpl.EXECUTE_QUERY_DURING_INIT)
&& b_cqResults_oldValue == false) {
queryOldValue = true;
}
if (isDebugEnabled && !cQuery.isPR && !b_cqResults_oldValue) {
logger.debug(
"Event Key not found in the CQ Result Queue. EventKey : {} CQ Name : {}",
eventKey, cqName);
}
} else {
queryOldValue = true;
}
if (queryOldValue) {
if (cqUnfilteredEventsSet_oldValue.isEmpty()) {
Object oldValue = entryEvent.getOldValue();
if (oldValue != null) {
cqUnfilteredEventsSet_oldValue.add(oldValue);
}
}
synchronized (cQuery) {
// Apply query on old value.
if (!cqUnfilteredEventsSet_oldValue.isEmpty()) {
executionStartTime = this.stats.startCqQueryExecution();
b_cqResults_oldValue =
evaluateQuery(cQuery, new Object[] {cqUnfilteredEventsSet_oldValue});
this.stats.endCqQueryExecution(executionStartTime);
} else {
if (isDebugEnabled) {
logger.debug(
"old value for event with key {} is null - query execution not performed",
eventKey);
}
}
}
} // Query oldValue
}
} catch (Exception ex) {
// Any exception in running the query should be caught here and
// buried because this code is running in-line with the message
// processing code and we don't want to kill that thread
error = true;
// CHANGE LOG MESSAGE:
logger.info("Error while processing CQ on the event, key : {} CqName: {}, Error: {}",
new Object[] {((EntryEvent) event).getKey(), cQuery.getName(),
ex.getLocalizedMessage()});
}
if (error) {
cqEvent = MESSAGE_TYPE_EXCEPTION;
} else {
if (b_cqResults_newValue) {
if (b_cqResults_oldValue) {
cqEvent = MESSAGE_TYPE_LOCAL_UPDATE;
} else {
cqEvent = MESSAGE_TYPE_LOCAL_CREATE;
}
// If its create and caching is enabled, cache the key
// for this CQ.
cQuery.addToCqResultKeys(eventKey);
} else if (b_cqResults_oldValue) {
// Base invalidate operation is treated as destroy.
// When the invalidate comes through, the entry will no longer
// satisfy the query and will need to be deleted.
cqEvent = MESSAGE_TYPE_LOCAL_DESTROY;
// If caching is enabled, mark this event's key as removed
// from the CQ cache.
cQuery.markAsDestroyedInCqResultKeys(eventKey);
}
}
}
// Get the matching CQs if any.
// synchronized (this.matchingCqMap){
String query = cQuery.getQueryString();
Set matchingCqs = (Set) matchingCqMap.get(query);
if (matchingCqs != null) {
Iterator iter = matchingCqs.iterator();
while (iter.hasNext()) {
String matchingCqName = (String) iter.next();
if (!matchingCqName.equals(cqName)) {
matchedCqs.put(matchingCqName, cqEvent);
if (isDebugEnabled) {
logger.debug("Adding CQ into Matching CQ Map: {} Event is: {}", matchingCqName,
cqEvent);
}
}
}
}
}
if (cqEvent != null && cQuery.isRunning()) {
if (isDebugEnabled) {
logger.debug("Added event to CQ with client-side name: {} key: {} operation : {}",
cQuery.cqName, eventKey, cqEvent);
}
cqInfo.put(filterID, cqEvent);
CqQueryVsdStats stats = cQuery.getVsdStats();
if (stats != null) {
stats.updateStats(cqEvent);
}
}
}
if (cqInfo.size() > 0) {
if (pf.isLocalProfile()) {
if (isDebugEnabled) {
logger.debug("Setting local CQ matches to {}", cqInfo);
}
frInfo.setLocalCqInfo(cqInfo);
} else {
if (isDebugEnabled) {
logger.debug("Setting CQ matches for {} to {}", cf.getDistributedMember(), cqInfo);
}
frInfo.setCqRoutingInfo(cf.getDistributedMember(), cqInfo);
}
}
} // iteration over Profiles.
}
private Integer generateCqRegionEvent(CacheEvent event) {
Integer cqEvent = null;
if (event.getOperation().isRegionDestroy()) {
cqEvent = MessageType.DESTROY_REGION;
} else if (event.getOperation().isRegionInvalidate()) {
cqEvent = MessageType.INVALIDATE_REGION;
} else if (event.getOperation().isClear()) {
cqEvent = MessageType.CLEAR_REGION;
}
return cqEvent;
}
/**
* Manages the CQs created for the base region. This is managed here, instead of on the base
* region; since the cq could be created on the base region, before base region is created (using
* newCq()).
*/
private void addToBaseRegionToCqNameMap(String regionName, String cqName) {
synchronized (this.baseRegionToCqNameMap) {
ArrayList<String> cqs = this.baseRegionToCqNameMap.get(regionName);
if (cqs == null) {
cqs = new ArrayList<>();
}
cqs.add(cqName);
this.baseRegionToCqNameMap.put(regionName, cqs);
}
}
void removeFromBaseRegionToCqNameMap(String regionName, String cqName) {
synchronized (this.baseRegionToCqNameMap) {
ArrayList<String> cqs = this.baseRegionToCqNameMap.get(regionName);
if (cqs != null) {
cqs.remove(cqName);
if (cqs.isEmpty()) {
this.baseRegionToCqNameMap.remove(regionName);
} else {
this.baseRegionToCqNameMap.put(regionName, cqs);
}
}
}
}
/**
* Get the VSD ststs for CQ Service. There is one CQ Service per cache
*
* @return reference to VSD stats object for the CQ service
*/
public CqServiceVsdStats getCqServiceVsdStats() {
return stats;
}
/**
* Adds the query from the given CQ to the matched CQ map.
*/
void addToMatchingCqMap(CqQueryImpl cq) {
synchronized (this.matchingCqMap) {
String cqQuery = cq.getQueryString();
Set<String> matchingCQs;
if (!matchingCqMap.containsKey(cqQuery)) {
matchingCQs = Collections.newSetFromMap(new ConcurrentHashMap());
matchingCqMap.put(cqQuery, matchingCQs);
this.stats.incUniqueCqQuery();
} else {
matchingCQs = (Set) matchingCqMap.get(cqQuery);
}
matchingCQs.add(cq.getServerCqName());
if (logger.isDebugEnabled()) {
logger.debug("Adding CQ into MatchingCQ map, CQName: {} Number of matched querys are: {}",
cq.getServerCqName(), matchingCQs.size());
}
}
}
/**
* Removes the query from the given CQ from the matched CQ map.
*/
private void removeFromMatchingCqMap(CqQueryImpl cq) {
synchronized (this.matchingCqMap) {
String cqQuery = cq.getQueryString();
if (matchingCqMap.containsKey(cqQuery)) {
Set matchingCQs = (Set) matchingCqMap.get(cqQuery);
matchingCQs.remove(cq.getServerCqName());
if (logger.isDebugEnabled()) {
logger.debug(
"Removing CQ from MatchingCQ map, CQName: {} Number of matched querys are: {}",
cq.getServerCqName(), matchingCQs.size());
}
if (matchingCQs.isEmpty()) {
matchingCqMap.remove(cqQuery);
this.stats.decUniqueCqQuery();
}
}
}
}
/**
* Returns the matching CQ map.
*
* @return HashMap matchingCqMap
*/
public Map<String, HashSet<String>> getMatchingCqMap() {
return matchingCqMap;
}
/**
* Applies the query on the event. This method takes care of the performance related changed done
* to improve the CQ-query performance. When CQ-query is executed first time, it saves the query
* related information in the execution context and uses that info in later executions.
*/
private boolean evaluateQuery(CqQueryImpl cQuery, Object[] event) throws Exception {
ExecutionContext execContext = cQuery.getQueryExecutionContext();
execContext.reset();
execContext.setBindArguments(event);
boolean status = false;
// Check if the CQ query is executed once.
// If not execute the query in normal way.
// During this phase the query execution related info are stored in the
// ExecutionContext.
if (execContext.getScopeNum() <= 0) {
SelectResults results =
(SelectResults) ((DefaultQuery) cQuery.getQuery()).executeUsingContext(execContext);
if (results != null && results.size() > 0) {
status = true;
}
} else {
// Execute using the saved query info (in ExecutionContext).
// This avoids building resultSet, index look-up, generating build-plans
// that are not required for; query execution on single object.
CompiledSelect cs = ((DefaultQuery) (cQuery.getQuery())).getSelect();
status = cs.evaluateCq(execContext);
}
return status;
}
@Override
public UserAttributes getUserAttributes(String cqName) {
return this.cqNameToUserAttributesMap.get(cqName);
}
@Override
public void cqsDisconnected(Pool pool) {
invokeCqsConnected(pool, false);
}
@Override
public void cqsConnected(Pool pool) {
invokeCqsConnected(pool, true);
}
/**
* Let cq listeners know that they are connected or disconnected
*/
private void invokeCqsConnected(Pool pool, boolean connected) {
String poolName = pool.getName();
// Check to see if we are already connected/disconnected.
// If state has not changed, do not invoke another connected/disconnected
synchronized (cqPoolsConnected) {
// don't repeatedly send same connect/disconnect message to cq's on repeated fails of
// RedundancySatisfier
if (cqPoolsConnected.containsKey(poolName) && connected == cqPoolsConnected.get(poolName)) {
return;
}
cqPoolsConnected.put(poolName, connected);
Collection<? extends InternalCqQuery> cqs = this.getAllCqs();
String cqName = null;
final boolean isDebugEnabled = logger.isDebugEnabled();
for (InternalCqQuery query : cqs) {
try {
if (query == null) {
continue;
}
cqName = query.getName();
ClientCQImpl cQuery = (ClientCQImpl) this.getCq(cqName);
// Check cq pool to determine if the pool matches, if not continue.
// Also if the connected state is already the same, we do not have to send status again.
if (cQuery == null || cQuery.getCQProxy() == null) {
continue;
}
Pool cqPool = cQuery.getCQProxy().getPool();
if (cQuery.isConnected() == connected || !cqPool.getName().equals(poolName)) {
continue;
}
if ((!cQuery.isRunning() && cQuery.getQueuedEvents() == null)) {
if (isDebugEnabled) {
logger.debug("Unable to invoke CqListener, CQ is Not running, CqName : {}", cqName);
}
continue;
}
this.invokeCqConnectedListeners(cqName, cQuery, connected);
} catch (VirtualMachineError e) {
SystemFailure.initiateFailure(e);
throw e;
} catch (Throwable t) {
SystemFailure.checkFailure();
logger.warn("Error while sending connection status to cq listeners", t);
}
}
}
}
@Override
public List<String> getAllDurableCqsFromServer(InternalPool pool) {
return new ServerCQProxyImpl(pool).getAllDurableCqsFromServer();
}
}