blob: 21ac9eb175b2c0de2e52cc4732f6e16ad192798d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static org.apache.geode.internal.cache.LocalRegion.InitializationLevel.ANY_INIT;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.CacheEvent;
import org.apache.geode.cache.DataPolicy;
import org.apache.geode.cache.InterestPolicy;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.Scope;
import org.apache.geode.cache.SubscriptionAttributes;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
import org.apache.geode.distributed.internal.DistributionAdvisor;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.DSCODE;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.LocalRegion.InitializationLevel;
import org.apache.geode.internal.cache.partitioned.PRLocallyDestroyedException;
import org.apache.geode.internal.cache.persistence.DiskStoreID;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.logging.log4j.LogMarker;
/**
* Adds bookkeeping info and cache-specific behavior to DistributionAdvisor. Adds bit-encoded flags
* in addition to object info.
*
*/
@SuppressWarnings("deprecation")
public class CacheDistributionAdvisor extends DistributionAdvisor {
private static final Logger logger = LogService.getLogger();
// moved ROLLOVER_* constants to DistributionAdvisor
/** bit masks */
private static final int INTEREST_MASK = 0x01;
private static final int REPLICATE_MASK = 0x02;
private static final int LOADER_MASK = 0x04;
private static final int WRITER_MASK = 0x08;
private static final int LISTENER_MASK = 0x10;
private static final int DIST_ACK_MASK = 0x20;
private static final int GLOBAL_MASK = 0x40;
private static final int IN_RECOVERY_MASK = 0x80;
private static final int PERSISTENT_MASK = 0x100;
private static final int PROXY_MASK = 0x200;
private static final int PRELOADED_MASK = 0x400;
private static final int IS_PARTITIONED_MASK = 0x800;
private static final int REGION_INITIALIZED_MASK = 0x1000;
private static final int IS_GATEWAY_ENABLED_MASK = 0x2000;
// provider is no longer a supported attribute.
// private static final int IS_GII_PROVIDER_MASK = 0x4000;
private static final int PERSISTENT_ID_MASK = 0x4000;
/** does this member require operation notification (PartitionedRegions) */
protected static final int REQUIRES_NOTIFICATION_MASK = 0x8000;
private static final int HAS_CACHE_SERVER_MASK = 0x10000;
private static final int REQUIRES_OLD_VALUE_MASK = 0x20000;
// unused 0x40000;
private static final int PERSISTENCE_INITIALIZED_MASK = 0x80000;
// Important below mentioned bit masks are not available
/**
* Using following masks for gatewaysender queue startup polic informations.
*/
// private static final int HUB_STARTUP_POLICY_MASK = 0x07<<20;
private static final int GATEWAY_SENDER_IDS_MASK = 0x200000;
private static final int ASYNC_EVENT_QUEUE_IDS_MASK = 0x400000;
private static final int IS_OFF_HEAP_MASK = 0x800000;
private static final int CACHE_SERVICE_PROFILES_MASK = 0x1000000;
// moved initializ* to DistributionAdvisor
// moved membershipVersion to DistributionAdvisor
// moved previousVersionOpCount to DistributionAdvisor
// moved currentVersionOpCount to DistributionAdvisor
// moved removedProfiles to DistributionAdvisor
/** Creates a new instance of CacheDistributionAdvisor */
protected CacheDistributionAdvisor(CacheDistributionAdvisee region) {
super(region);
}
static CacheDistributionAdvisor createCacheDistributionAdvisor(
CacheDistributionAdvisee region) {
CacheDistributionAdvisor advisor = new CacheDistributionAdvisor(region);
advisor.initialize();
return advisor;
}
@Override
public String toString() {
return "CacheDistributionAdvisor for region " + getAdvisee().getFullPath();
}
// moved toStringWithProfiles to DistributionAdvisor
// moved initializationGate to DistributionAdvisor
// moved isInitialized to DistributionAdvisor
// moved addMembershipListenerAndAdviseGeneric to DistributionAdvisor
/**
* Returns a the set of members that either want all events or are caching data.
*
* @param excludeInRecovery if true then members in recovery are excluded
*/
private Set<InternalDistributedMember> adviseAllEventsOrCached(final boolean excludeInRecovery)
throws IllegalStateException {
getAdvisee().getCancelCriterion().checkCancelInProgress(null);
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
if (excludeInRecovery && cp.inRecovery) {
return false;
}
return cp.cachedOrAllEventsWithListener();
});
}
/**
* Provide recipient information for an update or create operation.
*
*/
Set adviseUpdate(final EntryEventImpl event) throws IllegalStateException {
if (event.hasNewValue() || event.getOperation().isPutAll()) {
// only need to distribute it to members that want all events or cache data
return adviseAllEventsOrCached(true/* fixes 41147 */);
} else {
// The new value is null so this is a create with a null value,
// in which case we only need to distribute this message to replicates
// or all events that are not a proxy or if a proxy has a listener
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
DataPolicy dp = cp.dataPolicy;
return dp.withReplication()
|| (cp.allEvents() && (dp.withStorage() || cp.hasCacheListener));
});
}
}
/**
* Provide recipient information for TX lock and commit.
*
* @return Set of Serializable members that the current transaction will be distributed to.
* Currently this is any other member who has this region defined. No reference to Set
* kept by advisor so caller is free to modify it
*/
Set<InternalDistributedMember> adviseTX() throws IllegalStateException {
boolean isMetaDataWithTransactions = getAdvisee() instanceof LocalRegion
&& ((LocalRegion) getAdvisee()).isMetaRegionWithTransactions();
Set<InternalDistributedMember> badList = Collections.emptySet();
if (!TXManagerImpl.ALLOW_PERSISTENT_TRANSACTIONS && !isMetaDataWithTransactions) {
badList = adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile prof = (CacheProfile) profile;
return (prof.isPersistent());
});
}
if (badList.isEmpty()) {
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
return cp.cachedOrAllEvents();
});
} else {
StringBuilder badIds = new StringBuilder();
Iterator biI = badList.iterator();
while (biI.hasNext()) {
badIds.append(biI.next().toString());
if (biI.hasNext())
badIds.append(", ");
}
throw new IllegalStateException(
String.format("Illegal Region Configuration for members: %s",
badIds.toString()));
}
}
/**
* Provide recipient information for netLoad
*
* @return Set of Serializable members that have a CacheLoader installed; no reference to Set kept
* by advisor so caller is free to modify it
*/
public Set adviseNetLoad() {
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile prof = (CacheProfile) profile;
// if region in cache is not yet initialized, exclude
if (!prof.regionInitialized) { // fix for bug 41102
return false;
}
return prof.hasCacheLoader;
});
}
public FilterRoutingInfo adviseFilterRouting(CacheEvent event, Set cacheOpRecipients) {
FilterProfile fp = ((LocalRegion) event.getRegion()).getFilterProfile();
if (fp != null) {
return fp.getFilterRoutingInfoPart1(event, profiles, cacheOpRecipients);
}
return null;
}
/**
* Same as adviseGeneric except in recovery excluded.
*/
public Set<InternalDistributedMember> adviseCacheOp() {
return adviseAllEventsOrCached(true);
}
/*
* * Same as adviseGeneric but excludes if cache profile is in recovery
*/
Set<InternalDistributedMember> adviseInvalidateRegion() {
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
return !cp.inRecovery;
});
}
/**
* Same as adviseGeneric
*/
public Set adviseDestroyRegion() {
return adviseGeneric();
}
/**
* Provide recipient information for netWrite
*
* @return Set of Serializable member ids that have a CacheWriter installed; no reference to Set
* kept by advisor so caller is free to modify it
*/
public Set adviseNetWrite() {
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile prof = (CacheProfile) profile;
// if region in cache is in recovery, exclude
if (prof.inRecovery) {
return false;
}
return prof.hasCacheWriter;
});
}
public Set<InternalDistributedMember> adviseInitializedReplicates() {
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
return cp.dataPolicy.withReplication() && cp.regionInitialized;
});
}
/**
* Provide recipient information for netSearch
*
* @return Set of Serializable member ids that have the region and are have storage (no need to
* search an empty cache)
*/
Set adviseNetSearch() {
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
// if region in cache is not yet initialized, exclude
if (!cp.regionInitialized) {
return false;
}
DataPolicy dp = cp.dataPolicy;
return dp.withStorage();
});
}
// moved dumpProfiles to DistributionAdvisor
public InitialImageAdvice adviseInitialImage(InitialImageAdvice previousAdvice) {
return adviseInitialImage(previousAdvice, false);
}
@SuppressWarnings("synthetic-access")
public InitialImageAdvice adviseInitialImage(InitialImageAdvice previousAdvice,
boolean persistent) {
initializationGate();
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE)) {
dumpProfiles("AdviseInitialImage");
}
Profile[] allProfiles = profiles; // volatile read
if (allProfiles.length == 0) {
return new InitialImageAdvice();
}
Set<InternalDistributedMember> replicates = new HashSet<>();
Set<InternalDistributedMember> others = new HashSet<>();
Set<InternalDistributedMember> preloaded = new HashSet<>();
Set<InternalDistributedMember> empties = new HashSet<>();
Set<InternalDistributedMember> uninitialized = new HashSet<>();
Set<InternalDistributedMember> nonPersistent = new HashSet<>();
Map<InternalDistributedMember, CacheProfile> memberProfiles =
new HashMap<>();
for (Profile allProfile : allProfiles) {
CacheProfile profile = (CacheProfile) allProfile;
// Make sure that we don't return a member that was in the previous initial image advice.
// Unless that member has changed it's profile since the last time we checked.
if (previousAdvice != null) {
CacheProfile previousProfile =
previousAdvice.memberProfiles.get(profile.getDistributedMember());
if (previousProfile != null
&& previousProfile.getSerialNumber() == profile.getSerialNumber()
&& previousProfile.getVersion() == profile.getVersion()) {
continue;
}
}
// if region in cache is in recovery, exclude
if (profile.inRecovery) {
uninitialized.add(profile.getDistributedMember());
continue;
}
// No need to do a GII from uninitialized member.
if (!profile.regionInitialized) {
uninitialized.add(profile.getDistributedMember());
continue;
}
if (profile.dataPolicy.withReplication()) {
if (!persistent || profile.dataPolicy.withPersistence()) {
// If the local member is persistent, we only want
// to include persistent replicas in the set of replicates.
replicates.add(profile.getDistributedMember());
} else {
nonPersistent.add(profile.getDistributedMember());
}
memberProfiles.put(profile.getDistributedMember(), profile);
} else if (profile.dataPolicy.isPreloaded()) {
preloaded.add(profile.getDistributedMember());
memberProfiles.put(profile.getDistributedMember(), profile);
} else if (profile.dataPolicy.withStorage()) {
// don't bother asking proxy members for initial image
others.add(profile.getDistributedMember());
memberProfiles.put(profile.getDistributedMember(), profile);
} else {
empties.add(profile.getDistributedMember());
}
}
InitialImageAdvice advice = new InitialImageAdvice(replicates, others, preloaded, empties,
uninitialized, nonPersistent, memberProfiles);
if (logger.isDebugEnabled()) {
logger.debug(advice);
}
return advice;
}
/**
* returns the set of all the members in the system which requires old values and are not yet
* finished with initialization (including GII).
*
* @since GemFire 5.5
*/
Set adviseRequiresOldValueInCacheOp() {
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
return cp.requiresOldValueInEvents && !cp.regionInitialized;
});
}
// moved adviseProfileExchange to DistributionAdvisor
// moved getProfile to DistributionAdvisor
// moved exchangeProfiles to DistributionAdvisor
// moved getDistributionManager to DistributionAdvisor
/** Instantiate new distribution profile for this member */
@Override
protected Profile instantiateProfile(InternalDistributedMember memberId, int version) {
return new CacheProfile(memberId, version);
}
@Override
protected boolean evaluateProfiles(Profile newProfile, Profile oldProfile) {
boolean result = super.evaluateProfiles(newProfile, oldProfile);
if (result) {
CacheProfile newCP = (CacheProfile) newProfile;
CacheProfile oldCP = (CacheProfile) oldProfile;
if ((oldCP == null || !oldCP.regionInitialized) && newCP.regionInitialized) {
// invoke membership listeners, if any
CacheDistributionAdvisee advisee = (CacheDistributionAdvisee) getAdvisee();
advisee.remoteRegionInitialized(newCP);
}
}
return result;
}
/**
* Profile information for a remote counterpart.
*/
public static class CacheProfile extends DistributionAdvisor.Profile {
public DataPolicy dataPolicy = DataPolicy.REPLICATE;
public InterestPolicy interestPolicy = InterestPolicy.DEFAULT;
public boolean hasCacheLoader = false;
public boolean hasCacheWriter = false;
public boolean hasCacheListener = false;
public Scope scope = Scope.DISTRIBUTED_NO_ACK;
public boolean inRecovery = false;
public Set<String> gatewaySenderIds = Collections.emptySet();
public Set<String> asyncEventQueueIds = Collections.emptySet();
/**
* Will be null if the profile doesn't need to have the attributes
*/
public SubscriptionAttributes subscriptionAttributes = null;
public boolean isPartitioned = false;
public boolean isGatewayEnabled = false;
public boolean isPersistent = false;
public boolean isOffHeap = false;
// moved initialMembershipVersion to DistributionAdvisor.Profile
// moved serialNumber to DistributionAdvisor.Profile
/**
* this member's client interest / continuous query profile. This is used for event processing
* to reduce the number of times CQs are executed and to have the originating member for an
* event pay the cpu cost of executing filters on the event.
*/
public FilterProfile filterProfile;
/**
* Some cache listeners require old values in cache operation messages, at least during GII
*/
boolean requiresOldValueInEvents;
/**
* Whether the region has completed initialization, including GII. This information may be
* incorrect for a PartitionedRegion, but may be relied upon for DistributedRegions (including
* BucketRegions)
*
* @since GemFire prpersist this field is now overloaded for partitioned regions with
* persistence. In the case of pr persistence, this field indicates that the region has
* finished recovery from disk.
*/
public boolean regionInitialized;
/**
* True when a members persistent store is initialized. Note that regionInitialized may be true
* when this is false in the case of createBucketAtomically. With createBucketAtomically, the
* peristent store is not created until the EndBucketCreationMessage is sent.
*/
public boolean persistenceInitialized;
public PersistentMemberID persistentID;
/**
* This member has any cache servers. This is not actively maintained for local profiles (i.e.,
* a profile representing this vm)
*/
public boolean hasCacheServer = false;
List<CacheServiceProfile> cacheServiceProfiles = new ArrayList<>();
/** for internal use, required for DataSerializer.readObject */
public CacheProfile() {}
/** used for routing computation */
public CacheProfile(FilterProfile localProfile) {
filterProfile = localProfile;
}
public CacheProfile(InternalDistributedMember memberId, int version) {
super(memberId, version);
}
public CacheProfile(CacheProfile toCopy) {
super(toCopy.getDistributedMember(), toCopy.version);
setIntInfo(toCopy.getIntInfo());
}
/** Return the profile data information that can be stored in an int */
protected int getIntInfo() {
int s = 0;
if (dataPolicy.withReplication()) {
s |= REPLICATE_MASK;
if (dataPolicy.isPersistentReplicate()) {
s |= PERSISTENT_MASK;
}
} else {
if (dataPolicy.isEmpty())
s |= PROXY_MASK;
if (dataPolicy.isPreloaded())
s |= PRELOADED_MASK;
}
if (subscriptionAttributes != null
&& subscriptionAttributes.getInterestPolicy().isAll()) {
s |= INTEREST_MASK;
}
if (hasCacheLoader)
s |= LOADER_MASK;
if (hasCacheWriter)
s |= WRITER_MASK;
if (hasCacheListener)
s |= LISTENER_MASK;
if (scope.isDistributedAck())
s |= DIST_ACK_MASK;
if (scope.isGlobal())
s |= GLOBAL_MASK;
if (inRecovery)
s |= IN_RECOVERY_MASK;
if (isPartitioned)
s |= IS_PARTITIONED_MASK;
if (isGatewayEnabled)
s |= IS_GATEWAY_ENABLED_MASK;
if (isPersistent)
s |= PERSISTENT_MASK;
if (regionInitialized)
s |= REGION_INITIALIZED_MASK;
if (persistentID != null)
s |= PERSISTENT_ID_MASK;
if (hasCacheServer)
s |= HAS_CACHE_SERVER_MASK;
if (requiresOldValueInEvents)
s |= REQUIRES_OLD_VALUE_MASK;
if (persistenceInitialized)
s |= PERSISTENCE_INITIALIZED_MASK;
if (!gatewaySenderIds.isEmpty())
s |= GATEWAY_SENDER_IDS_MASK;
if (!asyncEventQueueIds.isEmpty())
s |= ASYNC_EVENT_QUEUE_IDS_MASK;
if (isOffHeap)
s |= IS_OFF_HEAP_MASK;
if (!cacheServiceProfiles.isEmpty())
s |= CACHE_SERVICE_PROFILES_MASK;
Assert.assertTrue(!scope.isLocal());
return s;
}
private boolean hasGatewaySenderIds(int bits) {
return (bits & GATEWAY_SENDER_IDS_MASK) != 0;
}
private boolean hasAsyncEventQueueIds(int bits) {
return (bits & ASYNC_EVENT_QUEUE_IDS_MASK) != 0;
}
/**
* @return true if the serialized message has a persistentID
*/
private boolean hasPersistentID(int bits) {
return (bits & PERSISTENT_ID_MASK) != 0;
}
public boolean isPersistent() {
return dataPolicy.withPersistence();
}
/** Set the profile data information that is stored in a short */
protected void setIntInfo(int s) {
if ((s & REPLICATE_MASK) != 0) {
if ((s & PERSISTENT_MASK) != 0) {
dataPolicy = DataPolicy.PERSISTENT_REPLICATE;
} else {
dataPolicy = DataPolicy.REPLICATE;
}
} else if ((s & PROXY_MASK) != 0) {
dataPolicy = DataPolicy.EMPTY;
} else if ((s & PRELOADED_MASK) != 0) {
dataPolicy = DataPolicy.PRELOADED;
} else { // CACHED
dataPolicy = DataPolicy.NORMAL;
}
if ((s & IS_PARTITIONED_MASK) != 0) {
if ((s & PERSISTENT_MASK) != 0) {
dataPolicy = DataPolicy.PERSISTENT_PARTITION;
} else {
dataPolicy = DataPolicy.PARTITION;
}
}
if ((s & INTEREST_MASK) != 0) {
subscriptionAttributes = new SubscriptionAttributes(InterestPolicy.ALL);
} else {
subscriptionAttributes = new SubscriptionAttributes(InterestPolicy.CACHE_CONTENT);
}
hasCacheLoader = (s & LOADER_MASK) != 0;
hasCacheWriter = (s & WRITER_MASK) != 0;
hasCacheListener = (s & LISTENER_MASK) != 0;
scope = (s & DIST_ACK_MASK) != 0 ? Scope.DISTRIBUTED_ACK
: ((s & GLOBAL_MASK) != 0 ? Scope.GLOBAL : Scope.DISTRIBUTED_NO_ACK);
inRecovery = (s & IN_RECOVERY_MASK) != 0;
isPartitioned = (s & IS_PARTITIONED_MASK) != 0;
isGatewayEnabled = (s & IS_GATEWAY_ENABLED_MASK) != 0;
isPersistent = (s & PERSISTENT_MASK) != 0;
regionInitialized = ((s & REGION_INITIALIZED_MASK) != 0);
hasCacheServer = ((s & HAS_CACHE_SERVER_MASK) != 0);
requiresOldValueInEvents = ((s & REQUIRES_OLD_VALUE_MASK) != 0);
persistenceInitialized = (s & PERSISTENCE_INITIALIZED_MASK) != 0;
isOffHeap = (s & IS_OFF_HEAP_MASK) != 0;
}
/**
* Sets the SubscriptionAttributes for the region that this profile is on
*
* @since GemFire 5.0
*/
public void setSubscriptionAttributes(SubscriptionAttributes sa) {
subscriptionAttributes = sa;
}
/**
* Return true if cached or allEvents and a listener
*/
boolean cachedOrAllEventsWithListener() {
// to fix bug 36804 to ignore hasCacheListener
// return this.dataPolicy.withStorage() ||
// (allEvents() && this.hasCacheListener);
return cachedOrAllEvents();
}
/**
* Return true if cached or allEvents
*/
boolean cachedOrAllEvents() {
return dataPolicy.withStorage() || allEvents();
}
/**
* Return true if subscribed to all events
*/
boolean allEvents() {
return subscriptionAttributes.getInterestPolicy().isAll();
}
public void addCacheServiceProfile(CacheServiceProfile profile) {
cacheServiceProfiles.add(profile);
}
private boolean hasCacheServiceProfiles(int bits) {
return (bits & CACHE_SERVICE_PROFILES_MASK) != 0;
}
/**
* Used to process an incoming cache profile.
*/
@Override
public void processIncoming(ClusterDistributionManager dm, String adviseePath,
boolean removeProfile, boolean exchangeProfiles, final List<Profile> replyProfiles) {
try {
Assert.assertTrue(adviseePath != null, "adviseePath was null");
InternalRegion lclRgn;
final InitializationLevel oldLevel = LocalRegion.setThreadInitLevelRequirement(ANY_INIT);
try {
InternalCache cache = dm.getCache();
lclRgn = cache == null ? null : cache.getRegionByPath(adviseePath);
} finally {
LocalRegion.setThreadInitLevelRequirement(oldLevel);
}
if (lclRgn instanceof CacheDistributionAdvisee) {
if (lclRgn.isUsedForPartitionedRegionBucket()) {
if (!((BucketRegion) lclRgn).isPartitionedRegionOpen()) {
return;
}
}
handleCacheDistributionAdvisee((CacheDistributionAdvisee) lclRgn, adviseePath,
removeProfile, exchangeProfiles, true, replyProfiles);
} else {
if (lclRgn == null) {
handleCacheDistributionAdvisee(
PartitionedRegionHelper.getProxyBucketRegion(dm.getCache(), adviseePath, false),
adviseePath, removeProfile, exchangeProfiles, false, replyProfiles);
} else {
if (logger.isDebugEnabled()) {
logger.debug("While processing UpdateAttributes message, region has local scope: {}",
adviseePath);
}
}
}
} catch (PRLocallyDestroyedException fre) {
if (logger.isDebugEnabled()) {
logger.debug("<Region Locally destroyed> /// {}", this);
}
} catch (RegionDestroyedException e) {
if (logger.isDebugEnabled()) {
logger.debug("<region destroyed> /// {}", this);
}
}
}
@Override
public void cleanUp() {
if (filterProfile != null) {
filterProfile.cleanUp();
}
}
/**
* Attempts to process this message with the specified <code>CacheDistributionAdvisee</code>.
*
* @param cda the CacheDistributionAdvisee to apply this profile to
* @param isRealRegion true if CacheDistributionAdvisee is a real region
*/
private void handleCacheDistributionAdvisee(CacheDistributionAdvisee cda, String adviseePath,
boolean removeProfile, boolean exchangeProfiles, boolean isRealRegion,
final List<Profile> replyProfiles) {
if (cda != null) {
handleDistributionAdvisee(cda, removeProfile, isRealRegion && exchangeProfiles,
replyProfiles);
if (logger.isDebugEnabled()) {
logger.debug("While processing UpdateAttributes message, handled advisee: {}", cda);
}
} else {
if (logger.isDebugEnabled()) {
logger.debug("While processing UpdateAttributes message, region not found: {}",
adviseePath);
}
}
}
@Override
public int getDSFID() {
return CACHE_PROFILE;
}
@Override
public void toData(DataOutput out) throws IOException {
super.toData(out);
out.writeInt(getIntInfo());
if (persistentID != null) {
InternalDataSerializer.invokeToData(persistentID, out);
}
if (!gatewaySenderIds.isEmpty()) {
writeSet(gatewaySenderIds, out);
}
if (!asyncEventQueueIds.isEmpty()) {
writeSet(asyncEventQueueIds, out);
}
DataSerializer.writeObject(filterProfile, out);
if (!cacheServiceProfiles.isEmpty()) {
DataSerializer.writeObject(cacheServiceProfiles, out);
}
}
private void writeSet(Set<?> set, DataOutput out) throws IOException {
// to fix bug 47205 always serialize the Set as a HashSet.
out.writeByte(DSCODE.HASH_SET.toByte());
InternalDataSerializer.writeSet(set, out);
}
@Override
public void fromData(DataInput in) throws IOException, ClassNotFoundException {
super.fromData(in);
int bits = in.readInt();
setIntInfo(bits);
if (hasPersistentID(bits)) {
persistentID = new PersistentMemberID();
InternalDataSerializer.invokeFromData(persistentID, in);
}
if (hasGatewaySenderIds(bits)) {
gatewaySenderIds = DataSerializer.readObject(in);
}
if (hasAsyncEventQueueIds(bits)) {
asyncEventQueueIds = DataSerializer.readObject(in);
}
filterProfile = DataSerializer.readObject(in);
if (hasCacheServiceProfiles(bits)) {
cacheServiceProfiles = DataSerializer.readObject(in);
}
}
@Override
public StringBuilder getToStringHeader() {
return new StringBuilder("CacheProfile");
}
@Override
public void fillInToString(StringBuilder sb) {
super.fillInToString(sb);
sb.append("; dataPolicy=").append(dataPolicy);
sb.append("; hasCacheLoader=").append(hasCacheLoader);
sb.append("; hasCacheWriter=").append(hasCacheWriter);
sb.append("; hasCacheListener=").append(hasCacheListener);
sb.append("; hasCacheServer=").append(hasCacheServer);
sb.append("; scope=").append(scope);
sb.append("; regionInitialized=").append(regionInitialized);
sb.append("; inRecovery=").append(inRecovery);
sb.append("; subcription=").append(subscriptionAttributes);
sb.append("; isPartitioned=").append(isPartitioned);
sb.append("; isGatewayEnabled=").append(isGatewayEnabled);
sb.append("; isPersistent=").append(isPersistent);
sb.append("; persistentID=").append(persistentID);
if (filterProfile != null) {
sb.append("; ").append(filterProfile);
}
sb.append("; gatewaySenderIds =").append(gatewaySenderIds);
sb.append("; asyncEventQueueIds =").append(asyncEventQueueIds);
sb.append("; IsOffHeap=").append(isOffHeap);
sb.append("; cacheServiceProfiles=").append(cacheServiceProfiles);
}
}
/** Recipient information used for getInitialImage operation */
public static class InitialImageAdvice {
public Set<InternalDistributedMember> getOthers() {
return others;
}
public void setOthers(Set<InternalDistributedMember> others) {
this.others = others;
}
public Set<InternalDistributedMember> getReplicates() {
return replicates;
}
public Set<InternalDistributedMember> getNonPersistent() {
return nonPersistent;
}
public Set<InternalDistributedMember> getPreloaded() {
return preloaded;
}
public Set<InternalDistributedMember> getEmpties() {
return empties;
}
public Set<InternalDistributedMember> getUninitialized() {
return uninitialized;
}
/** Set of replicate recipients */
protected final Set<InternalDistributedMember> replicates;
/** Set of peers that are preloaded */
protected final Set<InternalDistributedMember> preloaded;
/**
* Set of tertiary recipients which are not replicates, in which case they should all be queried
* and a superset taken of their images. To be used only if the image cannot be obtained from
* the replicates set.
*/
protected Set<InternalDistributedMember> others;
/** Set of members that might be data feeds and have EMPTY data policy */
protected final Set<InternalDistributedMember> empties;
/** Set of members that may not have finished initializing their caches */
protected final Set<InternalDistributedMember> uninitialized;
/** Set of members that are replicates but not persistent */
final Set<InternalDistributedMember> nonPersistent;
private final Map<InternalDistributedMember, CacheProfile> memberProfiles;
public InitialImageAdvice(Set<InternalDistributedMember> replicates,
Set<InternalDistributedMember> others, Set<InternalDistributedMember> preloaded,
Set<InternalDistributedMember> empties, Set<InternalDistributedMember> uninitialized,
Set<InternalDistributedMember> nonPersistent,
Map<InternalDistributedMember, CacheProfile> memberProfiles) {
this.replicates = replicates;
this.others = others;
this.preloaded = preloaded;
this.empties = empties;
this.uninitialized = uninitialized;
this.nonPersistent = nonPersistent;
this.memberProfiles = memberProfiles;
}
public InitialImageAdvice() {
this(Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
Collections.emptyMap());
}
@Override
public String toString() {
return "InitialImageAdvice(" + "replicates=" + replicates + "; others=" + others
+ "; preloaded=" + preloaded + "; empty=" + empties + "; initializing="
+ uninitialized + ")";
}
}
/**
* Provide all the replicates including persistent replicates.
*
* @return the set of replicate's memberIds
* @since GemFire 5.8
*/
public Set<InternalDistributedMember> adviseReplicates() {
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
return cp.dataPolicy.withReplication();
});
}
/**
* Provide only the preloadeds given a set of existing memberIds
*
* @return the set of preloaded's memberIds
* @since GemFire prPersistSprint1
*/
public Set advisePreloadeds() {
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
return cp.dataPolicy.withPreloaded();
});
}
/**
* Provide only the empty's (having DataPolicy.EMPTY) given a set of existing memberIds
*
* @return the set of replicate's memberIds
* @since GemFire 5.8
*/
Set adviseEmptys() {
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
return cp.dataPolicy.isEmpty();
});
}
@Override
protected void profileRemoved(Profile profile) {
if (logger.isDebugEnabled()) {
logger.debug("CDA: removing profile {}", profile);
}
if (getAdvisee() instanceof LocalRegion && profile != null) {
((LocalRegion) getAdvisee()).removeCriticalMember(profile.getDistributedMember());
}
}
/**
* Returns the list of all persistent members. For most cases, adviseInitializedPersistentMembers
* is more appropriate. These method includes members that are still in the process of GII.
*/
public Map<InternalDistributedMember, PersistentMemberID> advisePersistentMembers() {
initializationGate();
Map<InternalDistributedMember, PersistentMemberID> result =
new HashMap<>();
Profile[] snapshot = profiles;
for (Profile profile : snapshot) {
CacheProfile cp = (CacheProfile) profile;
if (cp.persistentID != null) {
result.put(cp.getDistributedMember(), cp.persistentID);
}
}
return result;
}
public Map<InternalDistributedMember, PersistentMemberID> adviseInitializedPersistentMembers() {
initializationGate();
Map<InternalDistributedMember, PersistentMemberID> result =
new HashMap<>();
Profile[] snapshot = profiles;
for (Profile profile : snapshot) {
CacheProfile cp = (CacheProfile) profile;
if (cp.persistentID != null && cp.persistenceInitialized) {
result.put(cp.getDistributedMember(), cp.persistentID);
}
}
return result;
}
Set adviseCacheServers() {
getAdvisee().getCancelCriterion().checkCancelInProgress(null);
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
return cp.hasCacheServer;
});
}
// Overrided for bucket regions. This listener also receives events
// about PR joins and leaves.
public void addMembershipAndProxyListener(MembershipListener listener) {
addMembershipListener(listener);
}
public void removeMembershipAndProxyListener(MembershipListener listener) {
removeMembershipListener(listener);
}
@Override
public boolean removeId(ProfileId memberId, boolean crashed, boolean destroyed,
boolean fromMembershipListener) {
boolean isPersistent = false;
DiskStoreID persistentId = null;
CacheDistributionAdvisee advisee = (CacheDistributionAdvisee) getAdvisee();
if (advisee.getAttributes().getDataPolicy().withPersistence()) {
isPersistent = true;
CacheProfile profile = (CacheProfile) getProfile(memberId);
if (profile != null && profile.persistentID != null) {
persistentId = ((CacheProfile) getProfile(memberId)).persistentID.getDiskStoreId();
}
}
boolean result = super.removeId(memberId, crashed, destroyed, fromMembershipListener);
// bug #48962 - record members that leave during GII so IIOp knows about them
if (advisee instanceof DistributedRegion) {
DistributedRegion r = (DistributedRegion) advisee;
if (!r.isInitialized() && !r.isUsedForPartitionedRegionBucket()) {
if (logger.isDebugEnabled()) {
logger.debug("recording that {} has left during initialization of {}", memberId,
r.getName());
}
ImageState state = r.getImageState();
if (isPersistent) {
if (persistentId != null) {
state.addLeftMember(persistentId);
}
} else {
state.addLeftMember((InternalDistributedMember) memberId);
}
}
}
return result;
}
}