blob: b9e0ddfaf7fbb661284b3df82392e25c00ada41e [file] [log] [blame]
/*=========================================================================
* Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
* This product is protected by U.S. and international copyright
* and intellectual property laws. Pivotal products are covered by
* one or more patents listed at http://www.pivotal.io/patents.
*=========================================================================
*/
package com.gemstone.gemfire.internal.cache;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.logging.log4j.Logger;
import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.DataSerializer;
import com.gemstone.gemfire.InternalGemFireError;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.cache.CacheEvent;
import com.gemstone.gemfire.cache.EntryEvent;
import com.gemstone.gemfire.cache.Operation;
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.SerializedCacheValue;
import com.gemstone.gemfire.cache.query.internal.CqStateImpl;
import com.gemstone.gemfire.cache.query.internal.cq.CqService;
import com.gemstone.gemfire.cache.query.internal.cq.CqServiceProvider;
import com.gemstone.gemfire.cache.query.internal.cq.ServerCQ;
import com.gemstone.gemfire.distributed.DistributedMember;
import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile;
import com.gemstone.gemfire.distributed.internal.DistributionManager;
import com.gemstone.gemfire.distributed.internal.HighPriorityDistributionMessage;
import com.gemstone.gemfire.distributed.internal.MessageWithReply;
import com.gemstone.gemfire.distributed.internal.ReplyMessage;
import com.gemstone.gemfire.distributed.internal.ReplyProcessor21;
import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
import com.gemstone.gemfire.internal.ClassLoadUtil;
import com.gemstone.gemfire.internal.DataSerializableFixedID;
import com.gemstone.gemfire.internal.InternalDataSerializer;
import com.gemstone.gemfire.internal.Version;
import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor.CacheProfile;
import com.gemstone.gemfire.internal.cache.DistributedPutAllOperation.PutAllEntryData;
import com.gemstone.gemfire.internal.cache.DistributedRemoveAllOperation.RemoveAllEntryData;
import com.gemstone.gemfire.internal.cache.FilterRoutingInfo.FilterInfo;
import com.gemstone.gemfire.internal.cache.tier.InterestType;
import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier;
import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
import com.gemstone.gemfire.internal.cache.tier.sockets.UnregisterAllInterest;
import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
import com.gemstone.gemfire.internal.logging.LogService;
import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage;
import com.gemstone.gemfire.internal.logging.log4j.LogMarker;
/**
* FilterProfile represents a distributed system member and is used for
* two purposes: processing client-bound events, and providing information
* for profile exchanges.
*
* FilterProfiles represent client IDs, including durable Queue IDs, with
* long integers. This reduces the size of routing information when sent
* over the network.
*
* @since 6.5
* @author bruce
*/
public class FilterProfile implements DataSerializableFixedID {
private static final Logger logger = LogService.getLogger();
/** enumeration of distributed profile operations */
static enum operationType {
REGISTER_KEY, REGISTER_KEYS, REGISTER_PATTERN, REGISTER_FILTER,
UNREGISTER_KEY, UNREGISTER_KEYS, UNREGISTER_PATTERN, UNREGISTER_FILTER,
CLEAR,
HAS_CQ, REGISTER_CQ, CLOSE_CQ, STOP_CQ, SET_CQ_STATE
}
/**
* these booleans tell whether the associated operationType pertains to CQs
* or not
*/
static boolean[] isCQOperation = {
false, false, false, false,
false, false, false, false,
false,
true, true, true, true, true
};
/**
* types of interest a client may have<br>
* NONE - not interested<br>
* UPDATES - wants update messages<br>
* INVALIDATES - wants invalidations instead of updates
*/
public static enum interestType {
NONE, UPDATES, INVALIDATES
}
/**
* this variable is used to ensure that the state of all of the interest
* variables is consistent with other threads. See
* http://www.cs.umd.edu/~pugh/java/memoryModel/jsr-133-faq.html#volatile
*/
@SuppressWarnings("unused")
private volatile Object volatileBarrier = null;
/**
* The keys in which clients are interested. This is a map keyed on client id,
* with a HashSet of the interested keys as the values.
*
* This map is never modified in place. Updaters must synchronize via
* {@link #interestListLock}.
*/
private Map<Object, Set> keysOfInterest;
private Map<Object, Set> keysOfInterestInv;
/**
* The patterns in which clients are interested. This is a map keyed on
* client id, with a HashMap (key name to compiled pattern) as the values.
*
* This map is never modified in place. Updaters must synchronize via
* {@link #interestListLock}.
*/
private Map<Object, Map<Object, Pattern>> patternsOfInterest;
private Map<Object, Map<Object, Pattern>> patternsOfInterestInv;
/**
* The filtering classes in which clients are interested. This is a map
* keyed on client id, with a HashMap (key name to {@link InterestFilter})
* as the values.
*
* This map is never modified in place. Updaters must synchronize via
* {@link #interestListLock}.
*/
private Map<Object, Map> filtersOfInterest;
private Map<Object, Map> filtersOfInterestInv;
/**
* Set of clients that we have ALL_KEYS interest for and who want updates
*/
private Set<Long> allKeyClients = null;
/**
* Set of clients that we have ALL_KEYS interest for and who want invalidations
*/
private Set<Long> allKeyClientsInv = null;
/**
* An object used for synchronizing the interest lists
*/
private transient final Object interestListLock = new Object();
/**
* The region associated with this profile
*/
transient LocalRegion region;
/**
* Whether this is a local profile or one that describes another process
*/
private transient boolean isLocalProfile;
/** Currently installed CQ count on the region */
AtomicInteger cqCount;
/** CQs that are registered on the remote node **/
private volatile Map cqs = Collections.EMPTY_MAP;
/* the ID of the member that this profile describes */
private DistributedMember memberID;
/**
* since client identifiers can be long, we use a mapping for on-wire
* operations
*/
IDMap clientMap;
/**
* since CQ identifiers can be long, we use a mapping for on-wire operations
*/
IDMap cqMap;
/**
* Queues the Filter Profile messages that are received during profile
* exchange.
*/
private volatile Map<InternalDistributedMember, LinkedList<OperationMessage>> filterProfileMsgQueue = new HashMap();
public FilterProfile() {
cqCount = new AtomicInteger();
}
/**
* used for instantiation of a profile associated with a region and
* not describing region filters in a different process. Do not use
* this method when instantiating profiles to store in distribution
* advisor profiles.
* @param r
*/
public FilterProfile(LocalRegion r) {
this.region = r;
this.isLocalProfile = true;
this.memberID = region.getMyId();
this.cqCount = new AtomicInteger();
this.clientMap = new IDMap();
this.cqMap = new IDMap();
this.localProfile.hasCacheServer = (r.getGemFireCache().getCacheServers().size() > 0);
}
public static boolean isCqOp(operationType opType){
return isCQOperation[opType.ordinal()];
}
/** return a set containing all clients that have registered interest for values */
private Set getAllClientsWithInterest() {
Set clientsWithInterest = new HashSet(getAllKeyClients());
clientsWithInterest.addAll(getPatternsOfInterest().keySet());
clientsWithInterest.addAll(getFiltersOfInterest().keySet());
clientsWithInterest.addAll(getKeysOfInterest().keySet());
return clientsWithInterest;
}
/** return a set containing all clients that have registered interest for invalidations */
private Set getAllClientsWithInterestInv() {
Set clientsWithInterestInv = new HashSet(getAllKeyClientsInv());
clientsWithInterestInv.addAll(getPatternsOfInterestInv().keySet());
clientsWithInterestInv.addAll(getFiltersOfInterestInv().keySet());
clientsWithInterestInv.addAll(getKeysOfInterestInv().keySet());
return clientsWithInterestInv;
}
/**
* Registers interest in the input region name and key
*
* @param inputClientID
* The identity of the interested client
* @param interest
* The key in which to register interest
* @param typeOfInterest the type of interest the client is registering
* @param updatesAsInvalidates
* whether the client just wants invalidations
* @return a set of the keys that were registered, which may be null
*/
public Set registerClientInterest(Object inputClientID,
Object interest, int typeOfInterest, boolean updatesAsInvalidates) {
Set keysRegistered = null;
operationType opType = null;
Long clientID;
if (inputClientID instanceof Long) {
clientID = (Long)inputClientID;
} else {
clientID = clientMap.getWireID(inputClientID);
}
synchronized(this.interestListLock) {
switch (typeOfInterest) {
case InterestType.KEY: {
opType = operationType.REGISTER_KEY;
Set oldInterestList;
Map<Object, Set> koi = updatesAsInvalidates?
this.getKeysOfInterestInv() : this.getKeysOfInterest();
oldInterestList = koi.get(clientID);
Set newInterestList = (oldInterestList == null)?
new HashSet() : new HashSet(oldInterestList);
newInterestList.add(interest);
Map<Object,Set> newMap = new HashMap(koi);
newMap.put(clientID, newInterestList);
if (updatesAsInvalidates) {
this.setKeysOfInterestInv(newMap);
} else {
this.setKeysOfInterest(newMap);
}
// Create a set of keys to pass to any listeners.
// There currently is no check to see if the key already exists.
keysRegistered = new HashSet();
keysRegistered.add(interest);
break;
}
case InterestType.REGULAR_EXPRESSION: {
keysRegistered = new HashSet();
opType = operationType.REGISTER_PATTERN;
if (((String)interest).equals(".*")) {
// ALL_KEYS
Set akc = updatesAsInvalidates? this.getAllKeyClientsInv() : this.getAllKeyClients();
akc = new HashSet(akc);
if (akc.add(clientID)) {
keysRegistered.add(interest);
if (updatesAsInvalidates) {
this.setAllKeyClientsInv(akc);
} else {
this.setAllKeyClients(akc);
}
}
} else {
Pattern pattern = Pattern.compile((String) interest);
Map<Object, Map<Object, Pattern>> pats = updatesAsInvalidates?
this.getPatternsOfInterestInv() : this.getPatternsOfInterest();
Map<Object, Pattern> oldInterestMap = pats.get(clientID);
Map<Object, Pattern> newInterestMap =(oldInterestMap == null)?
new HashMap() : new HashMap(oldInterestMap);
Pattern oldPattern = newInterestMap.put(interest, pattern);
if (oldPattern == null) {
// If the pattern didn't exist, add it to the set of keys to pass to any listeners.
keysRegistered.add(interest);
}
Map<Object, Map<Object, Pattern>> newMap = new HashMap(pats);
newMap.put(clientID, newInterestMap);
if(updatesAsInvalidates) {
this.setPatternsOfInterestInv(newMap);
} else {
this.setPatternsOfInterest(newMap);
}
}
break;
}
case InterestType.FILTER_CLASS: {
// get instance of the filter
Class filterClass;
InterestFilter filter;
try {
filterClass = ClassLoadUtil.classFromName((String)interest);
filter = (InterestFilter)filterClass.newInstance();
}
catch (ClassNotFoundException cnfe) {
throw new RuntimeException(LocalizedStrings.CacheClientProxy_CLASS_0_NOT_FOUND_IN_CLASSPATH.toLocalizedString(interest), cnfe);
}
catch (Exception e) {
throw new RuntimeException(LocalizedStrings.CacheClientProxy_CLASS_0_COULD_NOT_BE_INSTANTIATED.toLocalizedString(interest), e);
}
opType = operationType.REGISTER_FILTER;
Map<Object, Map>filts = updatesAsInvalidates?
this.getFiltersOfInterestInv() : this.getFiltersOfInterest();
Map oldInterestMap = filts.get(clientID);
Map newInterestMap = (oldInterestMap == null)?
new HashMap() : new HashMap(oldInterestMap);
newInterestMap.put(interest, filter);
HashMap newMap = new HashMap(filts);
newMap.put(clientID, newInterestMap);
if (updatesAsInvalidates) {
this.setFiltersOfInterestInv(newMap);
} else {
this.setFiltersOfInterest(newMap);
}
break;
}
default:
throw new InternalGemFireError(LocalizedStrings.CacheClientProxy_UNKNOWN_INTEREST_TYPE.toLocalizedString());
} // switch
if (this.isLocalProfile && opType != null) {
sendProfileOperation(clientID, opType, interest, updatesAsInvalidates);
}
} // synchronized
return keysRegistered;
}
/**
* Unregisters a client's interest
*
* @param inputClientID
* The identity of the client that is losing interest
* @param interest
* The key in which to unregister interest
* @param interestType the type of uninterest
* @return the keys unregistered, which may be null
*/
public Set unregisterClientInterest(Object inputClientID,
Object interest, int interestType) {
Long clientID;
if (inputClientID instanceof Long) {
clientID = (Long)inputClientID;
} else {
Map<Object, Long> cids = clientMap.realIDs; // read
clientID = cids.get(inputClientID);
if (clientID == null) {
if (logger.isDebugEnabled()) {
logger.debug("region profile unable to find '{}' for unregistration. Probably means there is no durable queue.", inputClientID);
}
return null;
}
}
Set keysUnregistered = null;
operationType opType = null;
synchronized (this.interestListLock) {
switch (interestType) {
case InterestType.KEY: {
opType = operationType.UNREGISTER_KEY;
if (interest == UnregisterAllInterest.singleton()) {
clearInterestFor(inputClientID);
// Bruce: this code removed since clearInterestFor() is more comprehensive
// if (this.keysOfInterest.get(clientID) != null) {
// Map newMap = new HashMap(this.keysOfInterest);
// Set removed = (Set)newMap.remove(clientID);
// // If something is removed, create a set of keys to pass
// // to any listeners
// if (removed != null) {
// keysUnregistered = new HashSet();
// keysUnregistered.addAll(removed);
// }
// this.keysOfInterest = newMap;
// }
// if (this.keysOfInterestInv.get(clientID) != null) {
// Map newMap = new HashMap(this.keysOfInterestInv);
// Set removed = (Set)newMap.remove(clientID);
// // If something is removed, create a set of keys to pass
// // to any listeners
// if (removed != null) {
// if (keysUnregistered == null) keysUnregistered = new HashSet();
// keysUnregistered.addAll(removed);
// }
// this.keysOfInterestInv = newMap;
// }
break;
}
Set oldInterestList = this.getKeysOfInterest().get(clientID);
if (oldInterestList != null) {
Set newInterestList = new HashSet(oldInterestList);
boolean removed = newInterestList.remove(interest);
if (removed) {
keysUnregistered = new HashSet();
keysUnregistered.add(interest);
}
Map newMap = new HashMap(this.getKeysOfInterest());
if (newInterestList.size() > 0) {
newMap.put(clientID, newInterestList);
} else {
newMap.remove(clientID);
}
this.setKeysOfInterest(newMap);
}
oldInterestList = this.getKeysOfInterestInv().get(clientID);
if (oldInterestList != null) {
Set newInterestList = new HashSet(oldInterestList);
boolean removed = newInterestList.remove(interest);
if (removed) {
keysUnregistered = new HashSet();
keysUnregistered.add(interest);
}
Map newMap = new HashMap(this.getKeysOfInterestInv());
if (newInterestList.size() > 0) {
newMap.put(clientID, newInterestList);
} else {
newMap.remove(clientID);
}
this.setKeysOfInterestInv(newMap);
}
break;
}
case InterestType.REGULAR_EXPRESSION: {
opType = operationType.UNREGISTER_PATTERN;
if (interest == UnregisterAllInterest.singleton()) {
if (this.getPatternsOfInterest().get(clientID) != null) {
HashMap newMap = new HashMap(this.getPatternsOfInterest());
Map removed = (Map)newMap.remove(clientID);
if (removed != null) {
keysUnregistered = new HashSet();
keysUnregistered.addAll(removed.keySet());
}
this.setPatternsOfInterest(newMap);
}
if (this.getPatternsOfInterestInv().get(clientID) != null) {
HashMap newMap = new HashMap(this.getPatternsOfInterestInv());
Map removed = (Map)newMap.remove(clientID);
if (removed != null) {
if (keysUnregistered == null) keysUnregistered = new HashSet();
keysUnregistered.addAll(removed.keySet());
}
this.setPatternsOfInterestInv(newMap);
}
Set oldSet = this.getAllKeyClients();
if (!oldSet.isEmpty()) {
Set newSet;
newSet = new HashSet(oldSet);
if (newSet.remove(clientID)) {
if (newSet.isEmpty()) {
newSet = null;
}
this.setAllKeyClients(newSet);
if (keysUnregistered == null) {
// keysUnregistered won't be null if somebody has registered
// interest in a specific key, then in '.*'.
keysUnregistered = new HashSet();
}
keysUnregistered.add(".*");
}
}
oldSet = this.getAllKeyClientsInv();
if (oldSet != null) {
Set newSet;
newSet = new HashSet(oldSet);
if (newSet.remove(clientID)) {
if (newSet.isEmpty()) {
newSet = null;
}
this.setAllKeyClientsInv(newSet);
if (keysUnregistered == null) {
// keysUnregistered won't be null if somebody has registered
// interest in a specific key, then in '.*'.
keysUnregistered = new HashSet();
}
keysUnregistered.add(".*");
}
}
}
else if (((String)interest).equals(".*")) { // ALL_KEYS
Set oldSet = this.getAllKeyClients();
if (oldSet != null) {
Set newSet;
newSet = new HashSet(oldSet);
if (newSet.remove(clientID)) {
if (newSet.isEmpty()) {
newSet = null;
}
this.setAllKeyClients(newSet);
// Since something was removed, create a set of keys to pass to any
// listeners
keysUnregistered = new HashSet();
keysUnregistered.add(interest);
}
}
oldSet = this.getAllKeyClientsInv();
if (oldSet != null) {
Set newSet;
newSet = new HashSet(oldSet);
if (newSet.remove(clientID)) {
if (newSet.isEmpty()) {
newSet = null;
}
this.setAllKeyClientsInv(newSet);
// Since something was removed, create a set of keys to pass to any
// listeners
keysUnregistered = new HashSet();
keysUnregistered.add(interest);
}
}
}
else {
Map oldInterestMap = this.getPatternsOfInterest().get(clientID);
if (oldInterestMap != null) {
Map newInterestMap = new HashMap(oldInterestMap);
Object obj = newInterestMap.remove(interest);
if (obj != null) {
// Since something was removed, create a set of keys to pass to any
// listeners
keysUnregistered = new HashSet();
keysUnregistered.add(interest);
}
Map newMap = new HashMap(this.getPatternsOfInterest());
if (newInterestMap.size() > 0)
newMap.put(clientID, newInterestMap);
else
newMap.remove(clientID);
this.setPatternsOfInterest(newMap);
}
oldInterestMap = this.getPatternsOfInterestInv().get(clientID);
if (oldInterestMap != null) {
Map newInterestMap = new HashMap(oldInterestMap);
Object obj = newInterestMap.remove(interest);
if (obj != null) {
// Since something was removed, create a set of keys to pass to any
// listeners
keysUnregistered = new HashSet();
keysUnregistered.add(interest);
}
Map newMap = new HashMap(this.getPatternsOfInterestInv());
if (newInterestMap.size() > 0)
newMap.put(clientID, newInterestMap);
else
newMap.remove(clientID);
this.setPatternsOfInterestInv(newMap);
}
}
break;
}
case InterestType.FILTER_CLASS: {
opType = operationType.UNREGISTER_FILTER;
if (interest == UnregisterAllInterest.singleton()) {
if (this.getFiltersOfInterest().get(clientID) != null) {
Map newMap = new HashMap(this.getFiltersOfInterest());
newMap.remove(clientID);
this.setFiltersOfInterest(newMap);
}
if (this.getFiltersOfInterestInv().get(clientID) != null) {
Map newMap = new HashMap(this.getFiltersOfInterestInv());
newMap.remove(clientID);
this.setFiltersOfInterestInv(newMap);
}
break;
}
Map oldInterestMap = this.getFiltersOfInterest().get(clientID);
if (oldInterestMap != null) {
Map newInterestMap = new HashMap(oldInterestMap);
newInterestMap.remove(interest);
Map newMap = new HashMap(this.getFiltersOfInterest());
if (newInterestMap.size() > 0) {
newMap.put(clientID, newInterestMap);
} else {
newMap.remove(clientID);
}
this.setFiltersOfInterest(newMap);
}
oldInterestMap = this.getFiltersOfInterestInv().get(clientID);
if (oldInterestMap != null) {
Map newInterestMap = new HashMap(oldInterestMap);
newInterestMap.remove(interest);
Map newMap = new HashMap(this.getFiltersOfInterestInv());
if (newInterestMap.size() > 0) {
newMap.put(clientID, newInterestMap);
} else {
newMap.remove(clientID);
}
this.setFiltersOfInterestInv(newMap);
}
break;
}
default:
throw new InternalGemFireError(LocalizedStrings.CacheClientProxy_BAD_INTEREST_TYPE.toLocalizedString());
}
if (this.region != null && this.isLocalProfile) {
sendProfileOperation(clientID, opType, interest, false);
}
} // synchronized
return keysUnregistered;
}
/**
* Registers interest in a set of keys for a client
*
* @param inputClientID
* @param keys
* The list of keys in which to register interest
* @param updatesAsInvalidates whether to send invalidations instead of updates
* @return the registered keys
*/
public Set registerClientInterestList(Object inputClientID,
List keys, boolean updatesAsInvalidates) {
Long clientID;
if (inputClientID instanceof Long) {
clientID = (Long)inputClientID;
} else {
clientID = clientMap.getWireID(inputClientID);
}
Set keysRegistered = new HashSet();
synchronized (interestListLock) {
Map<Object, Set> koi = updatesAsInvalidates?
this.getKeysOfInterestInv() : this.getKeysOfInterest();
Set oldInterestList = koi.get(clientID);
Set newInterestList = (oldInterestList == null)?
new HashSet() : new HashSet(oldInterestList);
for (Object key: keys) {
if (newInterestList.add(key)) {
keysRegistered.add(key);
}
}
Map newMap = new HashMap(koi);
newMap.put(clientID, newInterestList);
if (updatesAsInvalidates) {
this.setKeysOfInterestInv(newMap);
} else {
this.setKeysOfInterest(newMap);
}
if (this.region != null && this.isLocalProfile) {
sendProfileOperation(clientID, operationType.REGISTER_KEYS, keys, updatesAsInvalidates);
}
} // synchronized
return keysRegistered;
}
/**
* Unregisters interest in given keys for the given client
*
* @param inputClientID
* The fully-qualified name of the region in which to unregister
* interest
* @param keys
* The list of keys in which to unregister interest
* @return the unregistered keys
*/
public Set unregisterClientInterestList(Object inputClientID,
List keys) {
Long clientID;
if (inputClientID instanceof Long) {
clientID = (Long)inputClientID;
} else {
clientID = clientMap.getWireID(inputClientID);
}
Set keysUnregistered = new HashSet();
synchronized (interestListLock) {
Set oldInterestList = this.getKeysOfInterest().get(clientID);
if (oldInterestList != null) {
Set newInterestList = new HashSet(oldInterestList);
for (Iterator i = keys.iterator(); i.hasNext();) {
Object keyOfInterest = i.next();
if (newInterestList.remove(keyOfInterest)) {
keysUnregistered.add(keyOfInterest);
}
}
Map newMap = new HashMap(this.getKeysOfInterest());
if (newInterestList.size() > 0)
newMap.put(clientID, newInterestList);
else
newMap.remove(clientID);
this.setKeysOfInterest(newMap);
}
oldInterestList = this.getKeysOfInterestInv().get(clientID);
if (oldInterestList != null) {
Set newInterestList = new HashSet(oldInterestList);
for (Iterator i = keys.iterator(); i.hasNext();) {
Object keyOfInterest = i.next();
if (newInterestList.remove(keyOfInterest)) {
keysUnregistered.add(keyOfInterest);
}
}
Map newMap = new HashMap(this.getKeysOfInterestInv());
if (newInterestList.size() > 0)
newMap.put(clientID, newInterestList);
else
newMap.remove(clientID);
this.setKeysOfInterestInv(newMap);
}
if (this.region != null && this.isLocalProfile) {
sendProfileOperation(clientID, operationType.UNREGISTER_KEYS, keys, false);
}
} // synchronized
return keysUnregistered;
}
public Set getKeysOfInterestFor(Object inputClientID) {
Long clientID;
if (inputClientID instanceof Long) {
clientID = (Long)inputClientID;
} else {
clientID = clientMap.getWireID(inputClientID);
}
volatileBarrier();
Set keys1 = this.getKeysOfInterest().get(clientID);
Set keys2 = this.getKeysOfInterestInv().get(clientID);
if (keys1 == null) {
if (keys2 == null) {
return null;
}
return Collections.unmodifiableSet(keys2);
} else if (keys2 == null) {
return Collections.unmodifiableSet(keys1);
} else {
Set result = new HashSet(keys1);
result.addAll(keys2);
return Collections.unmodifiableSet(result);
}
}
public Map<String, Pattern> getPatternsOfInterestFor(Object inputClientID) {
Long clientID;
if (inputClientID instanceof Long) {
clientID = (Long)inputClientID;
} else {
clientID = clientMap.getWireID(inputClientID);
}
volatileBarrier();
Map patterns1 = this.getPatternsOfInterest().get(clientID);
Map patterns2 = this.getPatternsOfInterestInv().get(clientID);
if (patterns1 == null) {
if (patterns2 == null) {
return null;
}
return Collections.unmodifiableMap(patterns2);
} else if (patterns2 == null) {
return Collections.unmodifiableMap(patterns1);
} else {
Map<String, Pattern> result = new HashMap(patterns1);
result.putAll(patterns2);
return Collections.unmodifiableMap(result);
}
}
public boolean hasKeysOfInterestFor(Object inputClientID, boolean wantInvalidations) {
Long clientID;
if (inputClientID instanceof Long) {
clientID = (Long)inputClientID;
} else {
clientID = clientMap.getWireID(inputClientID);
}
volatileBarrier();
if (wantInvalidations) {
return this.getKeysOfInterestInv().containsKey(clientID);
}
return this.getKeysOfInterestInv().containsKey(clientID);
}
public boolean hasAllKeysInterestFor(Object inputClientID) {
Long clientID;
if (inputClientID instanceof Long) {
clientID = (Long)inputClientID;
} else {
clientID = clientMap.getWireID(inputClientID);
}
volatileBarrier();
return hasAllKeysInterestFor(clientID, false) ||
hasAllKeysInterestFor(clientID, true);
}
public boolean hasAllKeysInterestFor(Object inputClientID, boolean wantInvalidations) {
Long clientID;
if (inputClientID instanceof Long) {
clientID = (Long)inputClientID;
} else {
clientID = clientMap.getWireID(inputClientID);
}
if (wantInvalidations) {
volatileBarrier();
return getAllKeyClientsInv().contains(clientID);
}
return getAllKeyClients().contains(clientID);
}
public boolean hasRegexInterestFor(Object inputClientID, boolean wantInvalidations) {
Long clientID;
if (inputClientID instanceof Long) {
clientID = (Long)inputClientID;
} else {
clientID = clientMap.getWireID(inputClientID);
}
volatileBarrier();
if (wantInvalidations) {
return (this.getPatternsOfInterestInv().containsKey(clientID));
}
return (this.getPatternsOfInterest().containsKey(clientID));
}
public boolean hasFilterInterestFor(Object inputClientID, boolean wantInvalidations) {
Long clientID;
if (inputClientID instanceof Long) {
clientID = (Long)inputClientID;
} else {
clientID = clientMap.getWireID(inputClientID);
}
volatileBarrier();
if (wantInvalidations) {
return this.getFiltersOfInterestInv().containsKey(clientID);
}
return this.getFiltersOfInterestInv().containsKey(clientID);
}
/** determines whether there is any remaining interest for the given identifier */
public boolean hasInterestFor(Object inputClientID) {
Long clientID;
if (inputClientID instanceof Long) {
clientID = (Long)inputClientID;
} else {
Map<Object, Long> cids = clientMap.realIDs; // read
clientID = cids.get(inputClientID);
if (clientID == null) {
return false;
}
}
return
this.hasAllKeysInterestFor(clientID, true) ||
this.hasKeysOfInterestFor(clientID, true) ||
this.hasRegexInterestFor(clientID, true) ||
this.hasFilterInterestFor(clientID, true) ||
this.hasKeysOfInterestFor(clientID, false) ||
this.hasRegexInterestFor(clientID, false) ||
this.hasAllKeysInterestFor(clientID, false) ||
this.hasFilterInterestFor(clientID, false);
}
/*
* Returns whether this interest list has any keys, patterns or filters of
* interest. It answers the question: Are any clients being notified because
* of this interest list? @return whether this interest list has any keys,
* patterns or filters of interest
*/
public boolean hasInterest() {
return
(!this.getAllKeyClients().isEmpty()) ||
(!this.getAllKeyClientsInv().isEmpty()) ||
(!this.getKeysOfInterest().isEmpty()) ||
(!this.getPatternsOfInterest().isEmpty()) ||
(!this.getFiltersOfInterest().isEmpty()) ||
(!this.getKeysOfInterestInv().isEmpty()) ||
(!this.getPatternsOfInterestInv().isEmpty()) ||
(!this.getFiltersOfInterestInv().isEmpty());
}
/** removes all interest for the given identifier */
public void clearInterestFor(Object inputClientID) {
Long clientID;
if (inputClientID instanceof Long) {
clientID = (Long)inputClientID;
} else {
Map<Object, Long> cids = clientMap.realIDs;
clientID = cids.get(inputClientID);
if (clientID == null) {
// haven't seen this client yet
return;
}
}
synchronized (interestListLock) {
{
Set akc = this.getAllKeyClients();
if (akc.contains(clientID)) {
akc = new HashSet(akc);
akc.remove(clientID);
this.setAllKeyClients(akc);
}
}
{
Set akci = this.getAllKeyClientsInv();
if (akci.contains(clientID)) {
akci = new HashSet(akci);
akci.remove(clientID);
this.setAllKeyClientsInv(akci);
}
}
{
Map<Object, Set> keys = this.getKeysOfInterest();
if (keys.containsKey(clientID)) {
Map newkeys = new HashMap(keys);
newkeys.remove(clientID);
this.setKeysOfInterest(newkeys);
}
}
{
Map<Object, Set> keys = this.getKeysOfInterestInv();
if (keys.containsKey(clientID)) {
Map newkeys = new HashMap(keys);
newkeys.remove(clientID);
this.setKeysOfInterestInv(newkeys);
}
}
{
Map<Object, Map<Object, Pattern>> pats = this.getPatternsOfInterest();
if (pats.containsKey(clientID)) {
Map newpats = new HashMap(pats);
newpats.remove(clientID);
this.setPatternsOfInterest(newpats);
}
}
{
Map<Object, Map<Object, Pattern>> pats = this.getPatternsOfInterestInv();
if (pats.containsKey(clientID)) {
Map newpats = new HashMap(pats);
newpats.remove(clientID);
this.setPatternsOfInterestInv(newpats);
}
}
{
Map<Object, Map> filters = this.getFiltersOfInterest();
if (filters.containsKey(clientID)) {
Map newfilts = new HashMap(filters);
newfilts.remove(clientID);
this.setFiltersOfInterest(newfilts);
}
}
{
Map<Object, Map> filters = this.getFiltersOfInterestInv();
if (filters.containsKey(clientID)) {
Map newfilts = new HashMap(filters);
newfilts.remove(clientID);
this.setFiltersOfInterestInv(newfilts);
}
}
if (clientMap != null) {
clientMap.removeIDMapping(clientID);
}
if (this.region != null && this.isLocalProfile) {
sendProfileOperation(clientID, operationType.CLEAR, null, false);
}
}
}
/**
* Obtains the number of CQs registered on the region.
* Assumption: CQs are not duplicated among clients.
*
* @return int currently registered CQs of the region
*/
public int getCqCount() {
return this.cqCount.get();
}
public void incCqCount(){
this.cqCount.getAndIncrement();
}
public void decCqCount(){
this.cqCount.decrementAndGet();
}
/**
* Returns the CQs registered on this region.
*/
public Map getCqMap() {
return this.cqs;
}
/**
* does this profile contain any continuous queries?
*/
public boolean hasCQs() {
return this.cqCount.get() > 0;
}
public ServerCQ getCq(String cqName) {
return (ServerCQ)this.cqs.get(cqName);
}
public void registerCq(ServerCQ cq) {
ensureCqID(cq);
if (logger.isDebugEnabled()) {
logger.debug("Adding CQ {} to this members FilterProfile.", cq.getServerCqName());
}
Map newCqs = new HashMap(this.cqs);
newCqs.put(cq.getServerCqName(), cq);
this.cqs = newCqs;
this.incCqCount();
//cq.setFilterID(cqMap.getWireID(cq.getServerCqName()));
this.sendCQProfileOperation(operationType.REGISTER_CQ, cq);
}
public void stopCq(ServerCQ cq) {
ensureCqID(cq);
if (logger.isDebugEnabled()) {
this.logger.debug("Stopping CQ {} on this members FilterProfile.", cq.getServerCqName());
}
this.sendCQProfileOperation(operationType.STOP_CQ, cq);
}
public String generateCqName(String serverCqName) {
// Set unique CQ Name with respect to the senders filterProfile.
// To avoid any conflict with the CQ names while maintained in
// the CqService.
// The CQ will be identified in the remote node using its
// serverCqName.
return (serverCqName + this.hashCode());
}
/**
* adds a new CQ to this profile during a delta operation or deserialization
* @param serverCqName the query objects' name
* @param ServerCQ the new query object
* @param addToCqMap whether to add the query to this.cqs
*/
void processRegisterCq(String serverCqName, ServerCQ ServerCQ,
boolean addToCqMap) {
ServerCQ cq = (ServerCQ)ServerCQ;
try {
CqService cqService = GemFireCacheImpl.getInstance().getCqService();
cqService.start();
cq.setCqService(cqService);
CqStateImpl cqState = (CqStateImpl)cq.getState();
cq.setName(generateCqName(serverCqName));
cq.registerCq(null, null, cqState.getState());
} catch (Exception ex){
// Change it to Info level.
if (logger.isDebugEnabled()) {
logger.debug("Error while initializing the CQs with FilterProfile for CQ {}, Error : {}", serverCqName, ex.getMessage(), ex);
}
}
if (logger.isDebugEnabled()) {
logger.debug("Adding CQ to remote members FilterProfile using name: {}", serverCqName);
}
if (addToCqMap) {
Map newCqs = new HashMap(this.cqs);
newCqs.put(serverCqName, cq);
this.cqs = newCqs;
}
// The region's FilterProfile is accessed through CQ reference as the
// region is not set on the FilterProfile created for the peer nodes.
if (cq.getCqBaseRegion() != null ) {
FilterProfile pf = cq.getCqBaseRegion().getFilterProfile();
if (pf != null) {
pf.incCqCount();
}
}
}
public void processCloseCq(String serverCqName) {
ServerCQ cq = (ServerCQ)this.cqs.get(serverCqName);
if (cq != null){
try {
cq.close(false);
} catch (Exception ex){
if (logger.isDebugEnabled()) {
logger.debug("Unable to close the CQ with the filterProfile, on region {} for CQ {}, Error : {}", this.region.getFullPath(),
serverCqName, ex.getMessage(), ex);
}
}
Map newCqs = new HashMap(cqs);
newCqs.remove(serverCqName);
this.cqs = newCqs;
cq.getCqBaseRegion().getFilterProfile().decCqCount();
}
}
public void processSetCqState(String serverCqName, ServerCQ ServerCQ) {
ServerCQ cq = (ServerCQ)this.cqs.get(serverCqName);
if (cq != null){
CqStateImpl cqState = (CqStateImpl)ServerCQ.getState();
cq.setCqState(cqState.getState());
}
}
public void processStopCq(String serverCqName) {
ServerCQ cq = (ServerCQ)this.cqs.get(serverCqName);
if (cq != null){
try {
cq.stop();
} catch (Exception ex){
if (logger.isDebugEnabled()) {
logger.debug("Unable to stop the CQ with the filterProfile, on region {} for CQ {}, Error : {}",
this.region.getFullPath(), serverCqName, ex.getMessage(), ex);
}
}
}
}
public void setCqState(ServerCQ cq) {
// This could be when CQ is stopped and restarted.
ensureCqID(cq);
if (logger.isDebugEnabled()) {
logger.debug("Stopping CQ {} on this members FilterProfile.", cq.getServerCqName());
}
this.sendCQProfileOperation(operationType.SET_CQ_STATE, cq);
}
public void closeCq(ServerCQ cq) {
ensureCqID(cq);
String serverCqName = cq.getServerCqName();
Map newCqs = new HashMap(this.cqs);
newCqs.remove(serverCqName);
this.cqs = newCqs;
if (this.cqMap != null) {
this.cqMap.removeIDMapping(cq.getFilterID());
}
this.decCqCount();
this.sendCQProfileOperation(operationType.CLOSE_CQ, cq);
}
void cleanupForClient(CacheClientNotifier ccn,
ClientProxyMembershipID client) {
Iterator cqIter = this.cqs.entrySet().iterator();
while (cqIter.hasNext()) {
Map.Entry cqEntry = (Map.Entry)cqIter.next();
ServerCQ cq = (ServerCQ)cqEntry.getValue();
ClientProxyMembershipID clientId = cq.getClientProxyId();
if (clientId.equals(client)){
try {
cq.close(false);
} catch (Exception ex) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to remove CQ from the base region. CqName : {}", cq.getName());
}
}
this.closeCq(cq);
}
}
}
/**
* this will be get called when we remove other server profile from
* region advisor.
*/
void cleanUp() {
Map tmpCq = this.cqs;
if(tmpCq.size() > 0) {
for(Object serverCqName: tmpCq.keySet()) {
processCloseCq((String)serverCqName);
}
}
}
/**
* Returns if old value is required for CQ processing or not.
* In order to reduce the query processing time CQ caches the
* event keys its already seen, if the key is cached than the
* old value is not required.
*/
public boolean entryRequiresOldValue(Object key) {
if (this.hasCQs()){
if (!CqServiceProvider.MAINTAIN_KEYS){
return true;
}
Iterator cqIter = this.cqs.values().iterator();
while (cqIter.hasNext()) {
ServerCQ cq = (ServerCQ)cqIter.next();
if (cq.isOldValueRequiredForQueryProcessing(key)) {
return true;
}
}
}
return false;
}
private void sendProfileOperation(Long clientID, operationType opType, Object interest,
boolean updatesAsInvalidates) {
if (this.region == null || !(this.region instanceof PartitionedRegion)) {
return;
}
OperationMessage msg = new OperationMessage();
msg.regionName = this.region.getFullPath();
msg.clientID = clientID.longValue();
msg.opType = opType;
msg.interest = interest;
msg.updatesAsInvalidates = updatesAsInvalidates;
sendFilterProfileOperation(msg);
}
private void sendFilterProfileOperation(OperationMessage msg) {
Set recipients = ((CacheDistributionAdvisee)this.region).getDistributionAdvisor()
.adviseProfileUpdate();
msg.setRecipients(recipients);
ReplyProcessor21 rp = new ReplyProcessor21(this.region.getDistributionManager(), recipients);
msg.processorId = rp.getProcessorId();
this.region.getDistributionManager().putOutgoing(msg);
try {
rp.waitForReplies();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
private void sendCQProfileOperation(operationType opType, ServerCQ cq) {
// we only need to distribute for PRs. Other regions do local filter processing
if ( ! (this.region instanceof PartitionedRegion) ) {
// note that we do not need to update requiresOldValueInEvents because
// that flag is only used during region initialization. Otherwise we
// would need to
return;
}
OperationMessage msg = new OperationMessage();
msg.regionName = this.region.getFullPath();
msg.opType = opType;
msg.cq = cq;
try {
sendFilterProfileOperation(msg);
} catch (Exception ex) {
if (logger.isDebugEnabled()) {
logger.debug("Error sending CQ request to peers. {}", ex.getLocalizedMessage(), ex);
}
}
}
static final Profile[] NO_PROFILES = new Profile[0];
private final CacheProfile localProfile = new CacheProfile(this);
private final Profile[] localProfileArray = new Profile[]{localProfile};
/** compute local routing information */
public FilterInfo getLocalFilterRouting(CacheEvent event) {
FilterRoutingInfo fri = getFilterRoutingInfoPart2(null, event);
if (fri != null) {
return fri.getLocalFilterInfo();
} else {
return null;
}
}
/**
* @return the local CacheProfile for this FilterProfile's Region
*/
public CacheProfile getLocalProfile() {
return this.localProfile;
}
/**
* Compute the full routing information for the given set of peers. This will
* not include local routing information from interest processing. That
* is done by getFilterRoutingInfoPart2 */
public FilterRoutingInfo getFilterRoutingInfoPart1(CacheEvent event, Profile[] peerProfiles, Set cacheOpRecipients) {
// early out if there are no cache servers in the system
boolean anyServers = false;
for (int i=0; i<peerProfiles.length; i++) {
if (((CacheProfile)peerProfiles[i]).hasCacheServer) {
anyServers = true;
break;
}
}
if (!anyServers && !this.localProfile.hasCacheServer) {
return null;
}
volatileBarrier();
FilterRoutingInfo frInfo = null;
CqService cqService = getCqService(event.getRegion());
if (cqService.isRunning()) {
frInfo = new FilterRoutingInfo();
// bug #50809 - local routing for transactional ops must be done here
// because the event isn't available later and we lose the old value for the entry
final boolean processLocalProfile = event.getOperation().isEntry() && ((EntryEventImpl)event).getTransactionId() != null;
fillInCQRoutingInfo(event, processLocalProfile, peerProfiles, frInfo);
}
// Process InterestList.
// return fillInInterestRoutingInfo(event, peerProfiles, frInfo, cacheOpRecipients);
frInfo = fillInInterestRoutingInfo(event, peerProfiles, frInfo, cacheOpRecipients);
if (frInfo == null || !frInfo.hasMemberWithFilterInfo()) {
return null;
} else {
return frInfo;
}
}
/**
* get local routing information
* @param part1Info routing information for peers, if any
* @param event the event to process
* @return routing information for clients connected to this server
*/
public FilterRoutingInfo getFilterRoutingInfoPart2(FilterRoutingInfo part1Info,
CacheEvent event) {
volatileBarrier();
FilterRoutingInfo result = part1Info;
if (localProfile.hasCacheServer) {
// bug #45520 - CQ events arriving out of order causes result set
// inconsistency, so don't compute routings for events in conflict
boolean isInConflict = event.getOperation().isEntry() &&
((EntryEventImpl)event).isConcurrencyConflict();
CqService cqService = getCqService(event.getRegion());
if (!isInConflict && cqService.isRunning() && this.region != null /*&& !(
this.region.isUsedForPartitionedRegionBucket() || // partitioned region CQ
this.region instanceof PartitionedRegion)*/) { // processing is done in part 1
if (result == null) {
result = new FilterRoutingInfo();
}
if (logger.isDebugEnabled()) {
logger.debug("getting local cq matches for {}", event);
}
fillInCQRoutingInfo(event, true, NO_PROFILES, result);
}
result = fillInInterestRoutingInfo(event, localProfileArray, result, Collections.EMPTY_SET);
}
return result;
}
/**
* get continuous query routing information
* @param event the event to process
* @param peerProfiles the profiles getting this event
* @param frInfo the routing table to update
*/
private void fillInCQRoutingInfo(CacheEvent event, boolean processLocalProfile,
Profile[] peerProfiles, FilterRoutingInfo frInfo) {
CqService cqService = getCqService(event.getRegion());
if (cqService != null) {
try {
Profile local = processLocalProfile? this.localProfile : null;
cqService.processEvents(event, local, peerProfiles, frInfo);
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, re-throw the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
SystemFailure.checkFailure();
logger.error(LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_EXCEPTION_OCCURRED_WHILE_PROCESSING_CQS), t);
}
}
}
private CqService getCqService(Region region) {
return ((InternalCache) region.getRegionService()).getCqService();
}
/**
* computes FilterRoutingInfo objects for each of the given events
*/
public void getLocalFilterRoutingForPutAllOp(DistributedPutAllOperation dpao, DistributedPutAllOperation.PutAllEntryData[] putAllData) {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (this.region != null && this.localProfile.hasCacheServer) {
volatileBarrier();
Set clientsInv = null;
Set clients = null;
int size = putAllData.length;
CqService cqService = getCqService(dpao.getRegion());
boolean doCQs = cqService.isRunning() && this.region != null /*&& !(this.region.isUsedForPartitionedRegionBucket()
|| (this.region instanceof PartitionedRegion))*/;
for (int idx=0; idx < size; idx++) {
PutAllEntryData pEntry = putAllData[idx];
if (pEntry != null) {
final EntryEventImpl ev = dpao.getEventForPosition(idx);
FilterRoutingInfo fri = pEntry.filterRouting;
FilterInfo fi = null;
if (fri != null) {
fi = fri.getLocalFilterInfo();
}
if (isDebugEnabled) {
logger.debug("Finding locally interested clients for {}", ev);
}
if (doCQs) {
if (fri == null) {
fri = new FilterRoutingInfo();
}
fillInCQRoutingInfo(ev, true, NO_PROFILES, fri);
fi = fri.getLocalFilterInfo();
}
if (this.allKeyClientsInv != null || this.keysOfInterestInv != null
|| this.patternsOfInterestInv != null
|| this.filtersOfInterestInv != null) {
clientsInv = this.getInterestedClients(ev, this.allKeyClientsInv,
this.keysOfInterestInv, this.patternsOfInterestInv,
this.filtersOfInterestInv);
}
if (this.allKeyClients != null || this.keysOfInterest != null
|| this.patternsOfInterest != null
|| this.filtersOfInterest != null) {
clients = this.getInterestedClients(ev, this.allKeyClients,
this.keysOfInterest, this.patternsOfInterest,
this.filtersOfInterest);
}
if (clients != null || clientsInv != null) {
if (fi == null) {
fi = new FilterInfo();
// no need to create or update a FilterRoutingInfo at this time
}
fi.setInterestedClients(clients);
fi.setInterestedClientsInv(clientsInv);
}
ev.setLocalFilterInfo(fi);
}
}
}
}
/**
* computes FilterRoutingInfo objects for each of the given events
*/
public void getLocalFilterRoutingForRemoveAllOp(DistributedRemoveAllOperation op, RemoveAllEntryData[] removeAllData) {
if (this.region != null && this.localProfile.hasCacheServer) {
volatileBarrier();
Set clientsInv = null;
Set clients = null;
int size = removeAllData.length;
CqService cqService = getCqService(op.getRegion());
boolean doCQs = cqService.isRunning() && this.region != null;
for (int idx=0; idx < size; idx++) {
RemoveAllEntryData pEntry = removeAllData[idx];
if (pEntry != null) {
final EntryEventImpl ev = op.getEventForPosition(idx);
FilterRoutingInfo fri = pEntry.filterRouting;
FilterInfo fi = null;
if (fri != null) {
fi = fri.getLocalFilterInfo();
}
if (logger.isDebugEnabled()) {
logger.debug("Finding locally interested clients for {}", ev);
}
if (doCQs) {
if (fri == null) {
fri = new FilterRoutingInfo();
}
fillInCQRoutingInfo(ev, true, NO_PROFILES, fri);
fi = fri.getLocalFilterInfo();
}
if (this.allKeyClientsInv != null || this.keysOfInterestInv != null
|| this.patternsOfInterestInv != null
|| this.filtersOfInterestInv != null) {
clientsInv = this.getInterestedClients(ev, this.allKeyClientsInv,
this.keysOfInterestInv, this.patternsOfInterestInv,
this.filtersOfInterestInv);
}
if (this.allKeyClients != null || this.keysOfInterest != null
|| this.patternsOfInterest != null
|| this.filtersOfInterest != null) {
clients = this.getInterestedClients(ev, this.allKeyClients,
this.keysOfInterest, this.patternsOfInterest,
this.filtersOfInterest);
}
if (clients != null || clientsInv != null) {
if (fi == null) {
fi = new FilterInfo();
// no need to create or update a FilterRoutingInfo at this time
}
fi.setInterestedClients(clients);
fi.setInterestedClientsInv(clientsInv);
}
// if (this.logger.fineEnabled()) {
// this.region.getLogWriterI18n().fine("setting event routing to " + fi);
// }
ev.setLocalFilterInfo(fi);
}
}
}
}
/**
* Fills in the routing information for clients that have registered
* interest in the given event. The routing information is stored in
* the given FilterRoutingInfo object for use in message delivery.
* @param event the event being applied to the cache
* @param profiles the profiles of members having the affected region
* @param filterRoutingInfo the routing object that is modified by this method (may be null)
* @param cacheOpRecipients members that will receive a CacheDistributionMessage for the event
* @return the resulting FilterRoutingInfo
*/
public FilterRoutingInfo fillInInterestRoutingInfo(CacheEvent event, Profile[] profiles,
FilterRoutingInfo filterRoutingInfo, Set cacheOpRecipients) {
Set clientsInv = Collections.EMPTY_SET;
Set clients = Collections.EMPTY_SET;
if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
logger.trace(LogMarker.BRIDGE_SERVER, "finding interested clients for {}", event);
}
FilterRoutingInfo frInfo = filterRoutingInfo;
for (int i=0; i < profiles.length; i++) {
CacheProfile cf = (CacheProfile)profiles[i];
if (!cf.hasCacheServer) {
continue;
}
FilterProfile pf = cf.filterProfile;
if (pf == null) {
continue;
}
if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
logger.trace(LogMarker.BRIDGE_SERVER, "Processing {}", pf);
}
if (!pf.hasInterest()) {
// This block sends an empty routing table to a member that's going to
// get a CacheDistributionMessage so that if, in flight, there is an
// interest registration change the version held in the routing table
// can be used to detect the change and the receiver can recompute
// the routing.
if (!pf.isLocalProfile() && cacheOpRecipients.contains(cf.getDistributedMember())) {
if (frInfo == null) frInfo = new FilterRoutingInfo();
frInfo.addInterestedClients(cf.getDistributedMember(),
Collections.EMPTY_SET, Collections.EMPTY_SET, false);
}
continue;
}
if (event.getOperation().isEntry()) {
EntryEvent entryEvent = (EntryEvent)event;
if (pf.allKeyClientsInv != null || pf.keysOfInterestInv != null
|| pf.patternsOfInterestInv != null || pf.filtersOfInterestInv != null) {
clientsInv = pf.getInterestedClients(entryEvent, pf.allKeyClientsInv,
pf.keysOfInterestInv, pf.patternsOfInterestInv, pf.filtersOfInterestInv);
}
if (pf.allKeyClients != null || pf.keysOfInterest != null
|| pf.patternsOfInterest != null || pf.filtersOfInterest != null) {
clients = pf.getInterestedClients(entryEvent, pf.allKeyClients,
pf.keysOfInterest, pf.patternsOfInterest, pf.filtersOfInterest);
}
} else {
if (event.getOperation().isRegionDestroy() || event.getOperation().isClear()) {
clientsInv = pf.getAllClientsWithInterestInv();
clients = pf.getAllClientsWithInterest();
} else {
return frInfo;
}
}
if (pf.isLocalProfile){
if (logger.isDebugEnabled()) {
logger.debug("Setting local interested clients={} and clientsInv={}", clients, clientsInv);
}
if (frInfo == null) frInfo = new FilterRoutingInfo();
frInfo.setLocalInterestedClients(clients, clientsInv);
} else {
if (cacheOpRecipients.contains(cf.getDistributedMember()) || // always send a routing with CacheOperationMessages
(clients != null && !clients.isEmpty()) ||
(clientsInv != null && !clientsInv.isEmpty())) {
if (logger.isDebugEnabled()) {
logger.debug("Adding interested clients={} and clientsIn={} to {}", clients, clientsInv, filterRoutingInfo);
}
if (frInfo == null) frInfo = new FilterRoutingInfo();
frInfo.addInterestedClients(cf.getDistributedMember(),
clients, clientsInv, this.clientMap.hasLongID);
}
}
}
return frInfo;
}
/**
* get the clients interested in the given event that are attached to this
* server.
* @param event the entry event being applied to the cache
* @param akc allKeyClients collection
* @param koi keysOfInterest collection
* @param pats patternsOfInterest collection
* @param foi filtersOfInterest collection
* @return a set of the clients interested in the event
*/
private Set getInterestedClients(EntryEvent event,
Set akc, Map<Object, Set> koi, Map<Object, Map<Object, Pattern>> pats,
Map<Object, Map>foi) {
Set result = null;
if (akc != null) {
result = new HashSet(akc);
if (logger.isDebugEnabled()) {
logger.debug("these clients matched for all-keys: {}", akc);
}
}
if (koi != null) {
for (Iterator it=koi.entrySet().iterator(); it.hasNext(); ) {
Map.Entry entry = (Map.Entry)it.next();
Set keys = (Set)entry.getValue();
if (keys.contains(event.getKey())) {
Object clientID = entry.getKey();
if (result == null) result = new HashSet();
result.add(clientID);
if (logger.isDebugEnabled()) {
logger.debug("client {} matched for key list (size {})", clientID, koi.get(clientID).size());
}
}
}
}
if (pats != null && (event.getKey() instanceof String)) {
for (Iterator it=pats.entrySet().iterator(); it.hasNext(); ) {
Map.Entry entry = (Map.Entry)it.next();
String stringKey = (String)event.getKey();
Map<Object, Pattern> interestList = (Map<Object, Pattern>)entry.getValue();
for (Pattern keyPattern: interestList.values()) {
if (keyPattern.matcher(stringKey).matches()) {
Object clientID = entry.getKey();
if (result == null) result = new HashSet();
result.add(clientID);
if (logger.isDebugEnabled()) {
logger.debug("client {} matched for pattern ({})", clientID, pats.get(clientID));
}
break;
}
}
}
}
if (foi != null && foi.size() > 0) {
Object value;
boolean serialized;
{
SerializedCacheValue<?> serValue = event.getSerializedNewValue();
serialized = (serValue != null);
if (!serialized) {
value = event.getNewValue();
} else {
value = serValue.getSerializedValue();
}
}
InterestEvent iev = new InterestEvent(event.getKey(), value, !serialized);
Operation op = event.getOperation();
for (Iterator it=foi.entrySet().iterator(); it.hasNext(); ) {
Map.Entry entry = (Map.Entry)it.next();
Map<String, InterestFilter> interestList = (Map<String, InterestFilter>)entry.getValue();
for (InterestFilter filter: interestList.values()) {
if (
(op.isCreate() && filter.notifyOnCreate(iev)) ||
(op.isUpdate() && filter.notifyOnUpdate(iev)) ||
(op.isDestroy() && filter.notifyOnDestroy(iev)) ||
(op.isInvalidate() && filter.notifyOnInvalidate(iev))
) {
Object clientID = entry.getKey();
if (result == null) result = new HashSet();
result.add(clientID);
if (logger.isDebugEnabled()) {
logger.debug("client {} matched for filter ({})", clientID, getFiltersOfInterest().get(clientID));
}
break;
}
}
}
}
return result;
}
/* DataSerializableFixedID methods ---------------------------------------- */
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
InternalDistributedMember id = new InternalDistributedMember();
InternalDataSerializer.invokeFromData(id, in);
this.memberID = id;
this.allKeyClients = InternalDataSerializer.readSetOfLongs(in);
this.keysOfInterest = DataSerializer.readHashMap(in);
this.patternsOfInterest = DataSerializer.readHashMap(in);
this.filtersOfInterest = DataSerializer.readHashMap(in);
this.allKeyClientsInv = InternalDataSerializer.readSetOfLongs(in);
this.keysOfInterestInv = DataSerializer.readHashMap(in);
this.patternsOfInterestInv = DataSerializer.readHashMap(in);
this.filtersOfInterestInv = DataSerializer.readHashMap(in);
// Read CQ Info.
int numCQs = InternalDataSerializer.readArrayLength(in);
if (numCQs > 0) {
Map theCQs = new HashMap(numCQs);
int oldLevel = LocalRegion.setThreadInitLevelRequirement(LocalRegion.ANY_INIT); // do this before CacheFactory.getInstance for bug 33471
try {
for (int i=0; i < numCQs; i++){
String serverCqName = DataSerializer.readString(in);
ServerCQ cq = CqServiceProvider.readCq(in);
processRegisterCq(serverCqName, cq, false);
theCQs.put(serverCqName, cq);
}
} finally {
this.cqs = theCQs;
LocalRegion.setThreadInitLevelRequirement(oldLevel);
}
}
}
public int getDSFID() {
return FILTER_PROFILE;
}
public void toData(DataOutput out) throws IOException {
InternalDataSerializer.invokeToData(((InternalDistributedMember)memberID), out);
InternalDataSerializer.writeSetOfLongs(this.allKeyClients, this.clientMap.hasLongID, out);
DataSerializer.writeHashMap((HashMap)this.keysOfInterest, out);
DataSerializer.writeHashMap((HashMap)this.patternsOfInterest, out);
DataSerializer.writeHashMap((HashMap)this.filtersOfInterest, out);
InternalDataSerializer.writeSetOfLongs(this.allKeyClientsInv, this.clientMap.hasLongID, out);
DataSerializer.writeHashMap((HashMap)this.keysOfInterestInv, out);
DataSerializer.writeHashMap((HashMap)this.patternsOfInterestInv, out);
DataSerializer.writeHashMap((HashMap)this.filtersOfInterestInv, out);
// Write CQ info.
Map theCQs = this.cqs;
int size = theCQs.size();
InternalDataSerializer.writeArrayLength(size, out);
for (Iterator it=theCQs.entrySet().iterator(); it.hasNext(); ) {
Map.Entry entry = (Map.Entry)it.next();
String name = (String)entry.getKey();
ServerCQ cq = (ServerCQ)entry.getValue();
DataSerializer.writeString(name, out);
InternalDataSerializer.invokeToData(cq, out);
}
}
/**
* @return the keysOfInterest
*/
private Map<Object, Set> getKeysOfInterest() {
Map<Object,Set> keysOfInterestRef = this.keysOfInterest;
return keysOfInterestRef == null? Collections.EMPTY_MAP : keysOfInterestRef;
}
/**
* @param keysOfInterest the keysOfInterest to set
*/
private void setKeysOfInterest(Map keysOfInterest) {
this.keysOfInterest = keysOfInterest;
}
/**
* @return the keysOfInterestInv
*/
private Map<Object, Set> getKeysOfInterestInv() {
Map<Object, Set> keysOfInterestInvRef = this.keysOfInterestInv;
return keysOfInterestInvRef == null? Collections.EMPTY_MAP : keysOfInterestInvRef;
}
/**
* @param keysOfInterestInv the keysOfInterestInv to set
*/
private void setKeysOfInterestInv(Map keysOfInterestInv) {
this.keysOfInterestInv = keysOfInterestInv;
}
/**
* @return the patternsOfInterest
*/
private Map<Object, Map<Object, Pattern>> getPatternsOfInterest() {
Map<Object, Map<Object, Pattern>> patternsOfInterestRef = this.patternsOfInterest;
return patternsOfInterestRef == null? Collections.EMPTY_MAP : patternsOfInterestRef;
}
/**
* @param patternsOfInterest the patternsOfInterest to set
*/
private void setPatternsOfInterest(Map patternsOfInterest) {
this.patternsOfInterest = patternsOfInterest;
}
/**
* @return the patternsOfInterestInv
*/
private Map<Object, Map<Object, Pattern>> getPatternsOfInterestInv() {
Map<Object, Map<Object, Pattern>> patternsOfInterestInvRef = this.patternsOfInterestInv;
return patternsOfInterestInvRef == null? Collections.EMPTY_MAP : patternsOfInterestInvRef;
}
/**
* @param patternsOfInterestInv the patternsOfInterestInv to set
*/
private void setPatternsOfInterestInv(Map patternsOfInterestInv) {
this.patternsOfInterestInv = patternsOfInterestInv;
}
/**
* @return the filtersOfInterestInv
*/
Map<Object, Map> getFiltersOfInterestInv() {
Map<Object, Map> filtersOfInterestInvRef = filtersOfInterestInv;
return filtersOfInterestInvRef == null? Collections.EMPTY_MAP : filtersOfInterestInvRef;
}
/**
* @param filtersOfInterestInv the filtersOfInterestInv to set
*/
void setFiltersOfInterestInv(Map filtersOfInterestInv) {
this.filtersOfInterestInv = filtersOfInterestInv;
}
/**
* @return the filtersOfInterest
*/
private Map<Object, Map> getFiltersOfInterest() {
Map<Object, Map> filtersOfInterestRef = this.filtersOfInterest;
return filtersOfInterestRef == null? Collections.EMPTY_MAP : filtersOfInterestRef;
}
/**
* @param filtersOfInterest the filtersOfInterest to set
*/
private void setFiltersOfInterest(Map filtersOfInterest) {
this.filtersOfInterest = filtersOfInterest;
}
/**
* So far all calls to setAllKeyClients has occurred under synchronized blocks, locking on interestListLock
* @param akc the allKeyClients to set
*/
private void setAllKeyClients(Set akc) {
this.allKeyClients = akc;
if (logger.isDebugEnabled()) {
logger.debug("{}: updated allKeyClients to {}", this, akc);
}
}
/**
* perform a volatile read to ensure that all state is consistent
*/
private void volatileBarrier() {
volatileBarrier = this.cqCount; // volatile write
}
/**
* It is possible to do a get outside of a synch block, so it can change if another thread
* calls setAllKeyClients. So instead we store a ref to allKeyClients and if it changes
* we at least have the one we expected to return.
* @return the allKeyClients
*/
private Set<Object> getAllKeyClients() {
Set allKeysRef = this.allKeyClients;
if (testHook != null) {
testHook.await();
}
return allKeysRef == null? Collections.EMPTY_SET : allKeysRef;
}
public int getAllKeyClientsSize(){
return this.getAllKeyClients().size();
}
/**
* @param akc the allKeyClientsInv to set
*/
private void setAllKeyClientsInv(Set akc) {
this.allKeyClientsInv = akc;
}
/**
* It is possible to do a get outside of a synch block, so it can change if another thread
* calls setAllKeyClients. So instead we store a ref to allKeyClientsInv and if it changes
* we at least have the one we expected to return.
* @return the allKeyClientsInv
*/
private Set<Object> getAllKeyClientsInv() {
Set allKeysInvRef = this.allKeyClientsInv;
return allKeysInvRef == null? Collections.EMPTY_SET : allKeysInvRef;
}
public int getAllKeyClientsInvSize(){
return this.getAllKeyClientsInv().size();
}
@Override
public String toString() {
final boolean isDebugEnabled = logger.isTraceEnabled(LogMarker.BRIDGE_SERVER);
return "FilterProfile(id=" + (this.isLocalProfile? "local" : this.memberID)
// + "; allKeys: " + this.allKeyClients
// + "; keys: " + this.keysOfInterest
// + "; patterns: " + this.patternsOfInterest
// + "; filters: " + this.filtersOfInterest
// + "; allKeysInv: " + this.allKeyClientsInv
// + "; keysInv: " + this.keysOfInterestInv
// + "; patternsInv: " + this.patternsOfInterestInv
// + "; filtersInv: " + this.filtersOfInterestInv
+ "; numCQs: " + ((this.cqCount == null)?0:this.cqCount.get())
+ (isDebugEnabled? ("; " + getClientMappingString()) : "")
+ (isDebugEnabled? ("; " + getCqMappingString()) : "")
+ ")";
}
/** for debugging we could sometimes use a dump of the Long->clientID table */
private String getClientMappingString() {
if (clientMap == null) {
return "";
}
Map wids = clientMap.wireIDs;
if (wids.size() == 0) {
return "clients[]";
}
Set<Long> sorted = new TreeSet(wids.keySet());
StringBuffer result = new StringBuffer(sorted.size() * 70);
result.append("clients[");
Iterator<Long> it = sorted.iterator();
for (int i=1; it.hasNext(); i++) {
Long wireID = it.next();
result.append(wireID).append("={").append(wids.get(wireID)).append('}');
if (it.hasNext()) {
result.append(", ");
}
}
result.append("]");
return result.toString();
}
/** for debugging we could sometimes use a dump of the Long->cq name table */
private String getCqMappingString() {
if (cqMap == null) {
return "";
}
Map wids = cqMap.wireIDs;
if (wids.size() == 0) {
return "cqs[]";
}
Set<Long> sorted = new TreeSet(wids.keySet());
StringBuffer result = new StringBuffer(sorted.size() * 70);
result.append("cqs[");
Iterator<Long> it = sorted.iterator();
for (int i=1; it.hasNext(); i++) {
Long wireID = it.next();
result.append(wireID).append("={").append(wids.get(wireID)).append('}');
if (it.hasNext()) {
result.append(", ");
}
}
result.append("]");
return result.toString();
}
/**
* given a collection of on-wire identifiers, this returns a set of
* the client/server identifiers for each client or durable queue
* @param integerIDs the integer ids of the clients/queues
* @return the translated identifiers
*/
public Set getRealClientIDs(Collection integerIDs) {
return clientMap.getRealIDs(integerIDs);
}
/**
* given a collection of on-wire identifiers, this returns a set of
* the CQ identifiers they correspond to
* @param integerIDs the integer ids of the clients/queues
* @return the translated identifiers
*/
public Set getRealCqIDs(Collection integerIDs) {
return cqMap.getRealIDs(integerIDs);
}
/**
* given an on-wire filter ID, find and return the corresponding cq name
* @param integerID the on-wire ID
* @return the translated id
*/
public String getRealCqID(Long integerID) {
return (String)cqMap.getRealID(integerID);
}
/**
* ensure that the given query contains a filter routing ID
*/
private void ensureCqID(ServerCQ cq) {
if (cq.getFilterID() == null) {
// on-wire IDs are assigned in the VM where the CQ was registered
assert this.isLocalProfile;
cq.setFilterID(cqMap.getWireID(cq.getServerCqName()));
}
}
/**
* @return the isLocalProfile
*/
public boolean isLocalProfile() {
return isLocalProfile;
}
/**
* Returns the filter profile messages received while members cache profile
* exchange was in progress.
* @param member whose messages are returned.
* @return filter profile messages that are queued for the member.
*/
public List getQueuedFilterProfileMsgs(InternalDistributedMember member){
synchronized (this.filterProfileMsgQueue){
if (this.filterProfileMsgQueue.containsKey(member)) {
return new LinkedList(this.filterProfileMsgQueue.get(member));
}
}
return Collections.EMPTY_LIST;
}
/**
* Removes the filter profile messages from the queue that are received
* while the members cache profile exchange was in progress.
* @param member whose messages are returned.
* @return filter profile messages that are queued for the member.
*/
public List removeQueuedFilterProfileMsgs(InternalDistributedMember member){
synchronized (this.filterProfileMsgQueue){
if (this.filterProfileMsgQueue.containsKey(member)) {
return new LinkedList(this.filterProfileMsgQueue.remove(member));
}
}
return Collections.EMPTY_LIST;
}
/**
* Adds the message to filter profile queue.
* @param member
* @param message
*/
public void addToFilterProfileQueue(InternalDistributedMember member, OperationMessage message){
if (logger.isDebugEnabled()) {
logger.debug("Adding message to filter profile queue: {} for member : {}", message, member);
}
synchronized (this.filterProfileMsgQueue){
LinkedList msgs = this.filterProfileMsgQueue.get(member);
if (msgs == null){
msgs = new LinkedList();
this.filterProfileMsgQueue.put(member, msgs);
}
msgs.add(message);
}
}
/**
* Process the filter profile messages.
* @param msgs
*/
public void processQueuedFilterProfileMsgs(List msgs){
final boolean isDebugEnabled = logger.isDebugEnabled();
if (msgs != null){
Iterator iter = msgs.iterator();
while (iter.hasNext()) {
try {
OperationMessage msg = (OperationMessage)iter.next();
if (isDebugEnabled) {
logger.debug("Processing the queued filter profile message :{}", msg);
}
msg.processRequest(this);
} catch (Exception ex){
logger.warn("Exception thrown while processing queued profile messages.", ex);
}
}
}
}
/**
* OperationMessage synchronously propagates a change in the profile to
* another member. It is a serial message so that there is no chance
* of out-of-order execution.
* @author bruce
*/
public static class OperationMessage extends HighPriorityDistributionMessage
implements MessageWithReply {
public long profileVersion;
boolean updatesAsInvalidates;
String regionName;
operationType opType;
long clientID;
Object interest;
int processorId;
ServerCQ cq;
String serverCqName;
/* (non-Javadoc)
* @see com.gemstone.gemfire.distributed.internal.DistributionMessage#process(com.gemstone.gemfire.distributed.internal.DistributionManager)
*/
@Override
protected void process(DistributionManager dm) {
try {
CacheDistributionAdvisee r = findRegion();
if (r == null) {
if (logger.isDebugEnabled()) {
logger.debug("Region not found, so ignoring filter profile update: {}", this);
}
return;
}
// we only need to record the delta if this is a partitioned region
if ( ! (r instanceof PartitionedRegion) ) {
return;
}
CacheDistributionAdvisor cda = (CacheDistributionAdvisor)r.getDistributionAdvisor();
CacheDistributionAdvisor.CacheProfile cp =
(CacheDistributionAdvisor.CacheProfile)cda.getProfile(getSender());
if (cp == null) { // PR accessors do not keep filter profiles around
if (logger.isDebugEnabled()) {
logger.debug("No cache profile to update, adding filter profile message to queue. Message :{}", this);
}
FilterProfile localFP = ((PartitionedRegion)r).getFilterProfile();
localFP.addToFilterProfileQueue(getSender(), this);
dm.getCancelCriterion().checkCancelInProgress(null);
} else {
cp.hasCacheServer = true;
FilterProfile fp = cp.filterProfile;
if (fp == null) { // PR accessors do not keep filter profiles around
if (logger.isDebugEnabled()) {
logger.debug("No filter profile to update: {}", this);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("Processing the filter profile request for : {}", this);
}
processRequest(fp);
}
}
}
catch (RuntimeException e) {
logger.warn("Exception thrown while processing profile update", e);
}
finally {
ReplyMessage reply = new ReplyMessage();
reply.setProcessorId(this.processorId);
reply.setRecipient(getSender());
try {
dm.putOutgoing(reply);
} catch (CancelException e) {
// can't send a reply, so ignore the exception
}
}
}
public void processRequest(FilterProfile fp) {
switch (opType) {
case REGISTER_KEY:
fp.registerClientInterest(clientID, this.interest, InterestType.KEY, updatesAsInvalidates);
break;
case REGISTER_PATTERN:
fp.registerClientInterest(clientID, this.interest, InterestType.REGULAR_EXPRESSION, updatesAsInvalidates);
break;
case REGISTER_FILTER:
fp.registerClientInterest(clientID, this.interest, InterestType.FILTER_CLASS, updatesAsInvalidates);
break;
case REGISTER_KEYS:
fp.registerClientInterestList(clientID, (List)this.interest, updatesAsInvalidates);
break;
case UNREGISTER_KEY:
fp.unregisterClientInterest(clientID, this.interest, InterestType.KEY);
break;
case UNREGISTER_PATTERN:
fp.unregisterClientInterest(clientID, this.interest, InterestType.REGULAR_EXPRESSION);
break;
case UNREGISTER_FILTER:
fp.unregisterClientInterest(clientID, this.interest, InterestType.FILTER_CLASS);
break;
case UNREGISTER_KEYS:
fp.unregisterClientInterestList(clientID, (List)this.interest);
break;
case CLEAR:
fp.clearInterestFor(clientID);
break;
case HAS_CQ:
fp.cqCount.set(1);
break;
case REGISTER_CQ:
fp.processRegisterCq(this.serverCqName, this.cq, true);
break;
case CLOSE_CQ:
fp.processCloseCq(this.serverCqName);
break;
case STOP_CQ:
fp.processStopCq(this.serverCqName);
break;
case SET_CQ_STATE:
fp.processSetCqState(this.serverCqName, this.cq);
break;
default:
throw new IllegalArgumentException("Unknown filter profile operation type in operation: " + this);
}
}
private CacheDistributionAdvisee findRegion() {
CacheDistributionAdvisee result = null;
GemFireCacheImpl cache = null;
try {
cache = GemFireCacheImpl.getInstance();
if (cache != null) {
LocalRegion lr = cache.getRegionByPathForProcessing(regionName);
if (lr instanceof CacheDistributionAdvisee) {
result = (CacheDistributionAdvisee)lr;
}
}
} catch (CancelException e) {
// nothing to do
}
return result;
}
/* (non-Javadoc)
* @see com.gemstone.gemfire.internal.DataSerializableFixedID#getDSFID()
*/
public int getDSFID() {
return FILTER_PROFILE_UPDATE;
}
@Override
public void toData(DataOutput out) throws IOException {
super.toData(out);
out.writeInt(this.processorId);
out.writeUTF(this.regionName);
out.writeShort(this.opType.ordinal());
out.writeBoolean(this.updatesAsInvalidates);
out.writeLong(this.profileVersion);
if (isCqOp(this.opType)) {
// For CQ info.
// Write Server CQ Name.
out.writeUTF(((ServerCQ)this.cq).getServerCqName());
if (this.opType == operationType.REGISTER_CQ ||
this.opType == operationType.SET_CQ_STATE){
InternalDataSerializer.invokeToData((ServerCQ)this.cq, out);
}
} else {
// For interest list.
out.writeLong(this.clientID);
DataSerializer.writeObject(this.interest, out);
}
}
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
super.fromData(in);
this.processorId = in.readInt();
this.regionName = in.readUTF();
this.opType = operationType.values()[in.readShort()];
this.updatesAsInvalidates = in.readBoolean();
this.profileVersion = in.readLong();
if (isCqOp(this.opType)){
this.serverCqName = in.readUTF();
if (this.opType == operationType.REGISTER_CQ ||
this.opType == operationType.SET_CQ_STATE){
this.cq = CqServiceProvider.readCq(in);
}
} else {
this.clientID = in.readLong();
this.interest = DataSerializer.readObject(in);
}
}
@Override
public String toString() {
return this.getShortClassName() + "(processorId=" + this.processorId
+ "; region=" + this.regionName
+ "; operation=" + this.opType
+ "; clientID=" + this.clientID
+ "; profileVersion=" + this.profileVersion
+ (isCqOp(this.opType)?("; CqName="+this.serverCqName):"")
+ ")";
}
}
class IDMap {
long nextID = 1;
Map<Object, Long> realIDs = new ConcurrentHashMap<Object, Long>();
Map<Long, Object> wireIDs = new ConcurrentHashMap<Long, Object>();
boolean hasLongID;
synchronized boolean hasWireID(Object realId) {
return this.realIDs.containsKey(realId);
}
/** return the on-wire routing identifier for the given ID */
Long getWireID(Object realId) {
Long result = this.realIDs.get(realId);
if (result == null) {
synchronized(this) {
result = this.realIDs.get(realId);
if (result == null) {
if (nextID == Integer.MAX_VALUE) {
this.hasLongID = true;
}
result = Long.valueOf(nextID++);
this.realIDs.put(realId, result);
this.wireIDs.put(result, realId);
}
}
if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER)) {
logger.trace(LogMarker.BRIDGE_SERVER, "Profile for {} mapped {} to {}", region.getFullPath(), realId, result);
}
}
return result;
}
/** return the client or durable queue id for the given on-wire identifier */
Object getRealID(Long wireID) {
return wireIDs.get(wireID);
}
/**
* given a collection of on-wire identifiers, this returns a set of
* the real identifiers (e.g., client IDs or durable queue IDs)
* @param integerIDs the integer ids
* @return the translated identifiers
*/
public Set getRealIDs(Collection integerIDs) {
if (integerIDs.size() == 0) {
return Collections.emptySet();
}
Set result = new HashSet(integerIDs.size());
Map<Long, Object> wids = wireIDs;
for (Object id: integerIDs) {
Object realID = wids.get(id);
if (realID != null) {
result.add(realID);
}
}
return result;
}
/**
* remove the mapping for the given internal ID
*/
void removeIDMapping(Long mappedId) {
Object clientId = this.wireIDs.remove(mappedId);
if (clientId != null) {
this.realIDs.remove(clientId);
}
}
}
/**
* Returns true if the client is interested in all keys.
* @param id client identifier.
* @return true if client is interested in all keys.
*/
public boolean isInterestedInAllKeys(Object id){
if (!clientMap.hasWireID(id)) {
return false;
}
return this.getAllKeyClients().contains(clientMap.getWireID(id));
}
/**
* Returns true if the client is interested in all keys, for which
* updates are sent as invalidates.
* @param id client identifier
* @return true if client is interested in all keys.
*/
public boolean isInterestedInAllKeysInv(Object id){
if (!clientMap.hasWireID(id)) {
return false;
}
return this.getAllKeyClientsInv().contains(clientMap.getWireID(id));
}
/**
* Returns the set of client interested keys.
* @param id client identifier
* @return client interested keys.
*/
public Set getKeysOfInterest(Object id){
if (!clientMap.hasWireID(id)) {
return null;
}
return this.getKeysOfInterest().get(clientMap.getWireID(id));
}
public int getKeysOfInterestSize(){
return this.getKeysOfInterest().size();
}
/**
* Returns the set of client interested keys for which updates are sent
* as invalidates.
* @param id client identifier
* @return client interested keys.
*/
public Set getKeysOfInterestInv(Object id){
if (!clientMap.hasWireID(id)) {
return null;
}
return this.getKeysOfInterestInv().get(clientMap.getWireID(id));
}
public int getKeysOfInterestInvSize(){
return this.getKeysOfInterestInv().size();
}
/**
* Returns the set of client interested patterns.
* @param id client identifier
* @return client interested patterns.
*/
public Set getPatternsOfInterest(Object id){
if (!clientMap.hasWireID(id)) {
return null;
}
Map patterns = this.getPatternsOfInterest().get(clientMap.getWireID(id));
if (patterns != null){
return new HashSet(patterns.keySet());
}
return null;
}
public int getPatternsOfInterestSize(){
return this.getPatternsOfInterest().size();
}
/**
* Returns the set of client interested patterns for which updates are sent
* as invalidates.
* @param id client identifier
* @return client interested patterns.
*/
public Set getPatternsOfInterestInv(Object id){
if (!clientMap.hasWireID(id)) {
return null;
}
Map interests = this.getPatternsOfInterestInv().get(clientMap.getWireID(id));
if (interests != null){
return new HashSet(interests.keySet());
}
return null;
}
public int getPatternsOfInterestInvSize(){
return this.getPatternsOfInterestInv().size();
}
/**
* Returns the set of client interested filters.
* @param id client identifier
* @return client interested filters.
*/
public Set getFiltersOfInterest(Object id){
if (!clientMap.hasWireID(id)) {
return null;
}
Map interests = this.getFiltersOfInterest().get(clientMap.getWireID(id));
if (interests != null){
return new HashSet(interests.keySet());
}
return null;
}
/**
* Returns the set of client interested filters for which updates are sent
* as invalidates.
* @param id client identifier
* @return client interested filters.
*/
public Set getFiltersOfInterestInv(Object id){
if (!clientMap.hasWireID(id)) {
return null;
}
Map interests = this.getFiltersOfInterestInv().get(clientMap.getWireID(id));
if (interests != null){
return new HashSet(interests.keySet());
}
return null;
}
public static TestHook testHook = null;
/** Test Hook */
public interface TestHook {
public void await();
public void release();
}
@Override
public Version[] getSerializationVersions() {
// TODO Auto-generated method stub
return null;
}
}