| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.cache.query.cq.internal; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Set; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.DataSerializable; |
| import org.apache.geode.DataSerializer; |
| import org.apache.geode.cache.DataPolicy; |
| import org.apache.geode.cache.EvictionAction; |
| import org.apache.geode.cache.query.CqAttributes; |
| import org.apache.geode.cache.query.CqAttributesMutator; |
| import org.apache.geode.cache.query.CqClosedException; |
| import org.apache.geode.cache.query.CqException; |
| import org.apache.geode.cache.query.CqExistsException; |
| import org.apache.geode.cache.query.CqResults; |
| import org.apache.geode.cache.query.Query; |
| import org.apache.geode.cache.query.QueryException; |
| import org.apache.geode.cache.query.RegionNotFoundException; |
| import org.apache.geode.cache.query.internal.CompiledBindArgument; |
| import org.apache.geode.cache.query.internal.CompiledIteratorDef; |
| import org.apache.geode.cache.query.internal.CompiledRegion; |
| import org.apache.geode.cache.query.internal.CompiledSelect; |
| import org.apache.geode.cache.query.internal.CqStateImpl; |
| import org.apache.geode.cache.query.internal.DefaultQuery; |
| import org.apache.geode.cache.query.internal.cq.CqServiceProvider; |
| import org.apache.geode.cache.query.internal.cq.ServerCQ; |
| import org.apache.geode.internal.cache.InternalCache; |
| import org.apache.geode.internal.cache.LocalRegion; |
| import org.apache.geode.internal.cache.Token; |
| import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier; |
| import org.apache.geode.internal.cache.tier.sockets.CacheClientProxy; |
| import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| public class ServerCQImpl extends CqQueryImpl implements DataSerializable, ServerCQ { |
| private static final Logger logger = LogService.getLogger(); |
| |
| /** |
| * This holds the keys that are part of the CQ query results. Using this CQ engine can determine |
| * whether to execute query on old value from EntryEvent, which is an expensive operation. |
| * |
| * NOTE: In case of RR this map is populated and used as intended. In case of PR this map will not |
| * be populated. If executeCQ happens after update operations this map will remain empty. |
| */ |
| private volatile HashMap<Object, Object> cqResultKeys; |
| |
| /** |
| * This maintains the keys that are destroyed while the Results Cache is getting constructed. This |
| * avoids any keys that are destroyed (after query execution) but is still part of the CQs result. |
| */ |
| private HashSet<Object> destroysWhileCqResultsInProgress; |
| |
| /** |
| * To indicate if the CQ results key cache is initialized. |
| */ |
| public volatile boolean cqResultKeysInitialized = false; |
| |
| /** Boolean flag to see if the CQ is on Partitioned Region */ |
| volatile boolean isPR = false; |
| |
| private ClientProxyMembershipID clientProxyId = null; |
| |
| private CacheClientNotifier ccn = null; |
| |
| private String serverCqName; |
| |
| /** identifier assigned to this query for FilterRoutingInfos */ |
| private Long filterID; |
| |
| public ServerCQImpl(CqServiceImpl cqService, String cqName, String queryString, boolean isDurable, |
| String serverCqName) { |
| super(cqService, cqName, queryString, isDurable); |
| this.serverCqName = serverCqName; // On Client Side serverCqName and cqName will be same. |
| } |
| |
| public ServerCQImpl() { |
| // For deserialization |
| } |
| |
| @Override |
| public Long getFilterID() { |
| return this.filterID; |
| } |
| |
| @Override |
| public void setFilterID(Long filterID) { |
| this.filterID = filterID; |
| } |
| |
| @Override |
| public void setName(String cqName) { |
| this.cqName = this.serverCqName = cqName; |
| } |
| |
| @Override |
| public String getServerCqName() { |
| return this.serverCqName; |
| } |
| |
| @Override |
| public void registerCq(ClientProxyMembershipID p_clientProxyId, CacheClientNotifier p_ccn, |
| int p_cqState) throws CqException, RegionNotFoundException { |
| |
| CacheClientProxy clientProxy = null; |
| this.clientProxyId = p_clientProxyId; |
| |
| if (p_ccn != null) { |
| this.ccn = p_ccn; |
| clientProxy = p_ccn.getClientProxy(p_clientProxyId, true); |
| } |
| |
| validateCq(); |
| |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| String msg = "%s"; |
| Throwable t = null; |
| try { |
| this.query = constructServerSideQuery(); |
| if (isDebugEnabled) { |
| logger.debug("Server side query for the cq: {} is: {}", cqName, |
| this.query.getQueryString()); |
| } |
| } catch (Exception ex) { |
| t = ex; |
| if (ex instanceof ClassNotFoundException) { |
| msg = |
| "Class not found exception. The antlr.jar or the spcified class may be missing from server side classpath. Error : %s"; |
| } else { |
| msg = "Error while parsing the query. Error : %s"; |
| } |
| } finally { |
| if (t != null) { |
| String s = String.format(msg, t); |
| if (isDebugEnabled) { |
| logger.debug(s, t); |
| } |
| throw new CqException(s); |
| } |
| } |
| |
| // Update Regions Book keeping. |
| // TODO replace getRegion() with getRegionByPathForProcessing() so this doesn't block |
| // if the region is still being initialized |
| this.cqBaseRegion = (LocalRegion) cqService.getCache().getRegion(regionName); |
| if (this.cqBaseRegion == null) { |
| throw new RegionNotFoundException( |
| String.format("Region : %s specified with cq not found. CqName: %s", |
| new Object[] {regionName, this.cqName})); |
| } |
| |
| // Make sure that the region is partitioned or |
| // replicated with distributed ack or global. |
| DataPolicy dp = this.cqBaseRegion.getDataPolicy(); |
| this.isPR = dp.withPartitioning(); |
| if (!(this.isPR || dp.withReplication())) { |
| String errMsg = null; |
| // replicated regions with eviction set to local destroy get turned into preloaded |
| if (dp.withPreloaded() && cqBaseRegion.getAttributes().getEvictionAttributes() != null |
| && cqBaseRegion.getAttributes().getEvictionAttributes().getAction() |
| .equals(EvictionAction.LOCAL_DESTROY)) { |
| errMsg = |
| String.format("CQ is not supported for replicated region: %s with eviction action: %s", |
| this.regionName, cqBaseRegion.getAttributes().getEvictionAttributes().getAction()); |
| } else { |
| errMsg = "The region " + this.regionName |
| + " specified in CQ creation is neither replicated nor partitioned; " |
| + "only replicated or partitioned regions are allowed in CQ creation."; |
| } |
| if (isDebugEnabled) { |
| logger.debug(errMsg); |
| } |
| throw new CqException(errMsg); |
| } |
| if ((dp.withReplication() && (!(cqBaseRegion.getAttributes().getScope().isDistributedAck() |
| || cqBaseRegion.getAttributes().getScope().isGlobal())))) { |
| String errMsg = "The replicated region " + this.regionName |
| + " specified in CQ creation does not have scope supported by CQ." |
| + " The CQ supported scopes are DISTRIBUTED_ACK and GLOBAL."; |
| if (isDebugEnabled) { |
| logger.debug(errMsg); |
| } |
| throw new CqException(errMsg); |
| } |
| |
| // Can be null by the time we are here |
| if (clientProxy != null) { |
| clientProxy.incCqCount(); |
| if (clientProxy.hasOneCq()) { |
| cqService.stats().incClientsWithCqs(); |
| } |
| if (isDebugEnabled) { |
| logger.debug("Added CQ to the base region: {} With key as: {}", cqBaseRegion.getFullPath(), |
| serverCqName); |
| } |
| } |
| |
| this.updateCqCreateStats(); |
| |
| // Initialize the state of CQ. |
| if (this.cqState.getState() != p_cqState) { |
| setCqState(p_cqState); |
| } |
| |
| // Register is called from both filter profile and cqService |
| // In either case, if we are trying to start/run the cq, we need to add |
| // it to other matching cqs for performance reasons |
| if (p_cqState == CqStateImpl.RUNNING) { |
| // Add to the matchedCqMap. |
| cqService.addToMatchingCqMap(this); |
| } |
| |
| // Initialize CQ results (key) cache. |
| if (CqServiceProvider.MAINTAIN_KEYS) { |
| this.cqResultKeys = new HashMap<Object, Object>(); |
| // Currently the CQ Result keys are not cached for the Partitioned |
| // Regions. Supporting this with PR needs more work like forcing |
| // query execution on primary buckets only; and handling the bucket |
| // re-balancing. Once this is added remove the check with PR region. |
| // Only the events which are seen during event processing is |
| // added to the results cache (not from the CQ Results). |
| if (this.isPR) { |
| this.setCqResultsCacheInitialized(); |
| } else { |
| this.destroysWhileCqResultsInProgress = new HashSet<Object>(); |
| } |
| } |
| |
| if (p_ccn != null) { |
| try { |
| cqService.addToCqMap(this); |
| } catch (CqExistsException cqe) { |
| // Should not happen. |
| throw new CqException(String.format("Unable to create cq %s Error : %s", |
| new Object[] {cqName, cqe.getMessage()})); |
| } |
| this.cqBaseRegion.getFilterProfile().registerCq(this); |
| } |
| } |
| |
| /** |
| * For Test use only. |
| * |
| * @return CQ Results Cache. |
| */ |
| public Set<Object> getCqResultKeyCache() { |
| if (this.cqResultKeys != null) { |
| synchronized (this.cqResultKeys) { |
| return Collections.synchronizedSet(new HashSet<Object>(this.cqResultKeys.keySet())); |
| } |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Returns parameterized query used by the server. This method replaces Region name with $1 and if |
| * type is not specified in the query, looks for type from cqattributes and appends into the |
| * query. |
| * |
| * @return String modified query. |
| */ |
| private Query constructServerSideQuery() throws QueryException { |
| InternalCache cache = cqService.getInternalCache(); |
| DefaultQuery locQuery = (DefaultQuery) cache.getLocalQueryService().newQuery(this.queryString); |
| CompiledSelect select = locQuery.getSimpleSelect(); |
| CompiledIteratorDef from = (CompiledIteratorDef) select.getIterators().get(0); |
| // WARNING: ASSUMES QUERY WAS ALREADY VALIDATED FOR PROPER "FORM" ON CLIENT; |
| // THIS VALIDATION WILL NEED TO BE DONE ON THE SERVER FOR NATIVE CLIENTS, |
| // BUT IS NOT DONE HERE FOR JAVA CLIENTS. |
| // The query was already checked on the client that the sole iterator is a |
| // CompiledRegion |
| this.regionName = ((CompiledRegion) from.getCollectionExpr()).getRegionPath(); |
| from.setCollectionExpr(new CompiledBindArgument(1)); |
| return locQuery; |
| } |
| |
| /** |
| * Returns if the passed key is part of the CQs result set. This method needs to be called once |
| * the CQ result key caching is completed (cqResultsCacheInitialized is true). |
| * |
| * @return true if key is in the Results Cache. |
| */ |
| public boolean isPartOfCqResult(Object key) { |
| // Handle events that may have been deleted, |
| // but added by result caching. |
| if (this.cqResultKeys == null) { |
| logger.warn( |
| "The CQ Result key cache is Null. This should not happen as the call to isPartOfCqResult() is based on the condition cqResultsCacheInitialized."); |
| return false; |
| } |
| |
| synchronized (this.cqResultKeys) { |
| if (this.destroysWhileCqResultsInProgress != null) { |
| // this.logger.fine("Removing keys from Destroy Cache For CQ :" + |
| // this.cqName + " Keys :" + this.destroysWhileCqResultsInProgress); |
| for (Object k : this.destroysWhileCqResultsInProgress) { |
| this.cqResultKeys.remove(k); |
| } |
| this.destroysWhileCqResultsInProgress = null; |
| } |
| return this.cqResultKeys.containsKey(key); |
| } |
| } |
| |
| @Override |
| public void addToCqResultKeys(Object key) { |
| if (!CqServiceProvider.MAINTAIN_KEYS) { |
| return; |
| } |
| |
| if (this.cqResultKeys != null) { |
| synchronized (this.cqResultKeys) { |
| this.cqResultKeys.put(key, TOKEN); |
| if (!this.cqResultKeysInitialized) { |
| // This key could be coming after add, destroy. |
| // Remove this from destroy queue. |
| if (this.destroysWhileCqResultsInProgress != null) { |
| this.destroysWhileCqResultsInProgress.remove(key); |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void removeFromCqResultKeys(Object key, boolean isTokenMode) { |
| if (!CqServiceProvider.MAINTAIN_KEYS) { |
| return; |
| } |
| if (this.cqResultKeys != null) { |
| synchronized (this.cqResultKeys) { |
| if (isTokenMode && this.cqResultKeys.get(key) != Token.DESTROYED) { |
| return; |
| } |
| this.cqResultKeys.remove(key); |
| if (!this.cqResultKeysInitialized) { |
| if (this.destroysWhileCqResultsInProgress != null) { |
| this.destroysWhileCqResultsInProgress.add(key); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Marks the key as destroyed in the CQ Results key cache. |
| */ |
| void markAsDestroyedInCqResultKeys(Object key) { |
| if (!CqServiceProvider.MAINTAIN_KEYS) { |
| return; |
| } |
| |
| if (this.cqResultKeys != null) { |
| synchronized (this.cqResultKeys) { |
| this.cqResultKeys.put(key, Token.DESTROYED); |
| if (!this.cqResultKeysInitialized) { |
| // this.logger.fine("Adding key to Destroy Cache For CQ :" + |
| // this.cqName + " key :" + key); |
| if (this.destroysWhileCqResultsInProgress != null) { |
| this.destroysWhileCqResultsInProgress.add(key); |
| } |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void setCqResultsCacheInitialized() { |
| if (CqServiceProvider.MAINTAIN_KEYS) { |
| this.cqResultKeysInitialized = true; |
| } |
| } |
| |
| /** |
| * Returns the size of the CQ Result key cache. |
| * |
| * @return size of CQ Result key cache. |
| */ |
| public int getCqResultKeysSize() { |
| if (this.cqResultKeys == null) { |
| return 0; |
| } |
| synchronized (this.cqResultKeys) { |
| return this.cqResultKeys.size(); |
| } |
| } |
| |
| @Override |
| public boolean isOldValueRequiredForQueryProcessing(Object key) { |
| if (this.cqResultKeysInitialized && this.isPartOfCqResult(key)) { |
| return false; |
| } |
| return true; |
| } |
| |
| /** |
| * Closes the Query. On Client side, sends the cq close request to server. On Server side, takes |
| * care of repository cleanup. |
| */ |
| @Override |
| public void close() throws CqClosedException, CqException { |
| close(true); |
| } |
| |
| @Override |
| public void close(boolean sendRequestToServer) throws CqClosedException, CqException { |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| if (isDebugEnabled) { |
| logger.debug("Started closing CQ CqName: {} SendRequestToServer: {}", cqName, |
| sendRequestToServer); |
| } |
| // Synchronize with stop and execute CQ commands |
| synchronized (this.cqState) { |
| // Check if the cq is already closed. |
| if (this.isClosed()) { |
| // throw new CqClosedException("CQ is already closed, CqName : " + this.cqName); |
| if (isDebugEnabled) { |
| logger.debug("CQ is already closed, CqName: {}", this.cqName); |
| } |
| return; |
| } |
| |
| int stateBeforeClosing = this.cqState.getState(); |
| this.cqState.setState(CqStateImpl.CLOSING); |
| boolean isClosed = false; |
| |
| // Cleanup the resource used by cq. |
| this.removeFromCqMap(); |
| |
| // Stat update. |
| if (stateBeforeClosing == CqStateImpl.RUNNING) { |
| cqService.stats().decCqsActive(); |
| } else if (stateBeforeClosing == CqStateImpl.STOPPED) { |
| cqService.stats().decCqsStopped(); |
| } |
| |
| // Clean-up the CQ Results Cache. |
| if (this.cqResultKeys != null) { |
| synchronized (this.cqResultKeys) { |
| this.cqResultKeys.clear(); |
| } |
| } |
| |
| // Set the state to close, and update stats |
| this.cqState.setState(CqStateImpl.CLOSED); |
| cqService.stats().incCqsClosed(); |
| cqService.stats().decCqsOnClient(); |
| if (this.stats != null) |
| this.stats.close(); |
| } |
| |
| if (isDebugEnabled) { |
| logger.debug("Successfully closed the CQ. {}", cqName); |
| } |
| } |
| |
| @Override |
| public ClientProxyMembershipID getClientProxyId() { |
| return this.clientProxyId; |
| } |
| |
| /** |
| * Get CacheClientNotifier of this CqQuery. |
| * |
| */ |
| public CacheClientNotifier getCacheClientNotifier() { |
| return this.ccn; |
| } |
| |
| /** |
| * Clears the resource used by CQ. |
| */ |
| @Override |
| protected void cleanup() throws CqException { |
| // CqBaseRegion |
| try { |
| if (this.cqBaseRegion != null && !this.cqBaseRegion.isDestroyed()) { |
| this.cqBaseRegion.getFilterProfile().closeCq(this); |
| CacheClientProxy clientProxy = ccn.getClientProxy(clientProxyId); |
| clientProxy.decCqCount(); |
| if (clientProxy.hasNoCq()) { |
| cqService.stats().decClientsWithCqs(); |
| } |
| } |
| } catch (Exception ex) { |
| // May be cache is being shutdown |
| if (logger.isDebugEnabled()) { |
| logger.debug("Failed to remove CQ from the base region. CqName :{}", cqName); |
| } |
| } |
| } |
| |
| /** |
| * Stop or pause executing the query. |
| */ |
| @Override |
| public void stop() throws CqClosedException, CqException { |
| boolean isStopped = false; |
| synchronized (this.cqState) { |
| if (this.isClosed()) { |
| throw new CqClosedException( |
| String.format("CQ is closed, CqName : %s", this.cqName)); |
| } |
| |
| if (!(this.isRunning())) { |
| throw new IllegalStateException( |
| String.format("CQ is not in running state, stop CQ does not apply, CqName : %s", |
| this.cqName)); |
| } |
| |
| // Change state and stats on the client side |
| this.cqState.setState(CqStateImpl.STOPPED); |
| this.cqService.stats().incCqsStopped(); |
| this.cqService.stats().decCqsActive(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Successfully stopped the CQ. {}", cqName); |
| } |
| } |
| } |
| |
| @Override |
| public void fromData(DataInput in) throws IOException, ClassNotFoundException { |
| synchronized (cqState) { |
| this.cqState.setState(DataSerializer.readInteger(in)); |
| } |
| this.isDurable = DataSerializer.readBoolean(in); |
| this.queryString = DataSerializer.readString(in); |
| this.filterID = in.readLong(); |
| } |
| |
| @Override |
| public void toData(DataOutput out) throws IOException { |
| DataSerializer.writeInteger(this.cqState.getState(), out); |
| DataSerializer.writeBoolean(this.isDurable, out); |
| DataSerializer.writeString(this.queryString, out); |
| out.writeLong(this.filterID); |
| } |
| |
| @Override |
| public boolean isPR() { |
| return isPR; |
| } |
| |
| @Override |
| public CqAttributes getCqAttributes() { |
| throw new IllegalStateException("CQ attributes are not available on the server"); |
| } |
| |
| @Override |
| public CqAttributesMutator getCqAttributesMutator() { |
| throw new IllegalStateException("CQ attributes are not available on the server"); |
| } |
| |
| @Override |
| public <E> CqResults<E> executeWithInitialResults() |
| throws CqClosedException, RegionNotFoundException, CqException { |
| throw new IllegalStateException("Execute cannot be called on a CQ on the server"); |
| } |
| |
| @Override |
| public void execute() throws CqClosedException, RegionNotFoundException, CqException { |
| throw new IllegalStateException("Execute cannot be called on a CQ on the server"); |
| } |
| |
| } |