blob: 158f185652f714df7ed37627722423c011fb64cc [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.ANY_INIT;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.InternalGemFireError;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.EntryEvent;
import org.apache.geode.cache.Operation;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.SerializedCacheValue;
import org.apache.geode.cache.query.internal.CqStateImpl;
import org.apache.geode.cache.query.internal.cq.CqService;
import org.apache.geode.cache.query.internal.cq.CqServiceProvider;
import org.apache.geode.cache.query.internal.cq.ServerCQ;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionAdvisee;
import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
import org.apache.geode.distributed.internal.HighPriorityDistributionMessage;
import org.apache.geode.distributed.internal.MessageWithReply;
import org.apache.geode.distributed.internal.ReplyMessage;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.ClassLoadUtil;
import org.apache.geode.internal.CopyOnWriteHashSet;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
import org.apache.geode.internal.cache.DistributedPutAllOperation.PutAllEntryData;
import org.apache.geode.internal.cache.DistributedRemoveAllOperation.RemoveAllEntryData;
import org.apache.geode.internal.cache.FilterRoutingInfo.FilterInfo;
import org.apache.geode.internal.cache.LocalRegion.InitializationLevel;
import org.apache.geode.internal.cache.tier.InterestType;
import org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier;
import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
import org.apache.geode.internal.cache.tier.sockets.UnregisterAllInterest;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.offheap.annotations.Unretained;
import org.apache.geode.internal.serialization.DataSerializableFixedID;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.internal.util.concurrent.CopyOnWriteHashMap;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* FilterProfile represents a distributed system member and is used for two purposes: processing
* client-bound events, and providing information for profile exchanges.
*
* FilterProfiles represent client IDs, including durable Queue IDs, with long integers. This
* reduces the size of routing information when sent over the network.
*
* @since GemFire 6.5
*/
public class FilterProfile implements DataSerializableFixedID {
private static final Logger logger = LogService.getLogger();
/** enumeration of distributed profile operations */
enum operationType {
REGISTER_KEY,
REGISTER_KEYS,
REGISTER_PATTERN,
REGISTER_FILTER,
UNREGISTER_KEY,
UNREGISTER_KEYS,
UNREGISTER_PATTERN,
UNREGISTER_FILTER,
CLEAR,
HAS_CQ,
REGISTER_CQ,
CLOSE_CQ,
STOP_CQ,
SET_CQ_STATE
}
/**
* these booleans tell whether the associated operationType pertains to CQs or not
*/
@Immutable
private static final boolean[] isCQOperation =
{false, false, false, false, false, false, false, false, false, true, true, true, true, true};
/**
* types of interest a client may have<br>
* NONE - not interested<br>
* UPDATES - wants update messages<br>
* INVALIDATES - wants invalidations instead of updates
*/
public static enum interestType {
NONE, UPDATES, INVALIDATES
}
/**
* The keys in which clients are interested. This is a map keyed on client id, with a HashSet of
* the interested keys as the values.
*/
private final CopyOnWriteHashMap<Object, Set> keysOfInterest = new CopyOnWriteHashMap<>();
private final CopyOnWriteHashMap<Object, Set> keysOfInterestInv = new CopyOnWriteHashMap<>();
/**
* The patterns in which clients are interested. This is a map keyed on client id, with a HashMap
* (key name to compiled pattern) as the values.
*/
private final CopyOnWriteHashMap<Object, Map<Object, Pattern>> patternsOfInterest =
new CopyOnWriteHashMap<>();
private final CopyOnWriteHashMap<Object, Map<Object, Pattern>> patternsOfInterestInv =
new CopyOnWriteHashMap<>();
/**
* The filtering classes in which clients are interested. This is a map keyed on client id, with a
* HashMap (key name to {@link InterestFilter}) as the values.
*/
private final CopyOnWriteHashMap<Object, Map> filtersOfInterest = new CopyOnWriteHashMap<>();
private final CopyOnWriteHashMap<Object, Map> filtersOfInterestInv = new CopyOnWriteHashMap<>();
/**
* Set of clients that we have ALL_KEYS interest for and who want updates
*/
private final CopyOnWriteHashSet<Long> allKeyClients = new CopyOnWriteHashSet<>();
/**
* Set of clients that we have ALL_KEYS interest for and who want invalidations
*/
private final CopyOnWriteHashSet<Long> allKeyClientsInv = new CopyOnWriteHashSet<>();
/**
* The region associated with this profile
*/
transient LocalRegion region;
/**
* Whether this is a local profile or one that describes another process
*/
private transient boolean isLocalProfile;
/** Currently installed CQ count on the region */
AtomicInteger cqCount;
/** CQs that are registered on the remote node **/
private final CopyOnWriteHashMap<String, ServerCQ> cqs = new CopyOnWriteHashMap<>();
/* the ID of the member that this profile describes */
private DistributedMember memberID;
/**
* since client identifiers can be long, we use a mapping for on-wire operations
*/
IDMap clientMap;
/**
* since CQ identifiers can be long, we use a mapping for on-wire operations
*/
IDMap cqMap;
private final Object interestListLock = new Object();
/**
* Queues the Filter Profile messages that are received during profile exchange.
*/
private volatile Map<InternalDistributedMember, LinkedList<OperationMessage>> filterProfileMsgQueue =
new HashMap<>();
public FilterProfile() {
cqCount = new AtomicInteger();
}
/**
* used for instantiation of a profile associated with a region and not describing region filters
* in a different process. Do not use this method when instantiating profiles to store in
* distribution advisor profiles.
*/
public FilterProfile(LocalRegion r) {
this(r, r.getMyId(), r.getGemFireCache().getCacheServers().size() > 0);
}
/**
* used for instantiation of a profile associated with a region and not describing region filters
* in a different process. Do not use this method when instantiating profiles to store in
* distribution advisor profiles.
*/
public FilterProfile(LocalRegion r, DistributedMember member, boolean hasCacheServer) {
this.region = r;
this.isLocalProfile = true;
this.memberID = member;
this.cqCount = new AtomicInteger();
this.clientMap = new IDMap();
this.cqMap = new IDMap();
this.localProfile.hasCacheServer = hasCacheServer;
}
public static boolean isCqOp(operationType opType) {
return isCQOperation[opType.ordinal()];
}
/** return a set containing all clients that have registered interest for values */
private Set getAllClientsWithInterest() {
Set clientsWithInterest = new HashSet(getAllKeyClients());
clientsWithInterest.addAll(getPatternsOfInterest().keySet());
clientsWithInterest.addAll(getFiltersOfInterest().keySet());
clientsWithInterest.addAll(getKeysOfInterest().keySet());
return clientsWithInterest;
}
/** return a set containing all clients that have registered interest for invalidations */
private Set getAllClientsWithInterestInv() {
Set clientsWithInterestInv = new HashSet(getAllKeyClientsInv());
clientsWithInterestInv.addAll(getPatternsOfInterestInv().keySet());
clientsWithInterestInv.addAll(getFiltersOfInterestInv().keySet());
clientsWithInterestInv.addAll(getKeysOfInterestInv().keySet());
return clientsWithInterestInv;
}
/**
* Registers interest in the input region name and key
*
* @param inputClientID The identity of the interested client
* @param interest The key in which to register interest
* @param typeOfInterest the type of interest the client is registering
* @param updatesAsInvalidates whether the client just wants invalidations
* @return a set of the keys that were registered, which may be null
*/
public Set registerClientInterest(Object inputClientID, Object interest, int typeOfInterest,
boolean updatesAsInvalidates) {
Set keysRegistered = new HashSet();
operationType opType = null;
Long clientID = getClientIDForMaps(inputClientID);
synchronized (this.interestListLock) {
switch (typeOfInterest) {
case InterestType.KEY:
opType = operationType.REGISTER_KEY;
Map<Object, Set> koi =
updatesAsInvalidates ? getKeysOfInterestInv() : getKeysOfInterest();
registerKeyInMap(interest, keysRegistered, clientID, koi);
break;
case InterestType.REGULAR_EXPRESSION:
opType = operationType.REGISTER_PATTERN;
if (((String) interest).equals(".*")) {
Set akc = updatesAsInvalidates ? getAllKeyClientsInv() : getAllKeyClients();
if (akc.add(clientID)) {
keysRegistered.add(interest);
}
} else {
Map<Object, Map<Object, Pattern>> pats =
updatesAsInvalidates ? getPatternsOfInterestInv() : getPatternsOfInterest();
registerPatternInMap(interest, keysRegistered, clientID, pats);
}
break;
case InterestType.FILTER_CLASS: {
opType = operationType.REGISTER_FILTER;
Map<Object, Map> filts =
updatesAsInvalidates ? getFiltersOfInterestInv() : getFiltersOfInterest();
registerFilterClassInMap(interest, clientID, filts);
break;
}
default:
throw new InternalGemFireError(
"Unknown interest type");
} // switch
if (this.isLocalProfile && opType != null) {
sendProfileOperation(clientID, opType, interest, updatesAsInvalidates);
}
} // synchronized
return keysRegistered;
}
private void registerFilterClassInMap(Object interest, Long clientID, Map<Object, Map> filts) {
// get instance of the filter
Class filterClass;
InterestFilter filter;
try {
filterClass = ClassLoadUtil.classFromName((String) interest);
filter = (InterestFilter) filterClass.newInstance();
} catch (ClassNotFoundException cnfe) {
throw new RuntimeException(String.format("Class %s not found in classpath.",
interest), cnfe);
} catch (Exception e) {
throw new RuntimeException(String.format("Class %s could not be instantiated.",
interest), e);
}
Map interestMap = filts.get(clientID);
if (interestMap == null) {
interestMap = new CopyOnWriteHashMap();
filts.put(clientID, interestMap);
}
interestMap.put(interest, filter);
}
private void registerPatternInMap(Object interest, Set keysRegistered, Long clientID,
Map<Object, Map<Object, Pattern>> pats) {
Pattern pattern = Pattern.compile((String) interest);
Map<Object, Pattern> interestMap = pats.get(clientID);
if (interestMap == null) {
interestMap = new CopyOnWriteHashMap<Object, Pattern>();
pats.put(clientID, interestMap);
}
Pattern oldPattern = interestMap.put(interest, pattern);
if (oldPattern == null) {
// If the pattern didn't exist, add it to the set of keys to pass to any listeners.
keysRegistered.add(interest);
}
}
private void registerKeyInMap(Object interest, Set keysRegistered, Long clientID,
Map<Object, Set> koi) {
Set interestList = koi.get(clientID);
if (interestList == null) {
interestList = new CopyOnWriteHashSet();
koi.put(clientID, interestList);
}
interestList.add(interest);
keysRegistered.add(interest);
}
/**
* Unregisters a client's interest
*
* @param inputClientID The identity of the client that is losing interest
* @param interest The key in which to unregister interest
* @param interestType the type of uninterest
* @return the keys unregistered, which may be null
*/
public Set unregisterClientInterest(Object inputClientID, Object interest, int interestType) {
Long clientID;
if (inputClientID instanceof Long) {
clientID = (Long) inputClientID;
} else {
Map<Object, Long> cids = clientMap.realIDs; // read
clientID = cids.get(inputClientID);
if (clientID == null) {
if (logger.isDebugEnabled()) {
logger.debug(
"region profile unable to find '{}' for unregistration. Probably means there is no durable queue.",
inputClientID);
}
return null;
}
}
Set keysUnregistered = new HashSet();
operationType opType = null;
synchronized (this.interestListLock) {
switch (interestType) {
case InterestType.KEY: {
opType = operationType.UNREGISTER_KEY;
unregisterClientKeys(inputClientID, interest, clientID, keysUnregistered);
break;
}
case InterestType.REGULAR_EXPRESSION: {
opType = operationType.UNREGISTER_PATTERN;
unregisterClientPattern(interest, clientID, keysUnregistered);
break;
}
case InterestType.FILTER_CLASS: {
opType = operationType.UNREGISTER_FILTER;
unregisterClientFilterClass(interest, clientID);
break;
}
default:
throw new InternalGemFireError(
"bad interest type");
}
if (this.region != null && this.isLocalProfile) {
sendProfileOperation(clientID, opType, interest, false);
}
} // synchronized
return keysUnregistered;
}
private void unregisterClientFilterClass(Object interest, Long clientID) {
if (interest == UnregisterAllInterest.singleton()) {
if (getFiltersOfInterest().get(clientID) != null) {
getFiltersOfInterest().remove(clientID);
}
if (getFiltersOfInterestInv().get(clientID) != null) {
getFiltersOfInterestInv().remove(clientID);
}
return;
}
Map interestMap = getFiltersOfInterest().get(clientID);
if (interestMap != null) {
interestMap.remove(interest);
if (interestMap.isEmpty()) {
getFiltersOfInterest().remove(clientID);
}
}
interestMap = getFiltersOfInterestInv().get(clientID);
if (interestMap != null) {
interestMap.remove(interest);
if (interestMap.isEmpty()) {
this.getFiltersOfInterestInv().remove(clientID);
}
}
}
private void unregisterClientPattern(Object interest, Long clientID, Set keysUnregistered) {
if (interest instanceof String && ((String) interest).equals(".*")) { // ALL_KEYS
unregisterAllKeys(interest, clientID, keysUnregistered);
return;
}
if (interest == UnregisterAllInterest.singleton()) {
unregisterClientIDFromMap(clientID, getPatternsOfInterest(), keysUnregistered);
unregisterClientIDFromMap(clientID, getPatternsOfInterestInv(), keysUnregistered);
if (getAllKeyClients().remove(clientID)) {
keysUnregistered.add(".*");
}
if (getAllKeyClientsInv().remove(clientID)) {
keysUnregistered.add(".*");
}
} else {
unregisterPatternFromMap(getPatternsOfInterest(), interest, clientID, keysUnregistered);
unregisterPatternFromMap(getPatternsOfInterestInv(), interest, clientID, keysUnregistered);
}
}
private void unregisterPatternFromMap(Map<Object, Map<Object, Pattern>> map, Object interest,
Long clientID, Set keysUnregistered) {
Map interestMap = map.get(clientID);
if (interestMap != null) {
Object obj = interestMap.remove(interest);
if (obj != null) {
keysUnregistered.add(interest);
}
if (interestMap.isEmpty()) {
map.remove(clientID);
}
}
}
private void unregisterClientIDFromMap(Long clientID, Map interestMap, Set keysUnregistered) {
if (interestMap.get(clientID) != null) {
Map removed = (Map) interestMap.remove(clientID);
if (removed != null) {
keysUnregistered.addAll(removed.keySet());
}
}
}
private void unregisterAllKeys(Object interest, Long clientID, Set keysUnregistered) {
if (getAllKeyClients().remove(clientID)) {
keysUnregistered.add(interest);
}
if (getAllKeyClientsInv().remove(clientID)) {
keysUnregistered.add(interest);
}
}
private void unregisterClientKeys(Object inputClientID, Object interest, Long clientID,
Set keysUnregistered) {
if (interest == UnregisterAllInterest.singleton()) {
clearInterestFor(inputClientID);
return;
}
unregisterKeyFromMap(getKeysOfInterest(), interest, clientID, keysUnregistered);
unregisterKeyFromMap(getKeysOfInterestInv(), interest, clientID, keysUnregistered);
return;
}
private void unregisterKeyFromMap(Map<Object, Set> map, Object interest, Long clientID,
Set keysUnregistered) {
Set interestList = map.get(clientID);
if (interestList != null) {
boolean removed = interestList.remove(interest);
if (removed) {
keysUnregistered.add(interest);
}
if (interestList.isEmpty()) {
map.remove(clientID);
}
}
}
/**
* Registers interest in a set of keys for a client
*
* @param keys The list of keys in which to register interest
* @param updatesAsInvalidates whether to send invalidations instead of updates
* @return the registered keys
*/
public Set registerClientInterestList(Object inputClientID, List keys,
boolean updatesAsInvalidates) {
Long clientID = getClientIDForMaps(inputClientID);
Set keysRegistered = new HashSet(keys);
synchronized (interestListLock) {
Map<Object, Set> koi = updatesAsInvalidates ? getKeysOfInterestInv() : getKeysOfInterest();
CopyOnWriteHashSet interestList = (CopyOnWriteHashSet) koi.get(clientID);
if (interestList == null) {
interestList = new CopyOnWriteHashSet();
koi.put(clientID, interestList);
} else {
// Get the list of keys that will be registered new, not already registered.
keysRegistered.removeAll(interestList.getSnapshot());
}
interestList.addAll(keys);
if (this.region != null && this.isLocalProfile) {
sendProfileOperation(clientID, operationType.REGISTER_KEYS, keys, updatesAsInvalidates);
}
} // synchronized
return keysRegistered;
}
/**
* Unregisters interest in given keys for the given client
*
* @param inputClientID The fully-qualified name of the region in which to unregister interest
* @param keys The list of keys in which to unregister interest
* @return the unregistered keys
*/
public Set unregisterClientInterestList(Object inputClientID, List keys) {
Long clientID = getClientIDForMaps(inputClientID);
Set keysUnregistered = new HashSet(keys);
Set keysNotUnregistered = new HashSet(keys);
synchronized (interestListLock) {
CopyOnWriteHashSet interestList = (CopyOnWriteHashSet) getKeysOfInterest().get(clientID);
if (interestList != null) {
// Get the list of keys that are not registered but in unregister set.
keysNotUnregistered.removeAll(interestList.getSnapshot());
interestList.removeAll(keys);
if (interestList.isEmpty()) {
getKeysOfInterest().remove(clientID);
}
}
interestList = (CopyOnWriteHashSet) getKeysOfInterestInv().get(clientID);
if (interestList != null) {
keysNotUnregistered.removeAll(interestList.getSnapshot());
interestList.removeAll(keys);
if (interestList.isEmpty()) {
getKeysOfInterestInv().remove(clientID);
}
}
if (this.region != null && this.isLocalProfile) {
sendProfileOperation(clientID, operationType.UNREGISTER_KEYS, keys, false);
}
} // synchronized
// Get the keys that are not unregistered.
keysUnregistered.removeAll(keysNotUnregistered);
return keysUnregistered;
}
public Set getKeysOfInterestFor(Object inputClientID) {
Long clientID = getClientIDForMaps(inputClientID);
Set keys1 = this.getKeysOfInterest().get(clientID);
Set keys2 = this.getKeysOfInterestInv().get(clientID);
if (keys1 == null) {
if (keys2 == null) {
return null;
}
return Collections.unmodifiableSet(keys2);
} else if (keys2 == null) {
return Collections.unmodifiableSet(keys1);
} else {
Set result = new HashSet(keys1);
result.addAll(keys2);
return Collections.unmodifiableSet(result);
}
}
public Map<String, Pattern> getPatternsOfInterestFor(Object inputClientID) {
Long clientID = getClientIDForMaps(inputClientID);
Map patterns1 = this.getPatternsOfInterest().get(clientID);
Map patterns2 = this.getPatternsOfInterestInv().get(clientID);
if (patterns1 == null) {
if (patterns2 == null) {
return null;
}
return Collections.unmodifiableMap(patterns2);
} else if (patterns2 == null) {
return Collections.unmodifiableMap(patterns1);
} else {
Map<String, Pattern> result = new HashMap(patterns1);
result.putAll(patterns2);
return Collections.unmodifiableMap(result);
}
}
public boolean hasKeysOfInterestFor(Object inputClientID, boolean wantInvalidations) {
Long clientID = getClientIDForMaps(inputClientID);
if (wantInvalidations) {
return this.getKeysOfInterestInv().containsKey(clientID);
}
return this.getKeysOfInterest().containsKey(clientID);
}
public boolean hasAllKeysInterestFor(Object inputClientID) {
Long clientID = getClientIDForMaps(inputClientID);
return hasAllKeysInterestFor(clientID, false) || hasAllKeysInterestFor(clientID, true);
}
public boolean hasAllKeysInterestFor(Object inputClientID, boolean wantInvalidations) {
Long clientID = getClientIDForMaps(inputClientID);
if (wantInvalidations) {
return getAllKeyClientsInv().contains(clientID);
}
return getAllKeyClients().contains(clientID);
}
public boolean hasRegexInterestFor(Object inputClientID, boolean wantInvalidations) {
Long clientID = getClientIDForMaps(inputClientID);
if (wantInvalidations) {
return (this.getPatternsOfInterestInv().containsKey(clientID));
}
return (this.getPatternsOfInterest().containsKey(clientID));
}
public boolean hasFilterInterestFor(Object inputClientID, boolean wantInvalidations) {
Long clientID = getClientIDForMaps(inputClientID);
if (wantInvalidations) {
return this.getFiltersOfInterestInv().containsKey(clientID);
}
return this.getFiltersOfInterest().containsKey(clientID);
}
/** determines whether there is any remaining interest for the given identifier */
public boolean hasInterestFor(Object inputClientID) {
Long clientID;
if (inputClientID instanceof Long) {
clientID = (Long) inputClientID;
} else {
Map<Object, Long> cids = clientMap.realIDs; // read
clientID = cids.get(inputClientID);
if (clientID == null) {
return false;
}
}
return this.hasAllKeysInterestFor(clientID, true) || this.hasKeysOfInterestFor(clientID, true)
|| this.hasRegexInterestFor(clientID, true) || this.hasFilterInterestFor(clientID, true)
|| this.hasKeysOfInterestFor(clientID, false) || this.hasRegexInterestFor(clientID, false)
|| this.hasAllKeysInterestFor(clientID, false)
|| this.hasFilterInterestFor(clientID, false);
}
/*
* Returns whether this interest list has any keys, patterns or filters of interest. It answers
* the question: Are any clients being notified because of this interest list? @return whether
* this interest list has any keys, patterns or filters of interest
*/
public boolean hasInterest() {
return (!this.getAllKeyClients().isEmpty()) || (!this.getAllKeyClientsInv().isEmpty())
|| (!this.getKeysOfInterest().isEmpty()) || (!this.getPatternsOfInterest().isEmpty())
|| (!this.getFiltersOfInterest().isEmpty()) || (!this.getKeysOfInterestInv().isEmpty())
|| (!this.getPatternsOfInterestInv().isEmpty())
|| (!this.getFiltersOfInterestInv().isEmpty());
}
/** removes all interest for the given identifier */
public void clearInterestFor(Object inputClientID) {
Long clientID;
if (inputClientID instanceof Long) {
clientID = (Long) inputClientID;
} else {
Map<Object, Long> cids = clientMap.realIDs;
clientID = cids.get(inputClientID);
if (clientID == null) {
// haven't seen this client yet
return;
}
}
synchronized (interestListLock) {
{
Set akc = this.getAllKeyClients();
if (akc.contains(clientID)) {
akc.remove(clientID);
}
}
{
Set akci = this.getAllKeyClientsInv();
if (akci.contains(clientID)) {
akci.remove(clientID);
}
}
{
Map<Object, Set> keys = this.getKeysOfInterest();
if (keys.containsKey(clientID)) {
keys.remove(clientID);
}
}
{
Map<Object, Set> keys = this.getKeysOfInterestInv();
if (keys.containsKey(clientID)) {
keys.remove(clientID);
}
}
{
Map<Object, Map<Object, Pattern>> pats = this.getPatternsOfInterest();
if (pats.containsKey(clientID)) {
pats.remove(clientID);
}
}
{
Map<Object, Map<Object, Pattern>> pats = this.getPatternsOfInterestInv();
if (pats.containsKey(clientID)) {
pats.remove(clientID);
}
}
{
Map<Object, Map> filters = this.getFiltersOfInterest();
if (filters.containsKey(clientID)) {
filters.remove(clientID);
}
}
{
Map<Object, Map> filters = this.getFiltersOfInterestInv();
if (filters.containsKey(clientID)) {
filters.remove(clientID);
}
}
if (clientMap != null) {
clientMap.removeIDMapping(clientID);
}
if (this.region != null && this.isLocalProfile) {
sendProfileOperation(clientID, operationType.CLEAR, null, false);
}
}
}
/**
* Obtains the number of CQs registered on the region. Assumption: CQs are not duplicated among
* clients.
*
* @return int currently registered CQs of the region
*/
public int getCqCount() {
return this.cqCount.get();
}
public void incCqCount() {
this.cqCount.getAndIncrement();
}
public void decCqCount() {
this.cqCount.decrementAndGet();
}
/**
* Returns the CQs registered on this region.
*/
public Map getCqMap() {
return this.cqs;
}
/**
* does this profile contain any continuous queries?
*/
public boolean hasCQs() {
return this.cqCount.get() > 0;
}
public ServerCQ getCq(String cqName) {
return (ServerCQ) this.cqs.get(cqName);
}
public void registerCq(ServerCQ cq) {
ensureCqID(cq);
if (logger.isDebugEnabled()) {
logger.debug("Adding CQ {} to this members FilterProfile.", cq.getServerCqName());
}
this.cqs.put(cq.getServerCqName(), cq);
this.incCqCount();
// cq.setFilterID(cqMap.getWireID(cq.getServerCqName()));
this.sendCQProfileOperation(operationType.REGISTER_CQ, cq);
}
public void stopCq(ServerCQ cq) {
ensureCqID(cq);
if (logger.isDebugEnabled()) {
logger.debug("Stopping CQ {} on this members FilterProfile.", cq.getServerCqName());
}
this.sendCQProfileOperation(operationType.STOP_CQ, cq);
}
public String generateCqName(String serverCqName) {
// Set unique CQ Name with respect to the senders filterProfile.
// To avoid any conflict with the CQ names while maintained in
// the CqService.
// The CQ will be identified in the remote node using its
// serverCqName.
return (serverCqName + this.hashCode());
}
void processRegisterCq(String serverCqName, ServerCQ ServerCQ, boolean addToCqMap) {
processRegisterCq(serverCqName, ServerCQ, addToCqMap, GemFireCacheImpl.getInstance());
}
/**
* adds a new CQ to this profile during a delta operation or deserialization
*
* @param serverCqName the query objects' name
* @param ServerCQ the new query object
* @param addToCqMap whether to add the query to this.cqs
*/
void processRegisterCq(String serverCqName, ServerCQ ServerCQ, boolean addToCqMap,
GemFireCacheImpl cache) {
if (cache == null) {
logger.info("Error while initializing the CQs with FilterProfile for CQ " + serverCqName
+ ", Error : Cache has been closed.");
return;
}
ServerCQ cq = (ServerCQ) ServerCQ;
try {
CqService cqService = cache.getCqService();
cqService.start();
cq.setCqService(cqService);
CqStateImpl cqState = (CqStateImpl) cq.getState();
cq.setName(generateCqName(serverCqName));
cq.registerCq(null, null, cqState.getState());
} catch (Exception ex) {
logger.info("Error while initializing the CQs with FilterProfile for CQ {}, Error : {}",
serverCqName, ex.getMessage(), ex);
}
if (logger.isDebugEnabled()) {
logger.debug("Adding CQ to remote members FilterProfile using name: {}", serverCqName);
}
// The region's FilterProfile is accessed through CQ reference as the
// region is not set on the FilterProfile created for the peer nodes.
if (cq.getCqBaseRegion() != null) {
if (addToCqMap) {
this.cqs.put(serverCqName, cq);
}
FilterProfile pf = cq.getCqBaseRegion().getFilterProfile();
if (pf != null) {
pf.incCqCount();
}
}
}
public void processCloseCq(String serverCqName) {
ServerCQ cq = (ServerCQ) this.cqs.get(serverCqName);
if (cq != null) {
try {
cq.close(false);
} catch (Exception ex) {
if (logger.isDebugEnabled()) {
logger.debug(
"Unable to close the CQ with the filterProfile, on region {} for CQ {}, Error : {}",
this.region.getFullPath(), serverCqName, ex.getMessage(), ex);
}
}
this.cqs.remove(serverCqName);
cq.getCqBaseRegion().getFilterProfile().decCqCount();
}
}
public void processSetCqState(String serverCqName, ServerCQ ServerCQ) {
ServerCQ cq = (ServerCQ) this.cqs.get(serverCqName);
if (cq != null) {
CqStateImpl cqState = (CqStateImpl) ServerCQ.getState();
cq.setCqState(cqState.getState());
}
}
public void processStopCq(String serverCqName) {
ServerCQ cq = (ServerCQ) this.cqs.get(serverCqName);
if (cq != null) {
try {
cq.stop();
} catch (Exception ex) {
if (logger.isDebugEnabled()) {
logger.debug(
"Unable to stop the CQ with the filterProfile, on region {} for CQ {}, Error : {}",
this.region.getFullPath(), serverCqName, ex.getMessage(), ex);
}
}
}
}
public void setCqState(ServerCQ cq) {
// This could be when CQ is stopped and restarted.
ensureCqID(cq);
if (logger.isDebugEnabled()) {
logger.debug("Stopping CQ {} on this members FilterProfile.", cq.getServerCqName());
}
this.sendCQProfileOperation(operationType.SET_CQ_STATE, cq);
}
public void closeCq(ServerCQ cq) {
ensureCqID(cq);
String serverCqName = cq.getServerCqName();
this.cqs.remove(serverCqName);
if (this.cqMap != null) {
this.cqMap.removeIDMapping(cq.getFilterID());
}
this.decCqCount();
this.sendCQProfileOperation(operationType.CLOSE_CQ, cq);
}
void cleanupForClient(CacheClientNotifier ccn, ClientProxyMembershipID client) {
Iterator cqIter = this.cqs.entrySet().iterator();
while (cqIter.hasNext()) {
Map.Entry cqEntry = (Map.Entry) cqIter.next();
ServerCQ cq = (ServerCQ) cqEntry.getValue();
ClientProxyMembershipID clientId = cq.getClientProxyId();
if (clientId.equals(client)) {
try {
cq.close(false);
} catch (Exception ignore) {
if (logger.isDebugEnabled()) {
logger.debug("Failed to remove CQ from the base region. CqName : {}", cq.getName());
}
}
this.closeCq(cq);
}
}
// Remove the client from the clientMap
this.clientMap.removeIDMapping(client);
}
/**
* this will be get called when we remove other server profile from region advisor.
*/
void cleanUp() {
Map tmpCq = this.cqs;
if (tmpCq.size() > 0) {
for (Object serverCqName : tmpCq.keySet()) {
processCloseCq((String) serverCqName);
}
}
}
/**
* Returns if old value is required for CQ processing or not. In order to reduce the query
* processing time CQ caches the event keys its already seen, if the key is cached than the old
* value is not required.
*/
public boolean entryRequiresOldValue(Object key) {
if (this.hasCQs()) {
if (!CqServiceProvider.MAINTAIN_KEYS) {
return true;
}
Iterator cqIter = this.cqs.values().iterator();
while (cqIter.hasNext()) {
ServerCQ cq = (ServerCQ) cqIter.next();
if (cq.isOldValueRequiredForQueryProcessing(key)) {
return true;
}
}
}
return false;
}
private void sendProfileOperation(Long clientID, operationType opType, Object interest,
boolean updatesAsInvalidates) {
if (this.region == null || !(this.region instanceof PartitionedRegion)) {
return;
}
OperationMessage msg = new OperationMessage();
msg.regionName = this.region.getFullPath();
msg.clientID = clientID;
msg.opType = opType;
msg.interest = interest;
msg.updatesAsInvalidates = updatesAsInvalidates;
sendFilterProfileOperation(msg);
}
private void sendFilterProfileOperation(OperationMessage msg) {
Set recipients =
((DistributionAdvisee) this.region).getDistributionAdvisor().adviseProfileUpdate();
msg.setRecipients(recipients);
ReplyProcessor21 rp = new ReplyProcessor21(this.region.getDistributionManager(), recipients);
msg.processorId = rp.getProcessorId();
this.region.getDistributionManager().putOutgoing(msg);
try {
rp.waitForReplies();
} catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
private void sendCQProfileOperation(operationType opType, ServerCQ cq) {
// we only need to distribute for PRs. Other regions do local filter processing
if (!(this.region instanceof PartitionedRegion)) {
// note that we do not need to update requiresOldValueInEvents because
// that flag is only used during region initialization. Otherwise we
// would need to
return;
}
OperationMessage msg = new OperationMessage();
msg.regionName = this.region.getFullPath();
msg.opType = opType;
msg.cq = cq;
try {
sendFilterProfileOperation(msg);
} catch (Exception ex) {
if (logger.isDebugEnabled()) {
logger.debug("Error sending CQ request to peers. {}", ex.getLocalizedMessage(), ex);
}
}
}
@Immutable
static final Profile[] NO_PROFILES = new Profile[0];
private final CacheProfile localProfile = new CacheProfile(this);
private final Profile[] localProfileArray = new Profile[] {localProfile};
/** compute local routing information */
public FilterInfo getLocalFilterRouting(CacheEvent event) {
FilterRoutingInfo fri = getFilterRoutingInfoPart2(null, event);
if (fri != null) {
return fri.getLocalFilterInfo();
} else {
return null;
}
}
/**
* @return the local CacheProfile for this FilterProfile's Region
*/
public CacheProfile getLocalProfile() {
return this.localProfile;
}
/**
* Compute the full routing information for the given set of peers. This will not include local
* routing information from interest processing. That is done by getFilterRoutingInfoPart2
*/
public FilterRoutingInfo getFilterRoutingInfoPart1(CacheEvent event, Profile[] peerProfiles,
Set cacheOpRecipients) {
// early out if there are no cache servers in the system
boolean anyServers = false;
for (int i = 0; i < peerProfiles.length; i++) {
if (((CacheProfile) peerProfiles[i]).hasCacheServer) {
anyServers = true;
break;
}
}
if (!anyServers && !this.localProfile.hasCacheServer) {
return null;
}
FilterRoutingInfo frInfo = null;
CqService cqService = getCqService(event.getRegion());
if (cqService.isRunning()) {
frInfo = new FilterRoutingInfo();
// bug #50809 - local routing for transactional ops must be done here
// because the event isn't available later and we lose the old value for the entry
final boolean processLocalProfile =
event.getOperation().isEntry() && ((EntryEvent) event).getTransactionId() != null;
fillInCQRoutingInfo(event, processLocalProfile, peerProfiles, frInfo);
}
// Process InterestList.
// return fillInInterestRoutingInfo(event, peerProfiles, frInfo, cacheOpRecipients);
frInfo = fillInInterestRoutingInfo(event, peerProfiles, frInfo, cacheOpRecipients);
if (frInfo == null || !frInfo.hasMemberWithFilterInfo()) {
return null;
} else {
return frInfo;
}
}
/**
* get local routing information
*
* @param part1Info routing information for peers, if any
* @param event the event to process
* @return routing information for clients connected to this server
*/
public FilterRoutingInfo getFilterRoutingInfoPart2(FilterRoutingInfo part1Info,
CacheEvent event) {
FilterRoutingInfo result = part1Info;
if (localProfile.hasCacheServer) {
// bug #45520 - CQ events arriving out of order causes result set
// inconsistency, so don't compute routings for events in conflict
boolean isInConflict =
event.getOperation().isEntry() && ((EntryEventImpl) event).isConcurrencyConflict();
CqService cqService = getCqService(event.getRegion());
if (!isInConflict && cqService.isRunning()
&& this.region != null /*
* && !( this.region.isUsedForPartitionedRegionBucket() || //
* partitioned region CQ this.region instanceof PartitionedRegion)
*/) { // processing is done in part 1
if (result == null) {
result = new FilterRoutingInfo();
}
if (logger.isDebugEnabled()) {
logger.debug("getting local cq matches for {}", event);
}
fillInCQRoutingInfo(event, true, NO_PROFILES, result);
}
result = fillInInterestRoutingInfo(event, localProfileArray, result, Collections.emptySet());
}
return result;
}
/**
* get continuous query routing information
*
* @param event the event to process
* @param peerProfiles the profiles getting this event
* @param frInfo the routing table to update
*/
private void fillInCQRoutingInfo(CacheEvent event, boolean processLocalProfile,
Profile[] peerProfiles, FilterRoutingInfo frInfo) {
CqService cqService = getCqService(event.getRegion());
if (cqService != null) {
try {
Profile local = processLocalProfile ? this.localProfile : null;
cqService.processEvents(event, local, peerProfiles, frInfo);
} catch (VirtualMachineError err) {
SystemFailure.initiateFailure(err);
// If this ever returns, re-throw the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable t) {
SystemFailure.checkFailure();
logger.error("Exception occurred while processing CQs", t);
}
}
}
private CqService getCqService(Region region) {
return ((InternalCache) region.getRegionService()).getCqService();
}
/**
* computes FilterRoutingInfo objects for each of the given events
*/
public void getLocalFilterRoutingForPutAllOp(DistributedPutAllOperation dpao,
DistributedPutAllOperation.PutAllEntryData[] putAllData) {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (this.region != null && this.localProfile.hasCacheServer) {
Set clientsInv = null;
Set clients = null;
int size = putAllData.length;
CqService cqService = getCqService(dpao.getRegion());
boolean doCQs = cqService.isRunning();
for (int idx = 0; idx < size; idx++) {
PutAllEntryData pEntry = putAllData[idx];
if (pEntry != null) {
@Unretained
final EntryEventImpl ev = dpao.getEventForPosition(idx);
FilterRoutingInfo fri = pEntry.filterRouting;
FilterInfo fi = null;
if (fri != null) {
fi = fri.getLocalFilterInfo();
}
if (isDebugEnabled) {
logger.debug("Finding locally interested clients for {}", ev);
}
if (doCQs) {
if (fri == null) {
fri = new FilterRoutingInfo();
}
fillInCQRoutingInfo(ev, true, NO_PROFILES, fri);
fi = fri.getLocalFilterInfo();
}
if (this.allKeyClientsInv != null || this.keysOfInterestInv != null
|| this.patternsOfInterestInv != null || this.filtersOfInterestInv != null) {
clientsInv = this.getInterestedClients(ev, this.allKeyClientsInv,
this.keysOfInterestInv, this.patternsOfInterestInv, this.filtersOfInterestInv);
}
if (this.allKeyClients != null || this.keysOfInterest != null
|| this.patternsOfInterest != null || this.filtersOfInterest != null) {
clients = this.getInterestedClients(ev, this.allKeyClients, this.keysOfInterest,
this.patternsOfInterest, this.filtersOfInterest);
}
if (clients != null || clientsInv != null) {
if (fi == null) {
fi = new FilterInfo();
// no need to create or update a FilterRoutingInfo at this time
}
fi.setInterestedClients(clients);
fi.setInterestedClientsInv(clientsInv);
}
ev.setLocalFilterInfo(fi);
}
}
}
}
/**
* computes FilterRoutingInfo objects for each of the given events
*/
public void getLocalFilterRoutingForRemoveAllOp(DistributedRemoveAllOperation op,
RemoveAllEntryData[] removeAllData) {
if (this.region != null && this.localProfile.hasCacheServer) {
Set clientsInv = null;
Set clients = null;
int size = removeAllData.length;
CqService cqService = getCqService(op.getRegion());
boolean doCQs = cqService.isRunning();
for (int idx = 0; idx < size; idx++) {
RemoveAllEntryData pEntry = removeAllData[idx];
if (pEntry != null) {
@Unretained
final EntryEventImpl ev = op.getEventForPosition(idx);
FilterRoutingInfo fri = pEntry.filterRouting;
FilterInfo fi = null;
if (fri != null) {
fi = fri.getLocalFilterInfo();
}
if (logger.isDebugEnabled()) {
logger.debug("Finding locally interested clients for {}", ev);
}
if (doCQs) {
if (fri == null) {
fri = new FilterRoutingInfo();
}
fillInCQRoutingInfo(ev, true, NO_PROFILES, fri);
fi = fri.getLocalFilterInfo();
}
if (this.allKeyClientsInv != null || this.keysOfInterestInv != null
|| this.patternsOfInterestInv != null || this.filtersOfInterestInv != null) {
clientsInv = this.getInterestedClients(ev, this.allKeyClientsInv,
this.keysOfInterestInv, this.patternsOfInterestInv, this.filtersOfInterestInv);
}
if (this.allKeyClients != null || this.keysOfInterest != null
|| this.patternsOfInterest != null || this.filtersOfInterest != null) {
clients = this.getInterestedClients(ev, this.allKeyClients, this.keysOfInterest,
this.patternsOfInterest, this.filtersOfInterest);
}
if (clients != null || clientsInv != null) {
if (fi == null) {
fi = new FilterInfo();
// no need to create or update a FilterRoutingInfo at this time
}
fi.setInterestedClients(clients);
fi.setInterestedClientsInv(clientsInv);
}
// if (this.logger.fineEnabled()) {
// this.region.getLogWriterI18n().fine("setting event routing to " + fi);
// }
ev.setLocalFilterInfo(fi);
}
}
}
}
/**
* Fills in the routing information for clients that have registered interest in the given event.
* The routing information is stored in the given FilterRoutingInfo object for use in message
* delivery.
*
* @param event the event being applied to the cache
* @param profiles the profiles of members having the affected region
* @param filterRoutingInfo the routing object that is modified by this method (may be null)
* @param cacheOpRecipients members that will receive a CacheDistributionMessage for the event
* @return the resulting FilterRoutingInfo
*/
public FilterRoutingInfo fillInInterestRoutingInfo(CacheEvent event, Profile[] profiles,
FilterRoutingInfo filterRoutingInfo, Set cacheOpRecipients) {
Set clientsInv = Collections.emptySet();
Set clients = Collections.emptySet();
if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE)) {
logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE, "finding interested clients for {}", event);
}
FilterRoutingInfo frInfo = filterRoutingInfo;
for (int i = 0; i < profiles.length; i++) {
CacheProfile cf = (CacheProfile) profiles[i];
if (!cf.hasCacheServer) {
continue;
}
FilterProfile pf = cf.filterProfile;
if (pf == null) {
continue;
}
if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE)) {
logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE, "Processing {}", pf);
}
if (!pf.hasInterest()) {
// This block sends an empty routing table to a member that's going to
// get a CacheDistributionMessage so that if, in flight, there is an
// interest registration change the version held in the routing table
// can be used to detect the change and the receiver can recompute
// the routing.
if (!pf.isLocalProfile() && cacheOpRecipients.contains(cf.getDistributedMember())) {
if (frInfo == null)
frInfo = new FilterRoutingInfo();
frInfo.addInterestedClients(cf.getDistributedMember(), Collections.emptySet(),
Collections.emptySet(), false);
}
continue;
}
if (event.getOperation() == null) {
continue;
}
if (event.getOperation().isEntry()) {
EntryEvent entryEvent = (EntryEvent) event;
if (pf.allKeyClientsInv != null || pf.keysOfInterestInv != null
|| pf.patternsOfInterestInv != null || pf.filtersOfInterestInv != null) {
clientsInv = pf.getInterestedClients(entryEvent, pf.allKeyClientsInv,
pf.keysOfInterestInv, pf.patternsOfInterestInv, pf.filtersOfInterestInv);
}
if (pf.allKeyClients != null || pf.keysOfInterest != null || pf.patternsOfInterest != null
|| pf.filtersOfInterest != null) {
clients = pf.getInterestedClients(entryEvent, pf.allKeyClients, pf.keysOfInterest,
pf.patternsOfInterest, pf.filtersOfInterest);
}
} else {
if (event.getOperation().isRegionDestroy() || event.getOperation().isClear()) {
clientsInv = pf.getAllClientsWithInterestInv();
clients = pf.getAllClientsWithInterest();
} else {
return frInfo;
}
}
if (pf.isLocalProfile) {
if (logger.isDebugEnabled()) {
logger.debug("Setting local interested clients={} and clientsInv={}", clients,
clientsInv);
}
if (frInfo == null)
frInfo = new FilterRoutingInfo();
frInfo.setLocalInterestedClients(clients, clientsInv);
} else {
if (cacheOpRecipients.contains(cf.getDistributedMember()) || // always send a routing with
// CacheOperationMessages
(clients != null && !clients.isEmpty())
|| (clientsInv != null && !clientsInv.isEmpty())) {
if (logger.isDebugEnabled()) {
logger.debug("Adding interested clients={} and clientsIn={} to {}", clients, clientsInv,
filterRoutingInfo);
}
if (frInfo == null)
frInfo = new FilterRoutingInfo();
frInfo.addInterestedClients(cf.getDistributedMember(), clients, clientsInv,
this.clientMap.hasLongID);
}
}
}
return frInfo;
}
/**
* get the clients interested in the given event that are attached to this server.
*
* @param event the entry event being applied to the cache
* @param akc allKeyClients collection
* @param koi keysOfInterest collection
* @param pats patternsOfInterest collection
* @param foi filtersOfInterest collection
* @return a set of the clients interested in the event
*/
private Set getInterestedClients(EntryEvent event, Set akc, Map<Object, Set> koi,
Map<Object, Map<Object, Pattern>> pats, Map<Object, Map> foi) {
Set result = null;
if (akc != null) {
result = new HashSet(akc);
if (logger.isDebugEnabled()) {
logger.debug("these clients matched for all-keys: {}", akc);
}
}
if (koi != null) {
for (Iterator it = koi.entrySet().iterator(); it.hasNext();) {
Map.Entry entry = (Map.Entry) it.next();
Set keys = (Set) entry.getValue();
if (keys.contains(event.getKey())) {
Object clientID = entry.getKey();
if (result == null)
result = new HashSet();
result.add(clientID);
if (logger.isDebugEnabled()) {
logger.debug("client {} matched for key list (size {})", clientID,
koi.get(clientID).size());
}
}
}
}
if (pats != null && (event.getKey() instanceof String)) {
for (Iterator it = pats.entrySet().iterator(); it.hasNext();) {
Map.Entry entry = (Map.Entry) it.next();
String stringKey = (String) event.getKey();
Map<Object, Pattern> interestList = (Map<Object, Pattern>) entry.getValue();
for (Pattern keyPattern : interestList.values()) {
if (keyPattern.matcher(stringKey).matches()) {
Object clientID = entry.getKey();
if (result == null)
result = new HashSet();
result.add(clientID);
if (logger.isDebugEnabled()) {
logger.debug("client {} matched for pattern ({})", clientID, pats.get(clientID));
}
break;
}
}
}
}
if (foi != null && foi.size() > 0) {
Object value;
boolean serialized;
{
SerializedCacheValue<?> serValue = event.getSerializedNewValue();
serialized = (serValue != null);
if (!serialized) {
value = event.getNewValue();
} else {
value = serValue.getSerializedValue();
}
}
InterestEvent iev = new InterestEvent(event.getKey(), value, !serialized);
Operation op = event.getOperation();
for (Iterator it = foi.entrySet().iterator(); it.hasNext();) {
Map.Entry entry = (Map.Entry) it.next();
Map<String, InterestFilter> interestList = (Map<String, InterestFilter>) entry.getValue();
for (InterestFilter filter : interestList.values()) {
if ((op.isCreate() && filter.notifyOnCreate(iev))
|| (op.isUpdate() && filter.notifyOnUpdate(iev))
|| (op.isDestroy() && filter.notifyOnDestroy(iev))
|| (op.isInvalidate() && filter.notifyOnInvalidate(iev))) {
Object clientID = entry.getKey();
if (result == null)
result = new HashSet();
result.add(clientID);
if (logger.isDebugEnabled()) {
logger.debug("client {} matched for filter ({})", clientID,
getFiltersOfInterest().get(clientID));
}
break;
}
}
}
}
return result;
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
InternalDistributedMember id = new InternalDistributedMember();
InternalDataSerializer.invokeFromData(id, in);
this.memberID = id;
this.allKeyClients.addAll(InternalDataSerializer.readSetOfLongs(in));
this.keysOfInterest.putAll(DataSerializer.readHashMap(in));
this.patternsOfInterest.putAll(DataSerializer.readHashMap(in));
this.filtersOfInterest.putAll(DataSerializer.readHashMap(in));
this.allKeyClientsInv.addAll(InternalDataSerializer.readSetOfLongs(in));
this.keysOfInterestInv.putAll(DataSerializer.readHashMap(in));
this.patternsOfInterestInv.putAll(DataSerializer.readHashMap(in));
this.filtersOfInterestInv.putAll(DataSerializer.readHashMap(in));
// Read CQ Info.
int numCQs = InternalDataSerializer.readArrayLength(in);
if (numCQs > 0) {
final InitializationLevel oldLevel = LocalRegion.setThreadInitLevelRequirement(ANY_INIT);
try {
for (int i = 0; i < numCQs; i++) {
String serverCqName = DataSerializer.readString(in);
ServerCQ cq = CqServiceProvider.readCq(in);
processRegisterCq(serverCqName, cq, false);
this.cqs.put(serverCqName, cq);
}
} finally {
LocalRegion.setThreadInitLevelRequirement(oldLevel);
}
}
}
@Override
public int getDSFID() {
return FILTER_PROFILE;
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
InternalDataSerializer.invokeToData(memberID, out);
InternalDataSerializer.writeSetOfLongs(this.allKeyClients.getSnapshot(),
this.clientMap.hasLongID, out);
DataSerializer.writeHashMap(this.keysOfInterest.getSnapshot(), out);
DataSerializer.writeHashMap(this.patternsOfInterest.getSnapshot(), out);
DataSerializer.writeHashMap(this.filtersOfInterest.getSnapshot(), out);
InternalDataSerializer.writeSetOfLongs(this.allKeyClientsInv.getSnapshot(),
this.clientMap.hasLongID, out);
DataSerializer.writeHashMap(this.keysOfInterestInv.getSnapshot(), out);
DataSerializer.writeHashMap(this.patternsOfInterestInv.getSnapshot(), out);
DataSerializer.writeHashMap(this.filtersOfInterestInv.getSnapshot(), out);
// Write CQ info.
Map<String, ServerCQ> theCQs = this.cqs.getSnapshot();
int size = theCQs.size();
InternalDataSerializer.writeArrayLength(size, out);
for (Iterator<Map.Entry<String, ServerCQ>> it = theCQs.entrySet().iterator(); it.hasNext();) {
Map.Entry<String, ServerCQ> entry = it.next();
String name = entry.getKey();
ServerCQ cq = entry.getValue();
DataSerializer.writeString(name, out);
InternalDataSerializer.invokeToData(cq, out);
}
}
/**
* @return the keysOfInterest
*/
private Map<Object, Set> getKeysOfInterest() {
return this.keysOfInterest;
}
/**
* @return the keysOfInterestInv
*/
private Map<Object, Set> getKeysOfInterestInv() {
return this.keysOfInterestInv;
}
/**
* @return the patternsOfInterest
*/
private Map<Object, Map<Object, Pattern>> getPatternsOfInterest() {
return this.patternsOfInterest;
}
/**
* @return the patternsOfInterestInv
*/
private Map<Object, Map<Object, Pattern>> getPatternsOfInterestInv() {
return this.patternsOfInterestInv;
}
/**
* @return the filtersOfInterestInv
*/
Map<Object, Map> getFiltersOfInterestInv() {
return this.filtersOfInterestInv;
}
/**
* @return the filtersOfInterest
*/
private Map<Object, Map> getFiltersOfInterest() {
return this.filtersOfInterest;
}
private Set<Object> getAllKeyClients() {
if (testHook != null) {
testHook.await();
}
return (Set) this.allKeyClients;
}
public int getAllKeyClientsSize() {
return this.getAllKeyClients().size();
}
private Set<Long> getAllKeyClientsInv() {
return this.allKeyClientsInv;
}
public int getAllKeyClientsInvSize() {
return this.getAllKeyClientsInv().size();
}
/**
* When clients are registered they are assigned a Long identifier. This method maps between the
* real client ID and its Long identifier.
*/
private Long getClientIDForMaps(Object inputClientID) {
Long clientID;
if (inputClientID instanceof Long) {
clientID = (Long) inputClientID;
} else {
clientID = clientMap.getWireID(inputClientID);
}
return clientID;
}
@Override
public String toString() {
final boolean isDebugEnabled = logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE);
return "FilterProfile(id=" + (this.isLocalProfile ? "local" : this.memberID) + "; numCQs: "
+ ((this.cqCount == null) ? 0 : this.cqCount.get())
+ (isDebugEnabled ? ("; " + getClientMappingString()) : "")
+ (isDebugEnabled ? ("; " + getCqMappingString()) : "") + ")";
}
/** for debugging we could sometimes use a dump of the Long->clientID table */
private String getClientMappingString() {
if (clientMap == null) {
return "";
}
Map wids = clientMap.wireIDs;
if (wids.size() == 0) {
return "clients[]";
}
Set<Long> sorted = new TreeSet(wids.keySet());
StringBuilder result = new StringBuilder(sorted.size() * 70);
result.append("clients[");
Iterator<Long> it = sorted.iterator();
for (int i = 1; it.hasNext(); i++) {
Long wireID = it.next();
result.append(wireID).append("={").append(wids.get(wireID)).append('}');
if (it.hasNext()) {
result.append(", ");
}
}
result.append("]");
return result.toString();
}
/** for debugging we could sometimes use a dump of the Long->cq name table */
private String getCqMappingString() {
if (cqMap == null) {
return "";
}
Map wids = cqMap.wireIDs;
if (wids.size() == 0) {
return "cqs[]";
}
Set<Long> sorted = new TreeSet(wids.keySet());
StringBuilder result = new StringBuilder(sorted.size() * 70);
result.append("cqs[");
Iterator<Long> it = sorted.iterator();
for (int i = 1; it.hasNext(); i++) {
Long wireID = it.next();
result.append(wireID).append("={").append(wids.get(wireID)).append('}');
if (it.hasNext()) {
result.append(", ");
}
}
result.append("]");
return result.toString();
}
/**
* given a collection of on-wire identifiers, this returns a set of the client/server identifiers
* for each client or durable queue
*
* @param integerIDs the integer ids of the clients/queues
* @return the translated identifiers
*/
public Set getRealClientIDs(Collection integerIDs) {
return clientMap.getRealIDs(integerIDs);
}
/**
* given a collection of on-wire identifiers, this returns a set of the CQ identifiers they
* correspond to
*
* @param integerIDs the integer ids of the clients/queues
* @return the translated identifiers
*/
public Set getRealCqIDs(Collection integerIDs) {
return cqMap.getRealIDs(integerIDs);
}
/**
* given an on-wire filter ID, find and return the corresponding cq name
*
* @param integerID the on-wire ID
* @return the translated id
*/
public String getRealCqID(Long integerID) {
return (String) cqMap.getRealID(integerID);
}
/**
* Return the set of real client proxy ids
*
* @return the set of real client proxy ids
*/
@VisibleForTesting
public Set getRealClientIds() {
return clientMap == null ? Collections.emptySet()
: Collections.unmodifiableSet(clientMap.realIDs.keySet());
}
/**
* Return the set of wire client proxy ids
*
* @return the set of wire client proxy ids
*/
@VisibleForTesting
public Set getWireClientIds() {
return clientMap == null ? Collections.emptySet()
: Collections.unmodifiableSet(clientMap.wireIDs.keySet());
}
/**
* ensure that the given query contains a filter routing ID
*/
private void ensureCqID(ServerCQ cq) {
if (cq.getFilterID() == null) {
// on-wire IDs are assigned in the VM where the CQ was registered
assert this.isLocalProfile;
cq.setFilterID(cqMap.getWireID(cq.getServerCqName()));
}
}
/**
* @return the isLocalProfile
*/
public boolean isLocalProfile() {
return isLocalProfile;
}
/**
* Returns the filter profile messages received while members cache profile exchange was in
* progress.
*
* @param member whose messages are returned.
* @return filter profile messages that are queued for the member.
*/
public List getQueuedFilterProfileMsgs(InternalDistributedMember member) {
synchronized (this.filterProfileMsgQueue) {
if (this.filterProfileMsgQueue.containsKey(member)) {
return new LinkedList(this.filterProfileMsgQueue.get(member));
}
}
return Collections.emptyList();
}
/**
* Removes the filter profile messages from the queue that are received while the members cache
* profile exchange was in progress.
*
* @param member whose messages are returned.
* @return filter profile messages that are queued for the member.
*/
public List removeQueuedFilterProfileMsgs(InternalDistributedMember member) {
synchronized (this.filterProfileMsgQueue) {
if (this.filterProfileMsgQueue.containsKey(member)) {
return new LinkedList(this.filterProfileMsgQueue.remove(member));
}
}
return Collections.emptyList();
}
/**
* Adds the message to filter profile queue.
*/
public void addToFilterProfileQueue(InternalDistributedMember member, OperationMessage message) {
if (logger.isDebugEnabled()) {
logger.debug("Adding message to filter profile queue: {} for member : {}", message, member);
}
synchronized (this.filterProfileMsgQueue) {
LinkedList msgs = this.filterProfileMsgQueue.get(member);
if (msgs == null) {
msgs = new LinkedList();
this.filterProfileMsgQueue.put(member, msgs);
}
msgs.add(message);
}
}
/**
* Process the filter profile messages.
*/
public void processQueuedFilterProfileMsgs(List msgs) {
final boolean isDebugEnabled = logger.isDebugEnabled();
if (msgs != null) {
Iterator iter = msgs.iterator();
while (iter.hasNext()) {
try {
OperationMessage msg = (OperationMessage) iter.next();
if (isDebugEnabled) {
logger.debug("Processing the queued filter profile message :{}", msg);
}
msg.processRequest(this);
} catch (Exception ex) {
logger.warn("Exception thrown while processing queued profile messages.", ex);
}
}
}
}
/**
* OperationMessage synchronously propagates a change in the profile to another member. It is a
* serial message so that there is no chance of out-of-order execution.
*/
public static class OperationMessage extends HighPriorityDistributionMessage
implements MessageWithReply {
public long profileVersion;
boolean updatesAsInvalidates;
String regionName;
operationType opType;
long clientID;
Object interest;
int processorId;
ServerCQ cq;
String serverCqName;
/*
* (non-Javadoc)
*
* @see org.apache.geode.distributed.internal.DistributionMessage#process(org.apache.geode.
* distributed.internal.DistributionManager)
*/
@Override
protected void process(ClusterDistributionManager dm) {
try {
CacheDistributionAdvisee r = findRegion(dm);
if (r == null) {
if (logger.isDebugEnabled()) {
logger.debug("Region not found, so ignoring filter profile update: {}", this);
}
return;
}
// we only need to record the delta if this is a partitioned region
if (!(r instanceof PartitionedRegion)) {
return;
}
CacheDistributionAdvisor cda = (CacheDistributionAdvisor) r.getDistributionAdvisor();
CacheDistributionAdvisor.CacheProfile cp =
(CacheDistributionAdvisor.CacheProfile) cda.getProfile(getSender());
if (cp == null) { // PR accessors do not keep filter profiles around
if (logger.isDebugEnabled()) {
logger.debug(
"No cache profile to update, adding filter profile message to queue. Message :{}",
this);
}
FilterProfile localFP = ((LocalRegion) r).getFilterProfile();
localFP.addToFilterProfileQueue(getSender(), this);
dm.getCancelCriterion().checkCancelInProgress(null);
} else {
cp.hasCacheServer = true;
FilterProfile fp = cp.filterProfile;
if (fp == null) { // PR accessors do not keep filter profiles around
if (logger.isDebugEnabled()) {
logger.debug("No filter profile to update: {}", this);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("Processing the filter profile request for : {}", this);
}
processRequest(fp);
}
}
} catch (RuntimeException e) {
logger.warn("Exception thrown while processing profile update", e);
} finally {
ReplyMessage reply = new ReplyMessage();
reply.setProcessorId(this.processorId);
reply.setRecipient(getSender());
try {
dm.putOutgoing(reply);
} catch (CancelException ignore) {
// can't send a reply, so ignore the exception
}
}
}
public void processRequest(FilterProfile fp) {
switch (opType) {
case REGISTER_KEY:
fp.registerClientInterest(clientID, this.interest, InterestType.KEY,
updatesAsInvalidates);
break;
case REGISTER_PATTERN:
fp.registerClientInterest(clientID, this.interest, InterestType.REGULAR_EXPRESSION,
updatesAsInvalidates);
break;
case REGISTER_FILTER:
fp.registerClientInterest(clientID, this.interest, InterestType.FILTER_CLASS,
updatesAsInvalidates);
break;
case REGISTER_KEYS:
fp.registerClientInterestList(clientID, (List) this.interest, updatesAsInvalidates);
break;
case UNREGISTER_KEY:
fp.unregisterClientInterest(clientID, this.interest, InterestType.KEY);
break;
case UNREGISTER_PATTERN:
fp.unregisterClientInterest(clientID, this.interest, InterestType.REGULAR_EXPRESSION);
break;
case UNREGISTER_FILTER:
fp.unregisterClientInterest(clientID, this.interest, InterestType.FILTER_CLASS);
break;
case UNREGISTER_KEYS:
fp.unregisterClientInterestList(clientID, (List) this.interest);
break;
case CLEAR:
fp.clearInterestFor(clientID);
break;
case HAS_CQ:
fp.cqCount.set(1);
break;
case REGISTER_CQ:
fp.processRegisterCq(this.serverCqName, this.cq, true);
break;
case CLOSE_CQ:
fp.processCloseCq(this.serverCqName);
break;
case STOP_CQ:
fp.processStopCq(this.serverCqName);
break;
case SET_CQ_STATE:
fp.processSetCqState(this.serverCqName, this.cq);
break;
default:
throw new IllegalArgumentException(
"Unknown filter profile operation type in operation: " + this);
}
}
private CacheDistributionAdvisee findRegion(ClusterDistributionManager dm) {
CacheDistributionAdvisee result = null;
try {
InternalCache cache = dm.getCache();
if (cache != null) {
LocalRegion lr = (LocalRegion) cache.getRegionByPathForProcessing(regionName);
if (lr instanceof CacheDistributionAdvisee) {
result = (CacheDistributionAdvisee) lr;
}
}
} catch (CancelException ignore) {
// nothing to do
}
return result;
}
/*
* (non-Javadoc)
*
* @see org.apache.geode.internal.serialization.DataSerializableFixedID#getDSFID()
*/
@Override
public int getDSFID() {
return FILTER_PROFILE_UPDATE;
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
out.writeInt(this.processorId);
out.writeUTF(this.regionName);
out.writeShort(this.opType.ordinal());
out.writeBoolean(this.updatesAsInvalidates);
out.writeLong(this.profileVersion);
if (isCqOp(this.opType)) {
// For CQ info.
// Write Server CQ Name.
out.writeUTF(((ServerCQ) this.cq).getServerCqName());
if (this.opType == operationType.REGISTER_CQ || this.opType == operationType.SET_CQ_STATE) {
InternalDataSerializer.invokeToData((ServerCQ) this.cq, out);
}
} else {
// For interest list.
out.writeLong(this.clientID);
context.getSerializer().writeObject(this.interest, out);
}
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
this.processorId = in.readInt();
this.regionName = in.readUTF();
this.opType = operationType.values()[in.readShort()];
this.updatesAsInvalidates = in.readBoolean();
this.profileVersion = in.readLong();
if (isCqOp(this.opType)) {
this.serverCqName = in.readUTF();
if (this.opType == operationType.REGISTER_CQ || this.opType == operationType.SET_CQ_STATE) {
this.cq = CqServiceProvider.readCq(in);
}
} else {
this.clientID = in.readLong();
this.interest = context.getDeserializer().readObject(in);
}
}
@Override
public String toString() {
return this.getShortClassName() + "(processorId=" + this.processorId + "; region="
+ this.regionName + "; operation=" + this.opType + "; clientID=" + this.clientID
+ "; profileVersion=" + this.profileVersion
+ (isCqOp(this.opType) ? ("; CqName=" + this.serverCqName) : "") + ")";
}
}
class IDMap {
long nextID = 1;
Map<Object, Long> realIDs = new ConcurrentHashMap<Object, Long>();
Map<Long, Object> wireIDs = new ConcurrentHashMap<Long, Object>();
boolean hasLongID;
synchronized boolean hasWireID(Object realId) {
return this.realIDs.containsKey(realId);
}
/** return the on-wire routing identifier for the given ID */
Long getWireID(Object realId) {
Long result = this.realIDs.get(realId);
if (result == null) {
synchronized (this) {
result = this.realIDs.get(realId);
if (result == null) {
if (nextID == Integer.MAX_VALUE) {
this.hasLongID = true;
}
result = nextID++;
this.realIDs.put(realId, result);
this.wireIDs.put(result, realId);
}
}
if (logger.isTraceEnabled(LogMarker.BRIDGE_SERVER_VERBOSE)) {
logger.trace(LogMarker.BRIDGE_SERVER_VERBOSE, "Profile for {} mapped {} to {}",
region.getFullPath(), realId, result);
}
}
return result;
}
/** return the client or durable queue id for the given on-wire identifier */
Object getRealID(Long wireID) {
return wireIDs.get(wireID);
}
/**
* given a collection of on-wire identifiers, this returns a set of the real identifiers (e.g.,
* client IDs or durable queue IDs)
*
* @param integerIDs the integer ids
* @return the translated identifiers
*/
public Set getRealIDs(Collection integerIDs) {
if (integerIDs.size() == 0) {
return Collections.emptySet();
}
Set result = new HashSet(integerIDs.size());
Map<Long, Object> wids = wireIDs;
for (Object id : integerIDs) {
Object realID = wids.get(id);
if (realID != null) {
result.add(realID);
}
}
return result;
}
/**
* remove the mapping for the given internal ID
*/
void removeIDMapping(Long mappedId) {
Object clientId = this.wireIDs.remove(mappedId);
if (clientId != null) {
this.realIDs.remove(clientId);
}
}
/**
* remove the mapping for the given proxy ID
*/
void removeIDMapping(Object clientId) {
Long mappedId = this.realIDs.remove(clientId);
if (mappedId != null) {
this.wireIDs.remove(mappedId);
}
}
}
/**
* Returns true if the client is interested in all keys.
*
* @param id client identifier.
* @return true if client is interested in all keys.
*/
public boolean isInterestedInAllKeys(Object id) {
if (!clientMap.hasWireID(id)) {
return false;
}
return this.getAllKeyClients().contains(clientMap.getWireID(id));
}
/**
* Returns true if the client is interested in all keys, for which updates are sent as
* invalidates.
*
* @param id client identifier
* @return true if client is interested in all keys.
*/
public boolean isInterestedInAllKeysInv(Object id) {
if (!clientMap.hasWireID(id)) {
return false;
}
return this.getAllKeyClientsInv().contains(clientMap.getWireID(id));
}
/**
* Returns the set of client interested keys.
*
* @param id client identifier
* @return client interested keys.
*/
public Set getKeysOfInterest(Object id) {
if (!clientMap.hasWireID(id)) {
return null;
}
return this.getKeysOfInterest().get(clientMap.getWireID(id));
}
public int getKeysOfInterestSize() {
return this.getKeysOfInterest().size();
}
/**
* Returns the set of client interested keys for which updates are sent as invalidates.
*
* @param id client identifier
* @return client interested keys.
*/
public Set getKeysOfInterestInv(Object id) {
if (!clientMap.hasWireID(id)) {
return null;
}
return this.getKeysOfInterestInv().get(clientMap.getWireID(id));
}
public int getKeysOfInterestInvSize() {
return this.getKeysOfInterestInv().size();
}
/**
* Returns the set of client interested patterns.
*
* @param id client identifier
* @return client interested patterns.
*/
public Set getPatternsOfInterest(Object id) {
if (!clientMap.hasWireID(id)) {
return null;
}
Map patterns = this.getPatternsOfInterest().get(clientMap.getWireID(id));
if (patterns != null) {
return new HashSet(patterns.keySet());
}
return null;
}
public int getPatternsOfInterestSize() {
return this.getPatternsOfInterest().size();
}
/**
* Returns the set of client interested patterns for which updates are sent as invalidates.
*
* @param id client identifier
* @return client interested patterns.
*/
public Set getPatternsOfInterestInv(Object id) {
if (!clientMap.hasWireID(id)) {
return null;
}
Map interests = this.getPatternsOfInterestInv().get(clientMap.getWireID(id));
if (interests != null) {
return new HashSet(interests.keySet());
}
return null;
}
public int getPatternsOfInterestInvSize() {
return this.getPatternsOfInterestInv().size();
}
/**
* Returns the set of client interested filters.
*
* @param id client identifier
* @return client interested filters.
*/
public Set getFiltersOfInterest(Object id) {
if (!clientMap.hasWireID(id)) {
return null;
}
Map interests = this.getFiltersOfInterest().get(clientMap.getWireID(id));
if (interests != null) {
return new HashSet(interests.keySet());
}
return null;
}
/**
* Returns the set of client interested filters for which updates are sent as invalidates.
*
* @param id client identifier
* @return client interested filters.
*/
public Set getFiltersOfInterestInv(Object id) {
if (!clientMap.hasWireID(id)) {
return null;
}
Map interests = this.getFiltersOfInterestInv().get(clientMap.getWireID(id));
if (interests != null) {
return new HashSet(interests.keySet());
}
return null;
}
@MutableForTesting
public static TestHook testHook = null;
/** Test Hook */
public interface TestHook {
void await();
void release();
}
@Override
public Version[] getSerializationVersions() {
// TODO Auto-generated method stub
return null;
}
}