| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.cache.query.cq.internal; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.InvalidDeltaException; |
| import org.apache.geode.StatisticsFactory; |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.cache.Cache; |
| import org.apache.geode.cache.CacheEvent; |
| import org.apache.geode.cache.CacheLoaderException; |
| import org.apache.geode.cache.EntryEvent; |
| import org.apache.geode.cache.Operation; |
| import org.apache.geode.cache.RegionEvent; |
| import org.apache.geode.cache.TimeoutException; |
| import org.apache.geode.cache.client.Pool; |
| import org.apache.geode.cache.client.internal.GetEventValueOp; |
| import org.apache.geode.cache.client.internal.InternalPool; |
| import org.apache.geode.cache.client.internal.QueueManager; |
| import org.apache.geode.cache.client.internal.UserAttributes; |
| import org.apache.geode.cache.query.CqAttributes; |
| import org.apache.geode.cache.query.CqClosedException; |
| import org.apache.geode.cache.query.CqException; |
| import org.apache.geode.cache.query.CqExistsException; |
| import org.apache.geode.cache.query.CqListener; |
| import org.apache.geode.cache.query.CqQuery; |
| import org.apache.geode.cache.query.CqServiceStatistics; |
| import org.apache.geode.cache.query.CqStatusListener; |
| import org.apache.geode.cache.query.QueryException; |
| import org.apache.geode.cache.query.QueryInvalidException; |
| import org.apache.geode.cache.query.RegionNotFoundException; |
| import org.apache.geode.cache.query.SelectResults; |
| import org.apache.geode.cache.query.cq.internal.ops.ServerCQProxyImpl; |
| import org.apache.geode.cache.query.internal.CompiledSelect; |
| import org.apache.geode.cache.query.internal.CqQueryVsdStats; |
| import org.apache.geode.cache.query.internal.CqStateImpl; |
| import org.apache.geode.cache.query.internal.DefaultQuery; |
| import org.apache.geode.cache.query.internal.ExecutionContext; |
| import org.apache.geode.cache.query.internal.cq.ClientCQ; |
| import org.apache.geode.cache.query.internal.cq.CqService; |
| import org.apache.geode.cache.query.internal.cq.InternalCqQuery; |
| import org.apache.geode.cache.query.internal.cq.ServerCQ; |
| import org.apache.geode.distributed.internal.DistributionAdvisor.Profile; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile; |
| import org.apache.geode.internal.cache.EntryEventImpl; |
| import org.apache.geode.internal.cache.EventID; |
| import org.apache.geode.internal.cache.FilterProfile; |
| import org.apache.geode.internal.cache.FilterRoutingInfo; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.LocalRegion; |
| import org.apache.geode.internal.cache.tier.MessageType; |
| import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier; |
| import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy; |
| import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; |
| import org.apache.geode.internal.cache.tier.sockets.Part; |
| import org.apache.geode.internal.util.JavaWorkarounds; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
| * Implements the CqService functionality. |
| * |
| * @since GemFire 5.5 |
| */ |
| public class CqServiceImpl implements CqService { |
| private static final Logger logger = LogService.getLogger(); |
| |
| private static final Integer MESSAGE_TYPE_LOCAL_CREATE = MessageType.LOCAL_CREATE; |
| private static final Integer MESSAGE_TYPE_LOCAL_UPDATE = MessageType.LOCAL_UPDATE; |
| private static final Integer MESSAGE_TYPE_LOCAL_DESTROY = MessageType.LOCAL_DESTROY; |
| private static final Integer MESSAGE_TYPE_EXCEPTION = MessageType.EXCEPTION; |
| |
| /** |
| * System property to evaluate the query even though the initial results are not required when cq |
| * is executed using the execute() method. |
| */ |
| public static boolean EXECUTE_QUERY_DURING_INIT = Boolean.valueOf(System |
| .getProperty(DistributionConfig.GEMFIRE_PREFIX + "cq.EXECUTE_QUERY_DURING_INIT", "true")); |
| |
| private static final String CQ_NAME_PREFIX = "GfCq"; |
| |
| private final InternalCache cache; |
| |
| /** |
| * Manages cq pools to determine if a status of connect or disconnect needs to be sent out |
| */ |
| private final HashMap<String, Boolean> cqPoolsConnected = new HashMap<>(); |
| |
| /** |
| * Manages CQ objects. uses serverCqName as key and CqQueryImpl as value |
| * |
| * GuardedBy cqQueryMapLock |
| */ |
| private volatile HashMap<String, CqQueryImpl> cqQueryMap = new HashMap<>(); |
| |
| private final Object cqQueryMapLock = new Object(); |
| |
| private volatile boolean isRunning = false; |
| |
| /** |
| * Used by client when multiuser-authentication is true. |
| */ |
| private final HashMap<String, UserAttributes> cqNameToUserAttributesMap = new HashMap<>(); |
| |
| // Map to manage the similar CQs (having same query - performance optimization). |
| // With query as key and Set of CQs as values. |
| private final ConcurrentHashMap matchingCqMap; |
| |
| // CQ Service statistics |
| private final CqServiceStatisticsImpl cqServiceStats; |
| private final CqServiceVsdStats stats; |
| |
| // CQ identifier, also used in auto generated CQ names |
| private volatile long cqId = 1; |
| |
| /* This is to manage region to CQs map, client side book keeping. */ |
| private HashMap<String, ArrayList<String>> baseRegionToCqNameMap = new HashMap<>(); |
| |
| /** |
| * Access and modification to the contents of this map do not necessarily need to be lock |
| * protected. This is just used to optimize construction of a server side cq name. Missing values |
| * in this cache will mean a look up for a specific proxy id and cq name will miss and reconstruct |
| * the string before adding it back to the cache |
| */ |
| private static final ConcurrentHashMap<String, ConcurrentHashMap<ClientProxyMembershipID, String>> serverCqNameCache = |
| new ConcurrentHashMap<>(); |
| |
| /** |
| * Constructor. |
| * |
| * @param cache The cache used for the service |
| */ |
| public CqServiceImpl(final InternalCache cache) { |
| if (cache == null) { |
| throw new IllegalStateException("cache is null"); |
| } |
| cache.getCancelCriterion().checkCancelInProgress(null); |
| |
| this.cache = cache; |
| |
| // Initialize the Map which maintains the matching cqs. |
| this.matchingCqMap = new ConcurrentHashMap<String, HashSet<String>>(); |
| |
| // Initialize the VSD statistics |
| StatisticsFactory factory = this.cache.getDistributedSystem(); |
| this.stats = new CqServiceVsdStats(factory); |
| this.cqServiceStats = new CqServiceStatisticsImpl(this); |
| } |
| |
| /** |
| * Returns the cache associated with the cqService. |
| */ |
| public Cache getCache() { |
| return this.cache; |
| } |
| |
| public InternalCache getInternalCache() { |
| return this.cache; |
| } |
| |
| public CqServiceVsdStats stats() { |
| return this.stats; |
| } |
| |
| @Override |
| public synchronized ClientCQ newCq(String cqName, String queryString, CqAttributes cqAttributes, |
| InternalPool pool, boolean isDurable) |
| throws QueryInvalidException, CqExistsException, CqException { |
| if (queryString == null) { |
| throw new IllegalArgumentException( |
| String.format("Null argument %s", "queryString")); |
| |
| } else if (cqAttributes == null) { |
| throw new IllegalArgumentException( |
| String.format("Null argument %s", "cqAttribute")); |
| } |
| |
| if (isServer()) { |
| throw new IllegalStateException( |
| "client side newCq() method invocation on server."); |
| } |
| |
| // Check if the given cq already exists. |
| if (cqName != null && isCqExists(cqName)) { |
| throw new CqExistsException( |
| String.format("CQ with the given name already exists. CqName : %s", |
| cqName)); |
| } |
| |
| |
| ServerCQProxyImpl serverProxy = pool == null ? null : new ServerCQProxyImpl(pool); |
| ClientCQImpl cQuery = |
| new ClientCQImpl(this, cqName, queryString, cqAttributes, serverProxy, isDurable); |
| cQuery.updateCqCreateStats(); |
| |
| // cQuery.initCq(); |
| |
| // Check if query is valid. |
| cQuery.validateCq(); |
| |
| // Add cq into meta region. |
| // Check if Name needs to be generated. |
| if (cqName == null) { |
| // in the case of cqname internally generated, the CqExistsException needs |
| // to be taken care internally. |
| while (true) { |
| cQuery.setName(generateCqName()); |
| try { |
| addToCqMap(cQuery); |
| } catch (CqExistsException ex) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Got CqExistsException while intializing cq : {} Error : {}", |
| cQuery.getName(), ex.getMessage()); |
| } |
| continue; |
| } |
| break; |
| } |
| } else { |
| addToCqMap(cQuery); |
| } |
| |
| this.addToBaseRegionToCqNameMap(cQuery.getBaseRegionName(), cQuery.getServerCqName()); |
| |
| return cQuery; |
| } |
| |
| /** |
| * Executes the given CqQuery, if the CqQuery for that name is not there it registers the one and |
| * executes. This is called on the Server. |
| * |
| * @param manageEmptyRegions whether to update the 6.1 emptyRegions map held in the CCN |
| * @param regionDataPolicy the data policy of the region associated with the query. This is only |
| * needed if manageEmptyRegions is true. |
| * @param emptyRegionsMap map of empty regions. |
| * @throws IllegalStateException if this is called at client side. |
| */ |
| @Override |
| public synchronized ServerCQ executeCq(String cqName, String queryString, int cqState, |
| ClientProxyMembershipID clientProxyId, CacheClientNotifier ccn, boolean isDurable, |
| boolean manageEmptyRegions, int regionDataPolicy, Map emptyRegionsMap) |
| throws CqException, RegionNotFoundException, CqClosedException { |
| if (!isServer()) { |
| throw new IllegalStateException( |
| String.format("Server side executeCq method is called on client. CqName : %s", |
| cqName)); |
| } |
| |
| String serverCqName = constructServerCqName(cqName, clientProxyId); |
| ServerCQImpl cQuery; |
| |
| // If this CQ is not yet registered in Server, register CQ. |
| if (!isCqExists(serverCqName)) { |
| cQuery = new ServerCQImpl(this, cqName, queryString, isDurable, |
| constructServerCqName(cqName, clientProxyId)); |
| |
| try { |
| cQuery.registerCq(clientProxyId, ccn, cqState); |
| if (manageEmptyRegions) { // new in 6.1 |
| if (emptyRegionsMap != null && emptyRegionsMap.containsKey(cQuery.getBaseRegionName())) { |
| regionDataPolicy = 0; |
| } |
| |
| CacheClientProxy proxy = getCacheClientProxy(clientProxyId, ccn); |
| ccn.updateMapOfEmptyRegions( |
| proxy.getRegionsWithEmptyDataPolicy(), |
| cQuery.getBaseRegionName(), regionDataPolicy); |
| } |
| } catch (CqException cqe) { |
| logger.info("Exception while registering CQ on server. CqName : {}", |
| cQuery.getName()); |
| throw cqe; |
| } |
| |
| } else { |
| cQuery = (ServerCQImpl) getCq(serverCqName); |
| resumeCQ(cqState, cQuery); |
| } |
| |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Successfully created CQ on the server. CqName : {}", cQuery.getName()); |
| } |
| return cQuery; |
| } |
| |
| public CacheClientProxy getCacheClientProxy(ClientProxyMembershipID clientProxyId, |
| CacheClientNotifier ccn) throws CqException { |
| CacheClientProxy proxy = ccn.getClientProxy(clientProxyId, true); |
| if (proxy == null) { |
| throw new CqException("No Cache Client Proxy found while executing CQ."); |
| } |
| return proxy; |
| } |
| |
| @Override |
| public void resumeCQ(int cqState, ServerCQ cQuery) { |
| // Initialize the state of CQ. |
| if (((CqStateImpl) cQuery.getState()).getState() != cqState) { |
| cQuery.setCqState(cqState); |
| // addToCqEventKeysMap(cQuery); |
| // Send state change info to peers. |
| cQuery.getCqBaseRegion().getFilterProfile().setCqState(cQuery); |
| } |
| // If we are going to set the state to running, we need to check to see if it matches any other |
| // cq |
| if (cqState == CqStateImpl.RUNNING) { |
| // Add to the matchedCqMap. |
| addToMatchingCqMap((CqQueryImpl) cQuery); |
| } |
| } |
| |
| /** |
| * Adds the given CQ and cqQuery object into the CQ map. |
| */ |
| void addToCqMap(CqQueryImpl cq) throws CqExistsException, CqException { |
| // On server side cqName will be server side cqName. |
| String sCqName = cq.getServerCqName(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Adding to CQ Repository. CqName : {} ServerCqName : {}", cq.getName(), sCqName); |
| } |
| HashMap<String, CqQueryImpl> cqMap = cqQueryMap; |
| if (cqMap.containsKey(sCqName)) { |
| throw new CqExistsException( |
| String.format("A CQ with the given name %s already exists.", |
| sCqName)); |
| } |
| synchronized (cqQueryMapLock) { |
| HashMap<String, CqQueryImpl> tmpCqQueryMap = new HashMap<>(cqQueryMap); |
| try { |
| tmpCqQueryMap.put(sCqName, cq); |
| } catch (Exception ex) { |
| String errMsg = |
| "Failed to store Continuous Query in the repository. CqName: %s %s"; |
| Object[] errMsgArgs = new Object[] {sCqName, ex.getLocalizedMessage()}; |
| String s = String.format(errMsg, errMsgArgs); |
| logger.error(s); |
| throw new CqException(s, ex); |
| } |
| UserAttributes attributes = UserAttributes.userAttributes.get(); |
| if (attributes != null) { |
| this.cqNameToUserAttributesMap.put(cq.getName(), attributes); |
| } |
| cqQueryMap = tmpCqQueryMap; |
| } |
| } |
| |
| /** |
| * Removes given CQ from the cqMap.. |
| */ |
| void removeCq(String cqName) { |
| // On server side cqName will be server side cqName. |
| synchronized (cqQueryMapLock) { |
| HashMap<String, CqQueryImpl> tmpCqQueryMap = new HashMap<>(cqQueryMap); |
| tmpCqQueryMap.remove(cqName); |
| this.cqNameToUserAttributesMap.remove(cqName); |
| cqQueryMap = tmpCqQueryMap; |
| } |
| } |
| |
| @Override |
| public CqQuery getClientCqFromServer(ClientProxyMembershipID clientProxyId, String clientCqName) { |
| // On server side cqName will be server side cqName. |
| HashMap<String, CqQueryImpl> cqMap = cqQueryMap; |
| return cqMap.get(this.constructServerCqName(clientCqName, clientProxyId)); |
| } |
| |
| @Override |
| public InternalCqQuery getCq(String cqName) { |
| // On server side cqName will be server side cqName. |
| return cqQueryMap.get(cqName); |
| } |
| |
| @Override |
| public Collection<? extends InternalCqQuery> getAllCqs() { |
| return cqQueryMap.values(); |
| } |
| |
| @Override |
| public Collection<? extends InternalCqQuery> getAllCqs(final String regionName) |
| throws CqException { |
| if (regionName == null) { |
| throw new IllegalArgumentException( |
| String.format("Null argument %s", "regionName")); |
| } |
| |
| String[] cqNames; |
| |
| synchronized (this.baseRegionToCqNameMap) { |
| ArrayList<String> cqs = this.baseRegionToCqNameMap.get(regionName); |
| if (cqs == null) { |
| return null; |
| } |
| cqNames = new String[cqs.size()]; |
| cqs.toArray(cqNames); |
| } |
| |
| ArrayList<InternalCqQuery> cQueryList = new ArrayList<>(); |
| for (int cqCnt = 0; cqCnt < cqNames.length; cqCnt++) { |
| InternalCqQuery cq = getCq(cqNames[cqCnt]); |
| if (cq != null) { |
| cQueryList.add(cq); |
| } |
| } |
| |
| return cQueryList; |
| } |
| |
| @Override |
| public synchronized void executeAllClientCqs() throws CqException { |
| executeCqs(this.getAllCqs()); |
| } |
| |
| @Override |
| public synchronized void executeAllRegionCqs(final String regionName) throws CqException { |
| executeCqs(getAllCqs(regionName)); |
| } |
| |
| @Override |
| public synchronized void executeCqs(Collection<? extends InternalCqQuery> cqs) |
| throws CqException { |
| if (cqs == null) { |
| return; |
| } |
| String cqName = null; |
| for (InternalCqQuery internalCq : cqs) { |
| CqQuery cq = internalCq; |
| if (!cq.isClosed() && cq.isStopped()) { |
| try { |
| cqName = cq.getName(); |
| cq.execute(); |
| } catch (QueryException | CqClosedException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Failed to execute the CQ, CqName : {} Error : {}", cqName, |
| e.getMessage()); |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public synchronized void stopAllClientCqs() throws CqException { |
| stopCqs(this.getAllCqs()); |
| } |
| |
| @Override |
| public synchronized void stopAllRegionCqs(final String regionName) throws CqException { |
| stopCqs(this.getAllCqs(regionName)); |
| } |
| |
| @Override |
| public synchronized void stopCqs(Collection<? extends InternalCqQuery> cqs) throws CqException { |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| if (isDebugEnabled) { |
| if (cqs == null) { |
| logger.debug("CqService.stopCqs cqs : null"); |
| } else { |
| logger.debug("CqService.stopCqs cqs : ({} queries)", cqs.size()); |
| } |
| } |
| |
| if (cqs == null) { |
| return; |
| } |
| |
| String cqName = null; |
| for (InternalCqQuery internalCqQuery : cqs) { |
| CqQuery cq = internalCqQuery; |
| if (!cq.isClosed() && cq.isRunning()) { |
| try { |
| cqName = cq.getName(); |
| cq.stop(); |
| } catch (QueryException | CqClosedException e) { |
| if (isDebugEnabled) { |
| logger.debug("Failed to stop the CQ, CqName : {} Error : {}", cqName, e.getMessage()); |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void closeCqs(final String regionName) throws CqException { |
| Collection<? extends InternalCqQuery> cqs = this.getAllCqs(regionName); |
| if (cqs != null) { |
| String cqName = null; |
| for (InternalCqQuery cq : cqs) { |
| try { |
| cqName = cq.getName(); |
| |
| if (isServer()) { |
| // invoked on the server |
| cq.close(false); |
| } else { |
| // TODO: grid: if regionName has a pool check its keepAlive |
| boolean keepAlive = this.cache.keepDurableSubscriptionsAlive(); |
| if (cq.isDurable() && keepAlive) { |
| logger.warn("Not sending CQ close to the server as it is a durable CQ"); |
| cq.close(false); |
| } else { |
| cq.close(true); |
| } |
| } |
| |
| } catch (QueryException | CqClosedException e) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Failed to close the CQ, CqName : {} Error : {}", cqName, e.getMessage()); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Called directly on server side. |
| */ |
| @Override |
| public void stopCq(String cqName, ClientProxyMembershipID clientId) throws CqException { |
| String serverCqName = cqName; |
| if (clientId != null) { |
| serverCqName = this.constructServerCqName(cqName, clientId); |
| removeFromCacheForServerToConstructedCQName(cqName, clientId); |
| } |
| |
| ServerCQImpl cQuery = null; |
| String errMsg = null; |
| Exception ex = null; |
| |
| try { |
| HashMap<String, CqQueryImpl> cqMap = cqQueryMap; |
| if (!cqMap.containsKey(serverCqName)) { |
| /* |
| * gregp 052808: We should silently fail here instead of throwing error. This is to deal |
| * with races in recovery |
| */ |
| return; |
| } |
| cQuery = (ServerCQImpl) getCq(serverCqName); |
| |
| } catch (CacheLoaderException e1) { |
| errMsg = "CQ not found in the cq meta region, CqName: %s"; |
| ex = e1; |
| } catch (TimeoutException e2) { |
| errMsg = "Timeout while trying to get CQ from meta region, CqName: %s"; |
| ex = e2; |
| } finally { |
| if (ex != null) { |
| String s = String.format(errMsg, cqName); |
| if (logger.isDebugEnabled()) { |
| logger.debug(s); |
| } |
| throw new CqException(s, ex); |
| } |
| } |
| |
| try { |
| if (!cQuery.isStopped()) { |
| cQuery.stop(); |
| } |
| } catch (CqClosedException cce) { |
| throw new CqException(cce.getMessage()); |
| } finally { |
| // If this CQ is stopped, disable caching event keys for this CQ. |
| // this.removeCQFromCaching(cQuery.getServerCqName()); |
| this.removeFromMatchingCqMap(cQuery); |
| } |
| // Send stop message to peers. |
| cQuery.getCqBaseRegion().getFilterProfile().stopCq(cQuery); |
| } |
| |
| @Override |
| public void closeCq(String cqName, ClientProxyMembershipID clientProxyId) throws CqException { |
| String serverCqName = cqName; |
| if (clientProxyId != null) { |
| serverCqName = this.constructServerCqName(cqName, clientProxyId); |
| removeFromCacheForServerToConstructedCQName(cqName, clientProxyId); |
| } |
| |
| ServerCQImpl cQuery = null; |
| String errMsg = null; |
| Exception ex = null; |
| |
| try { |
| HashMap<String, CqQueryImpl> cqMap = cqQueryMap; |
| if (!cqMap.containsKey(serverCqName)) { |
| /* |
| * gregp 052808: We should silently fail here instead of throwing error. This is to deal |
| * with races in recovery |
| */ |
| return; |
| } |
| cQuery = (ServerCQImpl) cqMap.get(serverCqName); |
| |
| } catch (CacheLoaderException e1) { |
| errMsg = "CQ not found in the cq meta region, CqName: %s"; |
| ex = e1; |
| } catch (TimeoutException e2) { |
| errMsg = "Timeout while trying to get CQ from meta region, CqName: %s"; |
| ex = e2; |
| } finally { |
| if (ex != null) { |
| String s = String.format(errMsg, cqName); |
| if (logger.isDebugEnabled()) { |
| logger.debug(s); |
| } |
| throw new CqException(s, ex); |
| } |
| } |
| |
| try { |
| cQuery.close(false); |
| |
| // Repository Region. |
| // If CQ event caching is enabled, remove this CQs event cache reference. |
| // removeCQFromCaching(serverCqName); |
| |
| // CqBaseRegion |
| try { |
| LocalRegion baseRegion = cQuery.getCqBaseRegion(); |
| if (baseRegion != null && !baseRegion.isDestroyed()) { |
| // Server specific clean up. |
| if (isServer()) { |
| FilterProfile fp = baseRegion.getFilterProfile(); |
| if (fp != null) { |
| fp.closeCq(cQuery); |
| } |
| CacheClientProxy clientProxy = |
| cQuery.getCacheClientNotifier().getClientProxy(clientProxyId); |
| clientProxy.decCqCount(); |
| if (clientProxy.hasNoCq()) { |
| this.stats.decClientsWithCqs(); |
| } |
| } |
| } |
| } catch (Exception e) { |
| // May be cache is being shutdown |
| if (logger.isDebugEnabled()) { |
| logger.debug("Failed to remove CQ from the base region. CqName : {}", cqName); |
| } |
| } |
| |
| if (isServer()) { |
| removeFromBaseRegionToCqNameMap(cQuery.getRegionName(), serverCqName); |
| } |
| |
| LocalRegion baseRegion = cQuery.getCqBaseRegion(); |
| if (baseRegion.getFilterProfile().getCqCount() <= 0) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Should update the profile for this partitioned region {} for not requiring old value", |
| baseRegion); |
| } |
| } |
| } catch (CqClosedException cce) { |
| throw new CqException(cce.getMessage()); |
| } finally { |
| this.removeFromMatchingCqMap(cQuery); |
| } |
| } |
| |
| @Override |
| public void closeAllCqs(boolean clientInitiated) { |
| closeAllCqs(clientInitiated, getAllCqs()); |
| } |
| |
| /** |
| * Close all CQs executing in this VM, and release resources associated with executing CQs. |
| * CqQuerys created by other VMs are unaffected. |
| */ |
| private void closeAllCqs(boolean clientInitiated, Collection<? extends InternalCqQuery> cqs) { |
| closeAllCqs(clientInitiated, cqs, this.cache.keepDurableSubscriptionsAlive()); |
| } |
| |
| @Override |
| public void closeAllCqs(boolean clientInitiated, Collection<? extends InternalCqQuery> cqs, |
| boolean keepAlive) { |
| |
| if (cqs != null) { |
| String cqName = null; |
| if (logger.isDebugEnabled()) { |
| logger.debug("Closing all CQs, number of CQ to be closed : {}", cqs.size()); |
| } |
| for (InternalCqQuery cQuery : cqs) { |
| try { |
| cqName = cQuery.getName(); |
| |
| if (isServer()) { |
| cQuery.close(false); |
| } else { |
| if (clientInitiated) { |
| cQuery.close(true); |
| } else { |
| if (!isServer() && cQuery.isDurable() && keepAlive) { |
| logger.warn("Not sending CQ close to the server as it is a durable CQ"); |
| cQuery.close(false); |
| } else { |
| cQuery.close(true); |
| } |
| } |
| } |
| } catch (QueryException | CqClosedException e) { |
| if (!isRunning()) { |
| // Not cache shutdown |
| logger |
| .warn("Failed to close CQ %s %s", |
| cqName, e.getMessage()); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug(e.getMessage(), e); |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public CqServiceStatistics getCqStatistics() { |
| return cqServiceStats; |
| } |
| |
| @Override |
| public void closeClientCqs(ClientProxyMembershipID clientProxyId) throws CqException { |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| if (isDebugEnabled) { |
| logger.debug("Closing Client CQs for the client: {}", clientProxyId); |
| } |
| List<ServerCQ> cqs = getAllClientCqs(clientProxyId); |
| for (ServerCQ cq : cqs) { |
| CqQueryImpl cQuery = (CqQueryImpl) cq; |
| try { |
| cQuery.close(false); |
| } catch (QueryException | CqClosedException e) { |
| if (isDebugEnabled) { |
| logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(), |
| e.getMessage()); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public List<ServerCQ> getAllClientCqs(ClientProxyMembershipID clientProxyId) { |
| Collection<? extends InternalCqQuery> cqs = getAllCqs(); |
| ArrayList<ServerCQ> clientCqs = new ArrayList<>(); |
| |
| for (InternalCqQuery cq : cqs) { |
| ServerCQImpl cQuery = (ServerCQImpl) cq; |
| ClientProxyMembershipID id = cQuery.getClientProxyId(); |
| if (id != null && id.equals(clientProxyId)) { |
| clientCqs.add(cQuery); |
| } |
| } |
| return clientCqs; |
| } |
| |
| @Override |
| public List<String> getAllDurableClientCqs(ClientProxyMembershipID clientProxyId) |
| throws CqException { |
| if (clientProxyId == null) { |
| throw new CqException( |
| String.format("Unable to retrieve durable CQs for client proxy id %s", clientProxyId)); |
| } |
| List<ServerCQ> cqs = getAllClientCqs(clientProxyId); |
| ArrayList<String> durableClientCqs = new ArrayList<>(); |
| |
| for (ServerCQ cq : cqs) { |
| ServerCQImpl cQuery = (ServerCQImpl) cq; |
| if (cQuery != null && cQuery.isDurable()) { |
| ClientProxyMembershipID id = cQuery.getClientProxyId(); |
| if (id != null && id.equals(clientProxyId)) { |
| durableClientCqs.add(cQuery.getName()); |
| } |
| } |
| } |
| return durableClientCqs; |
| } |
| |
| /** |
| * Server side method. Closes non-durable CQs for the given client proxy id. |
| */ |
| @Override |
| public void closeNonDurableClientCqs(ClientProxyMembershipID clientProxyId) throws CqException { |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| if (isDebugEnabled) { |
| logger.debug("Closing Client CQs for the client: {}", clientProxyId); |
| } |
| List<ServerCQ> cqs = getAllClientCqs(clientProxyId); |
| for (ServerCQ cq : cqs) { |
| ServerCQImpl cQuery = (ServerCQImpl) cq; |
| try { |
| if (!cQuery.isDurable()) { |
| cQuery.close(false); |
| } |
| } catch (QueryException | CqClosedException e) { |
| if (isDebugEnabled) { |
| logger.debug("Failed to close the CQ, CqName : {} Error : {}", cQuery.getName(), |
| e.getMessage()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Is the CQ service in a cache server environment |
| * |
| * @return true if cache server, false otherwise |
| */ |
| public boolean isServer() { |
| if (this.cache.getCacheServers().isEmpty()) { |
| return false; |
| } |
| return true; |
| } |
| |
| /** |
| * Cleans up the CqService. |
| */ |
| @Override |
| public void close() { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Closing CqService. {}", this); |
| } |
| // Close All the CQs. |
| // Need to take care when Clients are still connected... |
| closeAllCqs(false); |
| isRunning = false; |
| } |
| |
| @Override |
| public boolean isRunning() { |
| return this.isRunning; |
| } |
| |
| @Override |
| public void start() { |
| this.isRunning = true; |
| } |
| |
| /** |
| * @return Returns the serverCqName. |
| */ |
| @Override |
| public String constructServerCqName(String cqName, ClientProxyMembershipID clientProxyId) { |
| ConcurrentHashMap<ClientProxyMembershipID, String> cache = |
| JavaWorkarounds.computeIfAbsent(serverCqNameCache, cqName, |
| key -> new ConcurrentHashMap<>()); |
| |
| String cName = cache.get(clientProxyId); |
| if (null == cName) { |
| final StringBuilder sb = new StringBuilder(cqName).append("__"); |
| if (clientProxyId.isDurable()) { |
| sb.append(clientProxyId.getDurableId()); |
| } else { |
| sb.append(clientProxyId.getDSMembership()); |
| } |
| cName = sb.toString(); |
| cache.put(clientProxyId, cName); |
| } |
| |
| return cName; |
| } |
| |
| private void removeFromCacheForServerToConstructedCQName(final String cqName, |
| ClientProxyMembershipID clientProxyMembershipID) { |
| ConcurrentHashMap<ClientProxyMembershipID, String> cache = serverCqNameCache.get(cqName); |
| if (cache != null) { |
| cache.remove(clientProxyMembershipID); |
| if (cache.size() == 0) { |
| serverCqNameCache.remove(cqName); |
| } |
| } |
| } |
| |
| /** |
| * Checks if CQ with the given name already exists. |
| * |
| * @param cqName name of the CQ. |
| * |
| * @return true if exists else false. |
| */ |
| private synchronized boolean isCqExists(String cqName) { |
| HashMap<String, CqQueryImpl> cqMap = cqQueryMap; |
| return cqMap.containsKey(cqName); |
| } |
| |
| /** |
| * Generates a name for CQ. Checks if CQ with that name already exists if so generates a new |
| * cqName. |
| */ |
| private synchronized String generateCqName() { |
| while (true) { |
| String cqName = CQ_NAME_PREFIX + (cqId++); |
| if (!isCqExists(cqName)) { |
| return cqName; |
| } |
| } |
| } |
| |
| /** |
| * Delivers one event to the listeners of every client CQ named in {@code cqs}. |
| * |
| * <p> |
| * For each entry the CQ is looked up by name. CQs that cannot be found, or that are neither |
| * running nor queueing events, are skipped. A {@code MessageType.DESTROY_REGION} operation closes |
| * the CQ instead of producing an event. Otherwise a {@link CqEventImpl} is built, the CQ |
| * statistics are updated, and the event is either appended to the CQ's queued events (when that |
| * queue is present) or handed to the CQ's listeners. |
| * |
| * @param cqs CQ name mapped to the CQ operation for that CQ |
| * @param messageType message type of the base operation |
| * @param key key of the entry the event is for |
| * @param value new value; while it is {@code null} it is replaced, after each listener |
| *        invocation, by the full value that invocation may have fetched |
| * @param delta delta bytes passed on to the event |
| * @param qManager queue manager passed on to the event |
| * @param eventId id of the event |
| */ |
| @Override |
| public void dispatchCqListeners(HashMap<String, Integer> cqs, int messageType, Object key, |
| Object value, byte[] delta, QueueManager qManager, EventID eventId) { |
| // One-slot holder through which invokeListeners reports a full value it had to fetch. |
| Object[] fullValue = new Object[1]; |
| Iterator<Map.Entry<String, Integer>> iter = cqs.entrySet().iterator(); |
| String cqName = null; |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| while (iter.hasNext()) { |
| try { |
| Map.Entry<String, Integer> entry = iter.next(); |
| cqName = entry.getKey(); |
| ClientCQImpl cQuery = (ClientCQImpl) this.getCq(cqName); |
| |
| if (cQuery == null || (!cQuery.isRunning() && cQuery.getQueuedEvents() == null)) { |
| if (isDebugEnabled) { |
| logger.debug("Unable to invoke CqListener, {}, CqName : {}", |
| ((cQuery == null) ? "CQ not found" : " CQ is Not running"), cqName); |
| } |
| continue; |
| } |
| |
| Integer cqOp = entry.getValue(); |
| |
| // If Region destroy event, close the cq. |
| if (cqOp.intValue() == MessageType.DESTROY_REGION) { |
| // The close will also invoke the listeners close(). |
| try { |
| cQuery.close(false); |
| } catch (Exception ex) { |
| // Closing is best effort: keep dispatching to the remaining CQs, but leave a trace |
| // instead of silently dropping the failure. |
| if (isDebugEnabled) { |
| logger.debug("Failed to Close CQ on region destroy. CqName : {}", cqName, ex); |
| } |
| } |
| continue; |
| } |
| |
| // Construct CqEvent. |
| CqEventImpl cqEvent = new CqEventImpl(cQuery, getOperation(messageType), getOperation(cqOp), |
| key, value, delta, qManager, eventId); |
| |
| // Update statistics |
| cQuery.updateStats(cqEvent); |
| |
| // Check if CQ Event needs to be queued. |
| if (cQuery.getQueuedEvents() != null) { |
| synchronized (cQuery.queuedEventsSynchObject) { |
| // Get latest value. |
| ConcurrentLinkedQueue<CqEventImpl> queuedEvents = cQuery.getQueuedEvents(); |
| // Check to see, if its not set to null while waiting to get |
| // Synchronization lock. |
| if (queuedEvents != null) { |
| if (isDebugEnabled) { |
| logger.debug("Queueing event for key: {}", key); |
| } |
| cQuery.getVsdStats().incQueuedCqListenerEvents(); |
| queuedEvents.add(cqEvent); |
| continue; |
| } |
| } |
| } |
| |
| this.invokeListeners(cqName, cQuery, cqEvent, fullValue); |
| // Reuse a full value fetched during this invocation for the CQs that follow. |
| if (value == null) { |
| value = fullValue[0]; |
| } |
| |
| } // outer try |
| catch (Throwable t) { |
| logger.warn(String.format("Error processing CqListener for cq: %s", cqName), t); |
| |
| // A VirtualMachineError ends the dispatch; any other failure moves on to the next CQ. |
| if (t instanceof VirtualMachineError) { |
| logger.warn(String.format("VirtualMachineError processing CqListener for cq: %s", |
| cqName), |
| t); |
| return; |
| } |
| } |
| } // iteration. |
| } |
| |
| void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent) { |
| invokeListeners(cqName, cQuery, cqEvent, null); |
| } |
| |
| /** |
| * Invokes every listener of the CQ with the given event; does nothing when the CQ is not running |
| * or has no attributes. |
| * |
| * <p> |
| * {@code onError} is called when the event carries a throwable, {@code onEvent} otherwise. When a |
| * listener throws {@link InvalidDeltaException}, the full value is requested through |
| * {@code GetEventValueOp.executeOnPrimary}, the event is rebuilt with that value and delivered |
| * again to the same listener; the rebuilt event is also used for the remaining listeners. |
| * |
| * @param cqName name of the CQ, used for logging |
| * @param cQuery the CQ whose listeners are invoked |
| * @param cqEvent the event to deliver |
| * @param fullValue optional one-element holder; when not {@code null}, element 0 receives a full |
| *        value that had to be fetched |
| */ |
| private void invokeListeners(String cqName, ClientCQImpl cQuery, CqEventImpl cqEvent, |
| Object[] fullValue) { |
| if (!cQuery.isRunning() || cQuery.getCqAttributes() == null) { |
| return; |
| } |
| // invoke CQ Listeners. |
| CqListener[] cqListeners = cQuery.getCqAttributes().getCqListeners(); |
| |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| if (isDebugEnabled) { |
| logger.debug("Invoking CQ listeners for {}, number of listeners : {} cqEvent : {}", cqName, |
| cqListeners.length, cqEvent); |
| } |
| |
| for (int lCnt = 0; lCnt < cqListeners.length; lCnt++) { |
| try { |
| // Check if the listener is not null, it could have been changed/reset |
| // by the CqAttributeMutator. |
| if (cqListeners[lCnt] != null) { |
| cQuery.getVsdStats().incNumCqListenerInvocations(); |
| try { |
| if (cqEvent.getThrowable() != null) { |
| cqListeners[lCnt].onError(cqEvent); |
| } else { |
| cqListeners[lCnt].onEvent(cqEvent); |
| } |
| } catch (InvalidDeltaException ide) { |
| if (isDebugEnabled) { |
| logger.debug("CqService.dispatchCqListeners(): Requesting full value..."); |
| } |
| Part result = (Part) GetEventValueOp |
| .executeOnPrimary(cqEvent.getQueueManager().getPool(), cqEvent.getEventID(), null); |
| // Guard the dereference: a null reply has to take the "failed to retrieve" path below |
| // instead of raising a NullPointerException. |
| Object newVal = (result == null) ? null : result.getObject(); |
| if (newVal == null) { |
| if (!cache.getCancelCriterion().isCancelInProgress()) { |
| Exception ex = |
| new Exception("Failed to retrieve full value from server for eventID " |
| + cqEvent.getEventID()); |
| logger.warn("Exception in the CqListener of the CQ, CqName: {} Error : {}", |
| new Object[] {cqName, ex.getMessage()}); |
| if (isDebugEnabled) { |
| logger.debug(ex.getMessage(), ex); |
| } |
| } |
| } else { |
| this.cache.getCachePerfStats().incDeltaFullValuesRequested(); |
| // Rebuild the event around the full value and deliver it again. |
| cqEvent = new CqEventImpl(cQuery, cqEvent.getBaseOperation(), |
| cqEvent.getQueryOperation(), cqEvent.getKey(), newVal, cqEvent.getDeltaValue(), |
| cqEvent.getQueueManager(), cqEvent.getEventID()); |
| if (cqEvent.getThrowable() != null) { |
| cqListeners[lCnt].onError(cqEvent); |
| } else { |
| cqListeners[lCnt].onEvent(cqEvent); |
| } |
| if (fullValue != null) { |
| fullValue[0] = newVal; |
| } |
| } |
| } |
| } |
| // Handle client side exceptions. |
| } catch (Exception ex) { |
| if (!cache.getCancelCriterion().isCancelInProgress()) { |
| logger.warn("Exception in the CqListener of the CQ, CqName: {} Error : {}", |
| new Object[] {cqName, ex.getMessage()}); |
| if (isDebugEnabled) { |
| logger.debug(ex.getMessage(), ex); |
| } |
| } |
| } catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| logger.warn("Runtime Exception in the CqListener of the CQ, CqName: {} Error : {}", |
| new Object[] {cqName, t.getLocalizedMessage()}); |
| if (isDebugEnabled) { |
| logger.debug(t.getMessage(), t); |
| } |
| } |
| } |
| } |
| |
| private void invokeCqConnectedListeners(String cqName, ClientCQImpl cQuery, boolean connected) { |
| if (!cQuery.isRunning() || cQuery.getCqAttributes() == null) { |
| return; |
| } |
| cQuery.setConnected(connected); |
| // invoke CQ Listeners. |
| CqListener[] cqListeners = cQuery.getCqAttributes().getCqListeners(); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Invoking CQ status listeners for {}, number of listeners : {}", cqName, |
| cqListeners.length); |
| } |
| |
| for (int lCnt = 0; lCnt < cqListeners.length; lCnt++) { |
| try { |
| if (cqListeners[lCnt] != null) { |
| if (cqListeners[lCnt] instanceof CqStatusListener) { |
| CqStatusListener listener = (CqStatusListener) cqListeners[lCnt]; |
| if (connected) { |
| listener.onCqConnected(); |
| } else { |
| listener.onCqDisconnected(); |
| } |
| } |
| } |
| // Handle client side exceptions. |
| } catch (Exception ex) { |
| if (!cache.getCancelCriterion().isCancelInProgress()) { |
| logger.warn("Exception in the CqListener of the CQ, CqName: {} Error : {}", |
| new Object[] {cqName, ex.getMessage()}); |
| if (logger.isDebugEnabled()) { |
| logger.debug(ex.getMessage(), ex); |
| } |
| } |
| } catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (Throwable t) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| logger.warn("Runtime Exception in the CqListener of the CQ, CqName: {} Error : {}", |
| new Object[] {cqName, t.getLocalizedMessage()}); |
| if (logger.isDebugEnabled()) { |
| logger.debug(t.getMessage(), t); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Maps a message type code onto the matching cache {@link Operation}. |
| * |
| * @param eventType one of the {@code MessageType} codes handled below |
| * @return the corresponding operation, or {@code null} for any other code |
| */ |
| private Operation getOperation(int eventType) { |
| switch (eventType) { |
| case MessageType.LOCAL_CREATE: |
| return Operation.CREATE; |
| case MessageType.LOCAL_UPDATE: |
| return Operation.UPDATE; |
| case MessageType.LOCAL_DESTROY: |
| return Operation.DESTROY; |
| case MessageType.LOCAL_INVALIDATE: |
| return Operation.INVALIDATE; |
| case MessageType.CLEAR_REGION: |
| return Operation.REGION_CLEAR; |
| case MessageType.INVALIDATE_REGION: |
| return Operation.REGION_INVALIDATE; |
| default: |
| return null; |
| } |
| } |
| |
| /** |
| * Routes the event to region-event or entry-event processing. Entry events are processed with |
| * the PDX read-serialized override switched on; the previous override is restored afterwards. |
| */ |
| @Override |
| public void processEvents(CacheEvent event, Profile localProfile, Profile[] profiles, |
| FilterRoutingInfo frInfo) throws CqException { |
| if (event instanceof RegionEvent) { |
| processRegionEvent(event, localProfile, profiles, frInfo); |
| return; |
| } |
| |
| // Entry event: use the PDX types in serialized form while the CQs are processed. |
| final Boolean previousOverride = this.cache.getPdxReadSerializedOverride(); |
| this.cache.setPdxReadSerializedOverride(true); |
| try { |
| processEntryEvent(event, localProfile, profiles, frInfo); |
| } finally { |
| this.cache.setPdxReadSerializedOverride(previousOverride); |
| } |
| } |
| |
| private void processRegionEvent(CacheEvent event, Profile localProfile, Profile[] profiles, |
| FilterRoutingInfo frInfo) throws CqException { |
| |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| if (isDebugEnabled) { |
| logger.debug("CQ service processing region event {}", event); |
| } |
| Integer cqRegionEvent = generateCqRegionEvent(event); |
| |
| for (int i = -1; i < profiles.length; i++) { |
| CacheProfile cf; |
| if (i < 0) { |
| cf = (CacheProfile) localProfile; |
| if (cf == null) |
| continue; |
| } else { |
| cf = (CacheProfile) profiles[i]; |
| } |
| FilterProfile pf = cf.filterProfile; |
| if (pf == null || pf.getCqMap().isEmpty()) { |
| continue; |
| } |
| Map cqs = pf.getCqMap(); |
| HashMap<Long, Integer> cqInfo = new HashMap<>(); |
| Iterator cqIter = cqs.entrySet().iterator(); |
| while (cqIter.hasNext()) { |
| Map.Entry cqEntry = (Map.Entry) cqIter.next(); |
| ServerCQImpl cQuery = (ServerCQImpl) cqEntry.getValue(); |
| if (!event.isOriginRemote() && event.getOperation().isRegionDestroy() |
| && !((LocalRegion) event.getRegion()).isUsedForPartitionedRegionBucket()) { |
| try { |
| if (isDebugEnabled) { |
| logger.debug("Closing CQ on region destroy event. CqName : {}", cQuery.getName()); |
| } |
| cQuery.close(false); |
| } catch (Exception ex) { |
| if (isDebugEnabled) { |
| logger.debug("Failed to Close CQ on region destroy. CqName : {}", cQuery.getName(), |
| ex); |
| } |
| } |
| } |
| cqInfo.put(cQuery.getFilterID(), cqRegionEvent); |
| cQuery.getVsdStats().updateStats(cqRegionEvent); |
| } |
| if (pf.isLocalProfile()) { |
| frInfo.setLocalCqInfo(cqInfo); |
| } else { |
| frInfo.setCqRoutingInfo(cf.getDistributedMember(), cqInfo); |
| } |
| } |
| } |
| |
| /** |
| * Evaluates an entry event against the CQs of the local profile and of each given profile and |
| * records the resulting CQ operations in {@code frInfo}. |
| * |
| * <p> |
| * For every CQ the query is applied to the event's new value (fetched for create and update |
| * operations). For update, destroy, invalidate and possible-duplicate create operations the old |
| * state is determined from the CQ's cached result keys and/or by applying the query to the old |
| * value. The outcome maps to a CQ operation as follows: new and old match gives a local update, |
| * only new matches gives a local create, only old matches gives a local destroy, no match |
| * produces no CQ event, and an exception during evaluation produces |
| * {@code MESSAGE_TYPE_EXCEPTION}. A result is shared, within this call, with the other CQs |
| * registered in {@code matchingCqMap} under the same query string. |
| * |
| * @param event the entry event; it is cast to {@code EntryEvent} and {@code EntryEventImpl} |
| * @param localProfile profile processed first; skipped when {@code null} |
| * @param profiles additional profiles to process |
| * @param frInfo receives, per profile, the filter id to CQ operation mapping of the running CQs |
| * @throws CqException declared by the signature |
| */ |
| private void processEntryEvent(CacheEvent event, Profile localProfile, Profile[] profiles, |
| FilterRoutingInfo frInfo) throws CqException { |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| // Holders for the event's new and old value. They are filled lazily and handed to |
| // evaluateQuery as the single bind argument. |
| HashSet<Object> cqUnfilteredEventsSet_newValue = new HashSet<>(); |
| HashSet<Object> cqUnfilteredEventsSet_oldValue = new HashSet<>(); |
| boolean b_cqResults_newValue; |
| boolean b_cqResults_oldValue; |
| boolean queryOldValue; |
| EntryEvent entryEvent = (EntryEvent) event; |
| Object eventKey = entryEvent.getKey(); |
| |
| boolean isDupEvent = ((EntryEventImpl) event).isPossibleDuplicate(); |
| // The CQ query needs to be applied when the op is update, destroy |
| // invalidate and in case when op is create and its an duplicate |
| // event, the reason for this is when peer sends a duplicate event |
| // it marks it as create and sends it, so that the receiving node |
| // applies it (see DR.virtualPut()). |
| boolean opRequiringQueryOnOldValue = (event.getOperation().isUpdate() |
| || event.getOperation().isDestroy() || event.getOperation().isInvalidate() |
| || (event.getOperation().isCreate() && isDupEvent)); |
| |
| // CQ name -> result already computed in this call for another CQ with the same query string. |
| HashMap<String, Integer> matchedCqs = new HashMap<>(); |
| long executionStartTime; |
| // Index -1 selects the local profile; the remaining indexes walk the profiles array. |
| for (int i = -1; i < profiles.length; i++) { |
| CacheProfile cf; |
| if (i < 0) { |
| cf = (CacheProfile) localProfile; |
| if (cf == null) |
| continue; |
| } else { |
| cf = (CacheProfile) profiles[i]; |
| } |
| FilterProfile pf = cf.filterProfile; |
| if (pf == null || pf.getCqMap().isEmpty()) { |
| continue; |
| } |
| |
| Map cqs = pf.getCqMap(); |
| |
| if (isDebugEnabled) { |
| logger.debug("Profile for {} processing {} CQs", cf.peerMemberId, cqs.size()); |
| } |
| |
| if (cqs.isEmpty()) { |
| continue; |
| } |
| |
| // Get new value. If its not retrieved. |
| if (cqUnfilteredEventsSet_newValue.isEmpty() |
| && (event.getOperation().isCreate() || event.getOperation().isUpdate())) { |
| Object newValue = entryEvent.getNewValue(); |
| if (newValue != null) { |
| // We have a new value to run the query on |
| cqUnfilteredEventsSet_newValue.add(newValue); |
| } |
| } |
| |
| // Filter id -> CQ operation collected for this profile. |
| HashMap<Long, Integer> cqInfo = new HashMap<>(); |
| Iterator cqIter = cqs.entrySet().iterator(); |
| |
| while (cqIter.hasNext()) { |
| Map.Entry cqEntry = (Map.Entry) cqIter.next(); |
| ServerCQImpl cQuery = (ServerCQImpl) cqEntry.getValue(); |
| b_cqResults_newValue = false; |
| b_cqResults_oldValue = false; |
| queryOldValue = false; |
| if (cQuery == null) { |
| continue; |
| } |
| String cqName = cQuery.getServerCqName(); |
| Long filterID = cQuery.getFilterID(); |
| |
| if (isDebugEnabled) { |
| logger.debug("Processing CQ : {} Key: {}", cqName, eventKey); |
| } |
| |
| Integer cqEvent = null; |
| if (matchedCqs.containsKey(cqName)) { |
| cqEvent = matchedCqs.get(cqName); |
| if (isDebugEnabled) { |
| logger.debug("query {} has already been processed and returned {}", cqName, cqEvent); |
| } |
| // The matching query produced no CQ event, so there is nothing to record for this CQ. |
| if (cqEvent == null) { |
| continue; |
| } |
| // Update the Cache Results for this CQ. |
| if (cqEvent.intValue() == MessageType.LOCAL_CREATE |
| || cqEvent.intValue() == MessageType.LOCAL_UPDATE) { |
| cQuery.addToCqResultKeys(eventKey); |
| } else if (cqEvent.intValue() == MessageType.LOCAL_DESTROY) { |
| cQuery.markAsDestroyedInCqResultKeys(eventKey); |
| } |
| } else { |
| boolean error = false; |
| { |
| try { |
| synchronized (cQuery) { |
| // Apply query on new value. |
| if (!cqUnfilteredEventsSet_newValue.isEmpty()) { |
| executionStartTime = this.stats.startCqQueryExecution(); |
| |
| b_cqResults_newValue = |
| evaluateQuery(cQuery, new Object[] {cqUnfilteredEventsSet_newValue}); |
| this.stats.endCqQueryExecution(executionStartTime); |
| } |
| } |
| |
| // In case of Update, destroy and invalidate. |
| // Apply query on oldValue. |
| if (opRequiringQueryOnOldValue) { |
| // Check if CQ Result is cached, if not apply query on old |
| // value. Currently the CQ Results are not cached for the |
| // Partitioned Regions. Once this is added remove the check |
| // with PR region. |
| if (cQuery.cqResultKeysInitialized) { |
| b_cqResults_oldValue = cQuery.isPartOfCqResult(eventKey); |
| // For PR if not found in cache, apply the query on old value. |
| // Also apply if the query was not executed during cq execute |
| if ((cQuery.isPR || !CqServiceImpl.EXECUTE_QUERY_DURING_INIT) |
| && b_cqResults_oldValue == false) { |
| queryOldValue = true; |
| } |
| if (isDebugEnabled && !cQuery.isPR && !b_cqResults_oldValue) { |
| logger.debug( |
| "Event Key not found in the CQ Result Queue. EventKey : {} CQ Name : {}", |
| eventKey, cqName); |
| } |
| } else { |
| queryOldValue = true; |
| } |
| |
| if (queryOldValue) { |
| // Fetch the old value only once per call, and only when a query on it is needed. |
| if (cqUnfilteredEventsSet_oldValue.isEmpty()) { |
| Object oldValue = entryEvent.getOldValue(); |
| if (oldValue != null) { |
| cqUnfilteredEventsSet_oldValue.add(oldValue); |
| } |
| } |
| |
| synchronized (cQuery) { |
| // Apply query on old value. |
| if (!cqUnfilteredEventsSet_oldValue.isEmpty()) { |
| executionStartTime = this.stats.startCqQueryExecution(); |
| b_cqResults_oldValue = |
| evaluateQuery(cQuery, new Object[] {cqUnfilteredEventsSet_oldValue}); |
| this.stats.endCqQueryExecution(executionStartTime); |
| } else { |
| if (isDebugEnabled) { |
| logger.debug( |
| "old value for event with key {} is null - query execution not performed", |
| eventKey); |
| } |
| } |
| } |
| } // Query oldValue |
| |
| } |
| } catch (Exception ex) { |
| // Any exception in running the query should be caught here and |
| // buried because this code is running in-line with the message |
| // processing code and we don't want to kill that thread |
| error = true; |
| // CHANGE LOG MESSAGE: |
| logger.info("Error while processing CQ on the event, key : {} CqName: {}, Error: {}", |
| new Object[] {((EntryEvent) event).getKey(), cQuery.getName(), |
| ex.getLocalizedMessage()}); |
| } |
| |
| // Translate the two match flags (or the failure) into the CQ operation. |
| if (error) { |
| cqEvent = MESSAGE_TYPE_EXCEPTION; |
| } else { |
| if (b_cqResults_newValue) { |
| if (b_cqResults_oldValue) { |
| cqEvent = MESSAGE_TYPE_LOCAL_UPDATE; |
| } else { |
| cqEvent = MESSAGE_TYPE_LOCAL_CREATE; |
| } |
| // If its create and caching is enabled, cache the key |
| // for this CQ. |
| cQuery.addToCqResultKeys(eventKey); |
| } else if (b_cqResults_oldValue) { |
| // Base invalidate operation is treated as destroy. |
| // When the invalidate comes through, the entry will no longer |
| // satisfy the query and will need to be deleted. |
| cqEvent = MESSAGE_TYPE_LOCAL_DESTROY; |
| // If caching is enabled, mark this event's key as removed |
| // from the CQ cache. |
| cQuery.markAsDestroyedInCqResultKeys(eventKey); |
| } |
| } |
| } |
| |
| // Get the matching CQs if any. |
| // synchronized (this.matchingCqMap){ |
| String query = cQuery.getQueryString(); |
| Set matchingCqs = (Set) matchingCqMap.get(query); |
| if (matchingCqs != null) { |
| Iterator iter = matchingCqs.iterator(); |
| while (iter.hasNext()) { |
| String matchingCqName = (String) iter.next(); |
| if (!matchingCqName.equals(cqName)) { |
| matchedCqs.put(matchingCqName, cqEvent); |
| if (isDebugEnabled) { |
| logger.debug("Adding CQ into Matching CQ Map: {} Event is: {}", matchingCqName, |
| cqEvent); |
| } |
| } |
| } |
| } |
| } |
| |
| // Only running CQs that produced a CQ operation are added to the routing info. |
| if (cqEvent != null && cQuery.isRunning()) { |
| if (isDebugEnabled) { |
| logger.debug("Added event to CQ with client-side name: {} key: {} operation : {}", |
| cQuery.cqName, eventKey, cqEvent); |
| } |
| cqInfo.put(filterID, cqEvent); |
| CqQueryVsdStats stats = cQuery.getVsdStats(); |
| if (stats != null) { |
| stats.updateStats(cqEvent); |
| } |
| } |
| } |
| if (cqInfo.size() > 0) { |
| if (pf.isLocalProfile()) { |
| if (isDebugEnabled) { |
| logger.debug("Setting local CQ matches to {}", cqInfo); |
| } |
| frInfo.setLocalCqInfo(cqInfo); |
| } else { |
| if (isDebugEnabled) { |
| logger.debug("Setting CQ matches for {} to {}", cf.getDistributedMember(), cqInfo); |
| } |
| frInfo.setCqRoutingInfo(cf.getDistributedMember(), cqInfo); |
| } |
| } |
| } // iteration over Profiles. |
| } |
| |
| private Integer generateCqRegionEvent(CacheEvent event) { |
| Integer cqEvent = null; |
| if (event.getOperation().isRegionDestroy()) { |
| cqEvent = MessageType.DESTROY_REGION; |
| } else if (event.getOperation().isRegionInvalidate()) { |
| cqEvent = MessageType.INVALIDATE_REGION; |
| } else if (event.getOperation().isClear()) { |
| cqEvent = MessageType.CLEAR_REGION; |
| } |
| return cqEvent; |
| } |
| |
| /** |
| * Manages the CQs created for the base region. This is managed here, instead of on the base |
| * region; since the cq could be created on the base region, before base region is created (using |
| * newCq()). |
| */ |
| private void addToBaseRegionToCqNameMap(String regionName, String cqName) { |
| synchronized (this.baseRegionToCqNameMap) { |
| ArrayList<String> cqs = this.baseRegionToCqNameMap.get(regionName); |
| if (cqs == null) { |
| cqs = new ArrayList<>(); |
| } |
| cqs.add(cqName); |
| this.baseRegionToCqNameMap.put(regionName, cqs); |
| } |
| } |
| |
| void removeFromBaseRegionToCqNameMap(String regionName, String cqName) { |
| synchronized (this.baseRegionToCqNameMap) { |
| ArrayList<String> cqs = this.baseRegionToCqNameMap.get(regionName); |
| if (cqs != null) { |
| cqs.remove(cqName); |
| if (cqs.isEmpty()) { |
| this.baseRegionToCqNameMap.remove(regionName); |
| } else { |
| this.baseRegionToCqNameMap.put(regionName, cqs); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Returns the VSD statistics of the CQ service. There is one CQ service per cache. |
| * |
| * @return the VSD stats object of this CQ service |
| */ |
| public CqServiceVsdStats getCqServiceVsdStats() { |
| return this.stats; |
| } |
| |
| /** |
| * Adds the query from the given CQ to the matched CQ map. |
| */ |
| void addToMatchingCqMap(CqQueryImpl cq) { |
| synchronized (this.matchingCqMap) { |
| String cqQuery = cq.getQueryString(); |
| Set<String> matchingCQs; |
| if (!matchingCqMap.containsKey(cqQuery)) { |
| matchingCQs = Collections.newSetFromMap(new ConcurrentHashMap()); |
| matchingCqMap.put(cqQuery, matchingCQs); |
| this.stats.incUniqueCqQuery(); |
| } else { |
| matchingCQs = (Set) matchingCqMap.get(cqQuery); |
| } |
| matchingCQs.add(cq.getServerCqName()); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Adding CQ into MatchingCQ map, CQName: {} Number of matched querys are: {}", |
| cq.getServerCqName(), matchingCQs.size()); |
| } |
| } |
| } |
| |
| /** |
| * Removes the query from the given CQ from the matched CQ map. |
| */ |
| private void removeFromMatchingCqMap(CqQueryImpl cq) { |
| synchronized (this.matchingCqMap) { |
| String cqQuery = cq.getQueryString(); |
| if (matchingCqMap.containsKey(cqQuery)) { |
| Set matchingCQs = (Set) matchingCqMap.get(cqQuery); |
| matchingCQs.remove(cq.getServerCqName()); |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Removing CQ from MatchingCQ map, CQName: {} Number of matched querys are: {}", |
| cq.getServerCqName(), matchingCQs.size()); |
| } |
| if (matchingCQs.isEmpty()) { |
| matchingCqMap.remove(cqQuery); |
| this.stats.decUniqueCqQuery(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Gives access to the matching CQ map itself (not a copy). |
| * |
| * @return the matching CQ map, keyed by query string |
| */ |
| public Map<String, HashSet<String>> getMatchingCqMap() { |
| return this.matchingCqMap; |
| } |
| |
| /** |
| * Applies the query on the event. This method takes care of the performance related changed done |
| * to improve the CQ-query performance. When CQ-query is executed first time, it saves the query |
| * related information in the execution context and uses that info in later executions. |
| */ |
| private boolean evaluateQuery(CqQueryImpl cQuery, Object[] event) throws Exception { |
| ExecutionContext execContext = cQuery.getQueryExecutionContext(); |
| execContext.reset(); |
| execContext.setBindArguments(event); |
| boolean status = false; |
| |
| // Check if the CQ query is executed once. |
| // If not execute the query in normal way. |
| // During this phase the query execution related info are stored in the |
| // ExecutionContext. |
| if (execContext.getScopeNum() <= 0) { |
| SelectResults results = |
| (SelectResults) ((DefaultQuery) cQuery.getQuery()).executeUsingContext(execContext); |
| if (results != null && results.size() > 0) { |
| status = true; |
| } |
| } else { |
| // Execute using the saved query info (in ExecutionContext). |
| // This avoids building resultSet, index look-up, generating build-plans |
| // that are not required for; query execution on single object. |
| CompiledSelect cs = ((DefaultQuery) (cQuery.getQuery())).getSelect(); |
| status = cs.evaluateCq(execContext); |
| } |
| return status; |
| } |
| |
| /** |
| * Looks up the user attributes stored for the given CQ name. |
| */ |
| @Override |
| public UserAttributes getUserAttributes(String cqName) { |
| final UserAttributes attributes = this.cqNameToUserAttributesMap.get(cqName); |
| return attributes; |
| } |
| |
| /** |
| * Tells the CQ listeners of the given pool that they are disconnected. |
| */ |
| @Override |
| public void cqsDisconnected(Pool pool) { |
| final boolean connected = false; |
| invokeCqsConnected(pool, connected); |
| } |
| |
| /** |
| * Tells the CQ listeners of the given pool that they are connected. |
| */ |
| @Override |
| public void cqsConnected(Pool pool) { |
| final boolean connected = true; |
| invokeCqsConnected(pool, connected); |
| } |
| |
| /** |
| * Notifies the status listeners of the CQs that use the given pool about a change of the |
| * connected state. Nothing is sent when the state recorded for the pool is already the |
| * requested one. |
| */ |
| private void invokeCqsConnected(Pool pool, boolean connected) { |
| final String poolName = pool.getName(); |
| synchronized (cqPoolsConnected) { |
| // Repeated failures of the RedundancySatisfier must not send the same connect/disconnect |
| // message again, so stop here when the recorded state did not change. |
| if (cqPoolsConnected.containsKey(poolName) && connected == cqPoolsConnected.get(poolName)) { |
| return; |
| } |
| cqPoolsConnected.put(poolName, connected); |
| |
| final Collection<? extends InternalCqQuery> allCqs = this.getAllCqs(); |
| final boolean debug = logger.isDebugEnabled(); |
| for (InternalCqQuery query : allCqs) { |
| try { |
| if (query == null) { |
| continue; |
| } |
| |
| final String cqName = query.getName(); |
| final ClientCQImpl cQuery = (ClientCQImpl) this.getCq(cqName); |
| if (cQuery == null || cQuery.getCQProxy() == null) { |
| continue; |
| } |
| |
| // Only CQs of this pool whose connected state actually changes are notified. |
| final Pool cqPool = cQuery.getCQProxy().getPool(); |
| final boolean stateChanges = cQuery.isConnected() != connected; |
| if (!(stateChanges && cqPool.getName().equals(poolName))) { |
| continue; |
| } |
| |
| if (!cQuery.isRunning() && cQuery.getQueuedEvents() == null) { |
| if (debug) { |
| logger.debug("Unable to invoke CqListener, CQ is Not running, CqName : {}", cqName); |
| } |
| continue; |
| } |
| |
| this.invokeCqConnectedListeners(cqName, cQuery, connected); |
| } catch (VirtualMachineError e) { |
| SystemFailure.initiateFailure(e); |
| throw e; |
| } catch (Throwable t) { |
| SystemFailure.checkFailure(); |
| logger.warn("Error while sending connection status to cq listeners", t); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Asks the server, through a CQ proxy created on the given pool, for all durable CQs. |
| */ |
| @Override |
| public List<String> getAllDurableCqsFromServer(InternalPool pool) { |
| final ServerCQProxyImpl proxy = new ServerCQProxyImpl(pool); |
| return proxy.getAllDurableCqsFromServer(); |
| } |
| } |