blob: c901b8a8903447952a131c80568a4267fbf62a5a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.distributed.internal;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.GemFireIOException;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.UpdateAttributesProcessor;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.cache.versions.VersionSource;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.serialization.DataSerializableFixedID;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.serialization.Version;
import org.apache.geode.internal.util.ArrayUtils;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* Provides advice on sending distribution messages. For a given operation, this advisor will
* provide a list of recipients that a message should be sent to, and other information depending on
* the operation. Each distributed entity that can have remote counterparts maintains an instance of
* <code>DistributionAdvisor</code> and maintains it by giving it a <code>Profile</code> for each of
* its remote counterparts, and telling it to delete a profile when that counterpart no longer
* exists.
* <p>
* Provides <code>advise</code> methods for each type of operation that requires specialized
* decision making based on the profiles. For all other operations that do not require specialized
* decision making, the {@link #adviseGeneric} method is provided.
* <p>
* A primary design goal of this class is scalability: the footprint must be kept to a minimum as
* the number of instances grows across a growing number of members in the distributed system.
*
* @since GemFire 3.0
*/
public class DistributionAdvisor {
private static final Logger logger = LogService.getLogger();
/**
* Specifies the starting version number for the profileVersionSequencer.
*/
private static final int START_VERSION_NUMBER = Integer
.getInteger(DistributionConfig.GEMFIRE_PREFIX + "DistributionAdvisor.startVersionNumber", 1);
/**
* Specifies the starting serial number for the serialNumberSequencer.
*/
private static final int START_SERIAL_NUMBER =
Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.startSerialNumber", 1
// Integer.MAX_VALUE-10
);
/**
* Incrementing serial number used to identify order of resource creation
*/
@MakeNotStatic
private static final AtomicInteger serialNumberSequencer = new AtomicInteger(START_SERIAL_NUMBER);
/**
* This serial number indicates a "missing" serial number.
*/
public static final int ILLEGAL_SERIAL = -1;
/**
* Used to compare profile versioning numbers against {@link Integer#MAX_VALUE} and
* {@link Integer#MIN_VALUE} to determine if a rollover has occurred.
*/
private static final int ROLLOVER_THRESHOLD = Integer
.getInteger(DistributionConfig.GEMFIRE_PREFIX + "CacheDistributionAdvisor.rolloverThreshold",
1000);
/**
* {@link Integer#MAX_VALUE} minus {@link #ROLLOVER_THRESHOLD} determines the upper threshold for
* rollover comparison.
*/
private static final int ROLLOVER_THRESHOLD_UPPER = Integer.MAX_VALUE - ROLLOVER_THRESHOLD;
/**
* {@link Integer#MIN_VALUE} plus {@link #ROLLOVER_THRESHOLD} determines the lower threshold for
* rollover comparison.
*/
private static final int ROLLOVER_THRESHOLD_LOWER = Integer.MIN_VALUE + ROLLOVER_THRESHOLD;
/**
* Incrementing serial number used to identify order of region creation
*
* @see Profile#getVersion()
*/
private final AtomicInteger profileVersionSequencer = new AtomicInteger(START_VERSION_NUMBER);
/**
* The operationMonitor tracks in-progress cache operations and holds the profile set
* version number
*/
private final OperationMonitor operationMonitor =
logger.isDebugEnabled() ? new ThreadTrackingOperationMonitor(this)
: new OperationMonitor(this);
/**
* Indicates whether this advisor is has been initialized. This will be false when a shared region
* is mapped into the cache but there has been no distributed operations done on it yet.
*/
private volatile boolean initialized = false;
/**
* Synchronization lock used for controlling access to initialization. We do not synchronize on
* this advisor itself because we use that synchronization for putProfile and we can not lock out
* putProfile while we are doing initialization
*/
private final Object initializeLock = new Object();
/**
* whether membership ops are closed (because the DA's been closed). Access under synchronization
* on (this)
*/
private boolean membershipClosed;
/**
* Hold onto removed profiles to compare to late-processed profiles. Fix for bug 36881. Protected
* by synchronizing on this DistributionAdvisor. guarded.By this DistributionAdvisor
*/
private final Map<ProfileId, Integer> removedProfiles = new HashMap<>();
/**
* My database of Profiles
*/
protected volatile Profile[] profiles = new Profile[0];
/**
* Number of active profiles
*/
private int numActiveProfiles = 0;
/**
* A collection of MembershipListeners that want to be notified when a profile is added to or
* removed from this DistributionAdvisor. The keys are membership listeners and the values are
* Boolean.TRUE.
*/
private ConcurrentMap<MembershipListener, Boolean> membershipListeners =
new ConcurrentHashMap<>();
/**
* A collection of listeners for changes to profiles. These listeners are notified if a profile is
* added, removed, or updated.
*/
private ConcurrentMap<ProfileListener, Boolean> profileListeners = new ConcurrentHashMap<>();
private volatile InitializationListener initializationListener;
/**
* The resource getting advise from this.
*/
private final DistributionAdvisee advisee;
/**
* The membership listener registered with the dm.
*/
private final MembershipListener membershipListener;
protected DistributionAdvisor(DistributionAdvisee advisee) {
this.advisee = advisee;
membershipListener = new MembershipListener() {
@Override
public void memberJoined(DistributionManager distributionManager,
InternalDistributedMember id) {
// Ignore
}
@Override
public void quorumLost(DistributionManager distributionManager,
Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {}
@Override
@SuppressWarnings("synthetic-access")
public void memberDeparted(DistributionManager distributionManager,
final InternalDistributedMember id, boolean crashed) {
boolean shouldSync = crashed && shouldSyncForCrashedMember(id);
final Profile profile = getProfile(id);
boolean removed =
removeId(id, crashed, false/* destroyed */, true/* fromMembershipListener */);
// if concurrency checks are enabled and this was a crash we may need to
// sync with other members in case an update was lost. We do this in the
// waiting thread pool so as not to block other membership listeners
if (removed && shouldSync) {
syncForCrashedMember(id, profile);
}
}
@Override
public void memberSuspect(DistributionManager distributionManager,
InternalDistributedMember id, InternalDistributedMember whoSuspected, String reason) {}
};
}
public static DistributionAdvisor createDistributionAdvisor(DistributionAdvisee advisee) {
DistributionAdvisor advisor = new DistributionAdvisor(advisee);
advisor.initialize();
return advisor;
}
protected void initialize() {
subInit();
getDistributionManager().addMembershipListener(membershipListener);
}
private void subInit() {
// override for any additional initialization specific to subclass
}
/**
* determine whether a delta-gii synchronization should be performed for this lost member
*
* @return true if a delta-gii should be performed
*/
public boolean shouldSyncForCrashedMember(InternalDistributedMember id) {
return (advisee instanceof DistributedRegion)
&& ((DistributedRegion) advisee).shouldSyncForCrashedMember(id);
}
/** perform a delta-GII for the given lost member */
public void syncForCrashedMember(final InternalDistributedMember id, final Profile profile) {
final DistributedRegion dr = getRegionForDeltaGII();
if (dr == null) {
return;
}
final boolean isDebugEnabled = logger.isDebugEnabled();
if (isDebugEnabled) {
logger.debug("da.syncForCrashedMember will sync region in cache's timer for region: {}", dr);
}
CacheProfile cacheProfile = (CacheProfile) profile;
PersistentMemberID persistentId = getPersistentID(cacheProfile);
VersionSource lostVersionID;
if (persistentId != null) {
lostVersionID = persistentId.getVersionMember();
} else {
lostVersionID = id;
}
// schedule the synchronization for execution in the future based on the client health monitor
// interval. This allows client caches to retry an operation that might otherwise be recovered
// through the sync operation. Without associated event information this could cause the
// retried operation to be mishandled. See GEODE-5505
final long delay = getDelay(dr);
if (dr.getDataPolicy().withPersistence() && persistentId == null) {
// Fix for GEODE-6886 (#46704). The lost member may be an empty accessor
// of a persistent replicate region. We don't need to do a synchronization
// in that case, because those members send their writes to a persistent member.
// Only a persistent member can generate the version.
if (logger.isDebugEnabled()) {
logger.debug(
"da.syncForCrashedMember skipping sync because crashed member is not persistent: {}",
id);
}
return;
}
dr.scheduleSynchronizeForLostMember(id, lostVersionID, delay);
if (dr.getConcurrencyChecksEnabled()) {
dr.setRegionSynchronizeScheduled(lostVersionID);
}
}
PersistentMemberID getPersistentID(CacheProfile cp) {
return cp.persistentID;
}
long getDelay(DistributedRegion dr) {
return dr.getGemFireCache().getCacheServers().stream()
.mapToLong(CacheServer::getMaximumTimeBetweenPings).max().orElse(0L);
}
/** find the region for a delta-gii operation (synch) */
public DistributedRegion getRegionForDeltaGII() {
if (advisee instanceof DistributedRegion) {
return (DistributedRegion) advisee;
}
return null;
}
protected String toStringWithProfiles() {
final StringBuilder sb = new StringBuilder(toString());
sb.append(" with profiles=(");
Profile[] profs = profiles; // volatile read
for (int i = 0; i < profs.length; i++) {
if (i > 0) {
sb.append(", ");
}
sb.append(profs[i]);
}
sb.append(")");
return sb.toString();
}
/**
* Increment and get next profile version from {@link #profileVersionSequencer}.
*
* @return next profile version number
*/
protected int incrementAndGetVersion() {
// NOTE: int should rollover if value is Integer.MAX_VALUE
return profileVersionSequencer.incrementAndGet();
}
/**
* Generates a serial number for identifying a logical resource. Later instances of the same
* logical resource will have a greater serial number than earlier instances. This number
* increments statically throughout the life of this JVM. Rollover to negative is allowed.
*
* @see #ILLEGAL_SERIAL
* @return the new serial number
*/
public static int createSerialNumber() {
for (;;) {
// NOTE: AtomicInteger should rollover if value is Integer.MAX_VALUE
int result = serialNumberSequencer.incrementAndGet();
if (result != ILLEGAL_SERIAL) {
return result;
}
}
}
public DistributionManager getDistributionManager() {
return getAdvisee().getDistributionManager();
}
/**
* Like getDistributionManager but does not check
* that the DistributedSystem is still connected
*/
private DistributionManager getDistributionManagerWithNoCheck() {
return getAdvisee().getSystem().getDM();
}
public DistributionAdvisee getAdvisee() {
return advisee;
}
/**
* Free up resources used by this advisor once it is no longer being used.
*
* @since GemFire 3.5
*/
public void close() {
try {
synchronized (this) {
membershipClosed = true;
operationMonitor.close();
}
getDistributionManager().removeMembershipListener(membershipListener);
} catch (CancelException e) {
// if distribution has stopped, above is a no-op.
} catch (IllegalArgumentException ignore) {
// this is thrown if the listener is no longer registered
}
}
/**
* Atomically add listener to the list to receive notification when a *new* profile is added or a
* profile is removed, and return adviseGeneric(). This ensures that no membership listener calls
* are missed, but there is no guarantee that there won't be redundant listener calls.
*/
public Set<InternalDistributedMember> addMembershipListenerAndAdviseGeneric(
MembershipListener listener) {
initializationGate(); // exchange profiles before acquiring lock on membershipListeners
membershipListeners.putIfAbsent(listener, Boolean.TRUE);
return adviseGeneric();
}
/**
* Add listener to the list to receive notification when a profile is added or removed. Note that
* there is no guarantee that the listener will not get redundant calls, but the listener is
* guaranteed to get a call.
*/
public void addMembershipListener(MembershipListener listener) {
membershipListeners.putIfAbsent(listener, Boolean.TRUE);
}
public interface InitializationListener {
/**
* Called after this DistributionAdvisor has been initialized.
*/
void initialized();
}
public void setInitializationListener(InitializationListener listener) {
this.initializationListener = listener;
}
public boolean addProfileChangeListener(ProfileListener listener) {
return null == profileListeners.putIfAbsent(listener, Boolean.TRUE);
}
public void removeProfileChangeListener(ProfileListener listener) {
profileListeners.remove(listener);
}
/**
* Remove listener from the list to receive notification when a provile is added or removed.
*
* @return true if listener was in the list
*/
public boolean removeMembershipListener(MembershipListener listener) {
return membershipListeners.remove(listener) != null;
}
/** Called by CreateRegionProcessor after it does its own profile exchange */
public void setInitialized() {
synchronized (initializeLock) {
initialized = true;
}
}
/** Return true if exchanged profiles */
public boolean initializationGate() {
if (initialized) {
return false;
}
boolean exchangedProfiles = false;
try {
synchronized (initializeLock) {
if (!initialized) {
exchangedProfiles = true;
exchangeProfiles();
return true;
}
}
} finally {
if (exchangedProfiles) {
if (this.initializationListener != null) {
// this needs to be done outside the initializeLock
this.initializationListener.initialized();
}
}
}
return false;
}
// wait for pending profile exchange to complete before returning
public boolean isInitialized() {
synchronized (initializeLock) {
return initialized;
}
}
/**
* Polls the isInitialized state. Unlike {@link #isInitialized} it will not wait for it to become
* initialized if it is in the middle of being initialized.
*
* @since GemFire 5.7
*/
public boolean pollIsInitialized() {
return initialized;
}
/**
* Dumps out all profiles in this advisor.
*
* @param infoMsg prefix message to log
*/
public void dumpProfiles(String infoMsg) {
Profile[] profs = profiles;
final StringBuilder buf = new StringBuilder(2000);
if (infoMsg != null) {
buf.append(infoMsg);
buf.append(": ");
}
buf.append("FYI, DUMPING PROFILES IN ");
buf.append(toString());
buf.append(":\n");
buf.append("My Profile=");
buf.append(getAdvisee().getProfile());
buf.append("\nOther Profiles:\n");
for (Profile prof : profs) {
buf.append("\t");
buf.append(prof.toString());
buf.append("\n");
}
if (logger.isDebugEnabled()) {
logger.debug(buf.toString());
}
}
/**
* Create or update a profile for a remote counterpart.
*
* @param profile the profile, referenced by this advisor after this method returns.
*/
public boolean putProfile(Profile profile) {
return putProfile(profile, false);
}
public synchronized boolean putProfile(Profile newProfile, boolean forceProfile) {
try {
return doPutProfile(newProfile, forceProfile);
} finally {
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE)) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, "putProfile exiting {}",
toStringWithProfiles());
}
}
}
/**
* Return true if the memberId on the specified Profile is a current member of the distributed
* system.
*
* @since GemFire 5.7
*/
protected boolean isCurrentMember(Profile p) {
return getDistributionManager().isCurrentMember(p.getDistributedMember());
}
/**
* Update the Advisor with profiles describing remote instances of the
* {@link DistributionAdvisor#getAdvisee()}. Profile information is versioned via
* {@link Profile#getVersion()} and may be ignored if an older version is received after newer
* versions.
*
* @param newProfile the profile to add
* @param forceProfile true will force profile to be added even if member is not in distributed
* view (should only ever be true for tests that need to inject a bad profile)
*
* @return true if the profile was applied, false if the profile was ignored
*/
private synchronized boolean doPutProfile(Profile newProfile, boolean forceProfile) {
assert newProfile != null;
// prevent putting of profile that is gone from the view
if (!forceProfile) {
// ensure member is in distributed system view
if (!isCurrentMember(newProfile)) {
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE)) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE,
"putProfile: ignoring {}; not in current view for {}",
newProfile.getDistributedMember(), getAdvisee().getFullPath());
}
// member is no longer in system so do nothing
return false;
}
}
// prevent putting of profile for which we already received removal msg
Integer removedSerialNumber = removedProfiles.get(newProfile.getId());
if (removedSerialNumber != null
&& !isNewerSerialNumber(newProfile.getSerialNumber(), removedSerialNumber)) {
// removedProfile exists and newProfile is NOT newer so do nothing
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE)) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE,
"putProfile: Skipping putProfile: {} is not newer than serialNumber {} for {}",
newProfile, removedSerialNumber, getAdvisee().getFullPath());
}
return false;
}
// compare newProfile to oldProfile if one is found
Profile oldProfile = getProfile(newProfile.getId());
final boolean isTraceEnabled_DistributionAdvisor =
logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE);
if (isTraceEnabled_DistributionAdvisor) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE,
"putProfile: Updating existing profile: {} with new profile: {} for {}", oldProfile,
newProfile, getAdvisee().getFullPath());
}
if (oldProfile != null && !isNewerProfile(newProfile, oldProfile)) {
// oldProfile exists and newProfile is NOT newer so do nothing
if (isTraceEnabled_DistributionAdvisor) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE,
"putProfile: Ignoring {} because it's older than or same as {} for {}", newProfile,
oldProfile, getAdvisee().getFullPath());
}
return false;
}
// handle membershipVersion for state flush
if (newProfile.initialMembershipVersion == 0) {
if (oldProfile != null) {
newProfile.initialMembershipVersion = oldProfile.initialMembershipVersion;
} else {
if (!membershipClosed) {
operationMonitor.initNewProfile(newProfile);
}
}
} else {
operationMonitor.forceNewMembershipVersion();
}
if (isTraceEnabled_DistributionAdvisor) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE,
"DistributionAdvisor ({}) putProfile: {}", this, newProfile);
}
boolean doAddOrUpdate = evaluateProfiles(newProfile, oldProfile);
if (!doAddOrUpdate) {
return false;
}
if (basicAddProfile(newProfile)) {
profileCreated(newProfile);
notifyListenersProfileAdded(newProfile);
notifyListenersMemberAdded(newProfile.getDistributedMember());
} else {
notifyListenersProfileUpdated(newProfile);
profileUpdated(newProfile);
}
return true;
}
/**
* A callback to sub-classes for extra validation logic
*
* @return true if the change from old to new is valid
*/
protected boolean evaluateProfiles(Profile newProfile, Profile oldProfile) {
return true;
}
/**
* Returns true if newProfile is newer than oldProfile. This is determined by comparing
* {@link Profile#getSerialNumber()} and {@link Profile#getVersion()}. If the old versioning
* number being compared is above {@link #ROLLOVER_THRESHOLD_UPPER} and the new versioning number
* is below {@link #ROLLOVER_THRESHOLD_LOWER} then a rollover is assumed to have occurred, which
* means the new versioning number is newer.
*
* @param newProfile the newer profile
* @param oldProfile the older profile
* @return true if newProfile is newer than oldProfile
*/
private boolean isNewerProfile(Profile newProfile, Profile oldProfile) {
Assert.assertHoldsLock(this, true);
boolean isNewer = true;
// force version comparison
int oldSerial = oldProfile.getSerialNumber();
int newSerial = newProfile.getSerialNumber();
// boolean serialRolled = oldSerial > 0 && newSerial < 0;
boolean serialRolled =
oldSerial > ROLLOVER_THRESHOLD_UPPER && newSerial < ROLLOVER_THRESHOLD_LOWER;
int oldVersion = oldProfile.getVersion();
int newVersion = newProfile.getVersion();
// boolean versionRolled = oldVersion > 0 && newVersion < 0;
boolean versionRolled =
oldVersion > ROLLOVER_THRESHOLD_UPPER && newVersion < ROLLOVER_THRESHOLD_LOWER;
boolean newIsNewer;
if (oldSerial == newSerial) {
// if region serial is same, compare versions
newIsNewer = versionRolled || oldVersion < newVersion;
} else {
// compare region serial
newIsNewer = serialRolled || oldSerial < newSerial;
}
if (!newIsNewer) {
isNewer = false;
}
return isNewer;
}
/**
* Compare two serial numbers
*
* @return return true if the first serial number (newSerialNumber) is more recent
*/
public static boolean isNewerSerialNumber(int newSerialNumber, int oldSerialNumber) {
boolean serialRolled =
oldSerialNumber > ROLLOVER_THRESHOLD_UPPER && newSerialNumber < ROLLOVER_THRESHOLD_LOWER;
return serialRolled || oldSerialNumber < newSerialNumber;
}
/**
* Create a new version of the membership profile set. This is used in flushing state out of the
* VM for previous versions of the set.
*
* @since GemFire 5.1
*/
public void forceNewMembershipVersion() {
operationMonitor.forceNewMembershipVersion();
}
/**
* this method must be invoked at the start of every operation that can modify the state of
* resource. The return value must be recorded and sent to the advisor in an endOperation message
* when messages for the operation have been put in the DistributionManager's outgoing "queue".
*
* @return the current membership version for this advisor
* @since GemFire 5.1
*/
public long startOperation() {
return operationMonitor.startOperation();
}
/**
* This method must be invoked when messages for an operation have been put in the
* DistributionManager's outgoing queue.
*
* @param version The membership version returned by startOperation
* @since GemFire 5.1
*/
public void endOperation(long version) {
operationMonitor.endOperation(version);
}
/**
* wait for the current operations being sent on views prior to the joining of the given member to
* be placed on communication channels before returning
*
* @since GemFire 5.1
*/
public void waitForCurrentOperations() {
operationMonitor.waitForCurrentOperations();
}
public void waitForCurrentOperations(Logger alertLogger, long warnMS, long severeAlertMS) {
// this may wait longer than it should if the membership version changes, dumping
// more operations into the previousVersionOpCount
operationMonitor.waitForCurrentOperations(alertLogger, warnMS, severeAlertMS);
}
/**
* Bypass the distribution manager and ask the membership manager directly if a given member is
* still in the view.
*
* We need this because we're asking membership questions from within listeners, and we don't know
* whether the DM's membership listener fires before or after our own.
*
* @param id member we are asking about
* @return true if we are still in the JGroups view (must return false if id == null)
*/
protected boolean stillInView(ProfileId id) {
if (id instanceof InternalDistributedMember) {
InternalDistributedMember memberId = (InternalDistributedMember) id;
return getDistributionManager().getViewMembers().contains(memberId);
} else {
// if id is not a InternalDistributedMember then return false
return false;
}
}
/**
* Given member is no longer pertinent to this advisor; remove it.
*
* This is often overridden in subclasses, but they need to defer to their superclass at some
* point in their re-implementation.
*
* @param memberId the member to remove
* @param crashed true if the member did not leave normally
* @return true if it was being tracked
*/
private boolean basicRemoveId(ProfileId memberId, boolean crashed, boolean destroyed) {
final boolean isDebugEnabled = logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE);
if (isDebugEnabled) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, "DistributionAdvisor ({}) removeId {}",
this, memberId);
}
Profile profileRemoved = basicRemoveMemberId(memberId);
if (profileRemoved == null) {
if (isDebugEnabled) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE,
"DistributionAdvisor.removeId: no profile to remove for {}", memberId);
}
return false;
}
if (isDebugEnabled) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE,
"DistributionAdvisor.removeId: removed profile for {}", memberId);
}
profileRemoved(profileRemoved);
notifyListenersProfileRemoved(profileRemoved, destroyed);
notifyListenersMemberRemoved(profileRemoved.getDistributedMember(), crashed);
profileRemoved.cleanUp();
return true;
}
/**
* Removes the specified profile if it is registered with this advisor.
*
* @since GemFire 5.7
*/
private void removeProfile(Profile profile) {
removeId(profile.getId(), false, false, false);
}
/**
* Removes the profile for the given member. This method is meant to be overriden by subclasses.
*
* @param memberId the member whose profile should be removed
* @param crashed true if the member crashed
* @param fromMembershipListener true if this call is a result of MembershipEvent invocation
* (fixes #42000)
* @return true when the profile was removed, false otherwise
*/
public boolean removeId(final ProfileId memberId, boolean crashed, boolean destroyed,
boolean fromMembershipListener) {
boolean result;
try {
result = doRemoveId(memberId, crashed, destroyed, fromMembershipListener);
} finally {
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE)) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, "removeId {} exiting {}", memberId,
toStringWithProfiles());
}
}
return result;
}
private boolean doRemoveId(ProfileId memberId, boolean crashed, boolean destroyed,
boolean fromMembershipListener) {
final boolean isDebugEnabled_DA = logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE);
if (isDebugEnabled_DA) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE,
"removeId: removing member {} from resource {}", memberId, getAdvisee().getFullPath());
}
synchronized (this) {
// If the member has disappeared, completely remove
if (!fromMembershipListener) {
boolean result = false;
// Assert.assertTrue(!crashed); // should not get here :-)
// Is there an existing profile? If so, add it to list of those removed.
Profile profileToRemove = getProfile(memberId);
while (profileToRemove != null) {
result = true;
if (isDebugEnabled_DA) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, "removeId: tracking removal of {}",
profileToRemove);
}
removedProfiles.put(profileToRemove.getDistributedMember(),
profileToRemove.getSerialNumber());
basicRemoveId(profileToRemove.getId(), crashed, destroyed);
profileToRemove = getProfile(memberId);
}
return result;
} else {
// Garbage collect; this profile is no longer pertinent
removedProfiles.remove(memberId);
boolean result = basicRemoveId(memberId, crashed, destroyed);
while (basicRemoveId(memberId, crashed, destroyed)) {
// keep removing profiles that match until we have no more
}
return result;
}
}
}
/**
* Removes the profile for the specified member and serial number
*
* @param memberId the member to remove the profile for
* @param serialNum specific serial number to remove
* @return true if a matching profile for the member was found
*/
public boolean removeIdWithSerial(InternalDistributedMember memberId, int serialNum,
boolean regionDestroyed) {
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE)) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE,
"removeIdWithSerial: removing member {} with serial {} from resource {}", memberId,
serialNum, getAdvisee().getName());
}
Assert.assertTrue(serialNum != ILLEGAL_SERIAL);
return updateRemovedProfiles(memberId, serialNum, regionDestroyed);
}
/**
* Update the list of removed profiles based on given serial number, and ensure that the given
* member is no longer in the list of bucket owners.
*
* @param memberId member to remove
* @param serialNum serial number
* @return true if this member was an owner
*/
private synchronized boolean updateRemovedProfiles(InternalDistributedMember memberId,
int serialNum, boolean regionDestroyed) {
final boolean isDebugEnabled_DA = logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE);
if (isDebugEnabled_DA) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE,
"updateRemovedProfiles: ensure member {} with serial {} is removed from region {}",
memberId, serialNum, getAdvisee().getFullPath());
}
boolean removedId = false;
if (stillInView(memberId)) {
boolean isNews = true;
// If existing profile is newer, just return
Profile profileToRemove = getProfile(memberId);
if (profileToRemove != null) {
if (isNewerSerialNumber(profileToRemove.serialNumber, serialNum)) {
if (isDebugEnabled_DA) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE,
"updateRemovedProfiles: member {} has profile {} which is newer than serial {}",
memberId, profileToRemove, serialNum);
}
// We have a current profile for this member, but its serial number
// is more recent than the removal that was requested.
// Do not remove our existing profile, and do not update
// removedProfiles.
isNews = false;
}
}
if (isNews) {
// Is this a more recent removal than we have recorded?
// If not, do not remove any existing profile, and do not
// update removedProfiles
Integer oldSerial = removedProfiles.get(memberId);
if (oldSerial != null && isNewerSerialNumber(oldSerial, serialNum)) {
if (isDebugEnabled_DA) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE,
"updateRemovedProfiles: member {} sent removal of serial {} but we hae already removed {}",
memberId, serialNum, oldSerial);
}
isNews = false;
}
}
if (isNews) {
if (isDebugEnabled_DA) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE,
"updateRemovedProfiles: adding serial {} for member {} to removedProfiles", serialNum,
memberId);
}
// The member is still in the system, and the removal message is
// a new one. Remember this removal, and ensure that its profile
// is removed from this bucket.
removedProfiles.put(memberId, serialNum);
// Only remove profile if this removal is more recent than our
// current state
removedId = basicRemoveId(memberId, false, regionDestroyed);
}
} // isCurrentMember
else {
// If the member has disappeared, completely remove (garbage collect)
if (isDebugEnabled_DA) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE,
"updateRemovedProfiles: garbage collecting member {}", memberId);
}
removedProfiles.remove(memberId);
// Always make sure that this member is removed from the advisor
removedId = basicRemoveId(memberId, false, regionDestroyed);
}
if (isDebugEnabled_DA) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, "updateRemovedProfiles: removedId = {}",
removedId);
}
return removedId;
}
/**
* Indicate whether given member is being tracked
*
* @param memberId the member
* @return true if the member was being tracked
*/
public synchronized boolean containsId(InternalDistributedMember memberId) {
return (indexOfMemberId(memberId) > -1);
}
public synchronized int getNumProfiles() {
return numActiveProfiles;
}
/**
* Caller must be synchronized on this. Overridden in BucketAdvisor.
*/
private void setNumActiveProfiles(int newValue) {
numActiveProfiles = newValue;
}
public Profile getProfile(ProfileId id) {
Profile[] allProfiles = profiles; // volatile read
boolean isIDM = (id instanceof InternalDistributedMember);
for (Profile allProfile : allProfiles) {
if (isIDM) {
if (allProfile.getDistributedMember().equals(id)) {
return allProfile;
}
} else {
if (allProfile.getId().equals(id)) {
return allProfile;
}
}
}
return null;
}
/** exchange profiles to initialize this advisor */
public void exchangeProfiles() {
Assert.assertHoldsLock(this, false); // causes deadlock
Assert.assertHoldsLock(initializeLock, true);
new UpdateAttributesProcessor(getAdvisee()).distribute(true);
setInitialized();
}
/** Creates the current distribution profile for this member */
public Profile createProfile() {
Profile newProfile =
instantiateProfile(getDistributionManager().getId(), incrementAndGetVersion());
getAdvisee().fillInProfile(newProfile);
return newProfile;
}
/** Instantiate new distribution profile for this member */
protected Profile instantiateProfile(InternalDistributedMember memberId, int version) {
return new Profile(memberId, version);
}
/**
* Provide recipient information for any other operation. Returns the set of members that have
* remote counterparts.
*
* @return Set of Serializable members; no reference to Set kept by advisor so caller is free to
* modify it
*/
public Set<InternalDistributedMember> adviseGeneric() {
return adviseFilter(null);
}
/**
* Provide recipients for profile exchange, called by UpdateAttributesProcessor and
* CreateRegionProcessor. Can not be initialized at this point because it is only called in the
* following scenarios: 1) We're doing a lazy initialization and synchronization on initializeLock
* prevents other threads from causing initialization on this advisor. 2) We're creating a new
* region and doing profile exchange as part of region initialization, in which case no other
* threads have access to the region or this advisor.
*/
public Set adviseProfileExchange() {
// Get the list of recipients from the nearest initialized advisor
// in the parent chain
Assert.assertTrue(!isInitialized());
DistributionAdvisor advisor;
DistributionAdvisee advisee = getAdvisee();
do {
advisee = advisee.getParentAdvisee();
if (advisee == null)
return getDefaultDistributionMembers();
advisor = advisee.getDistributionAdvisor();
} while (!advisor.isInitialized());
// do not call adviseGeneric because we don't want to trigger another
// profile exchange on the parent
return advisor.adviseFilter(null);
}
/**
* Returns a set of the members this advisor should distribute to by default
*
* @since GemFire 5.7
*/
private Set<InternalDistributedMember> getDefaultDistributionMembers() {
if (!useAdminMembersForDefault()) {
return getDistributionManager().getOtherDistributionManagerIds();
} else {
return getDistributionManager().getAllOtherMembers();
}
}
/**
* Returns true if all members including ADMIN are required for distribution of update attributes
* message by {@link #getDefaultDistributionMembers()}.
*/
public boolean useAdminMembersForDefault() {
return false;
}
private void notifyListenersMemberAdded(InternalDistributedMember member) {
for (MembershipListener membershipListener : membershipListeners.keySet()) {
try {
membershipListener.memberJoined(getDistributionManagerWithNoCheck(), member);
} catch (Exception e) {
logger.warn("Ignoring exception during member joined listener notification", e);
}
}
}
private void notifyListenersMemberRemoved(InternalDistributedMember member, boolean crashed) {
for (MembershipListener membershipListener : membershipListeners.keySet()) {
try {
membershipListener.memberDeparted(getDistributionManagerWithNoCheck(), member,
crashed);
} catch (Exception e) {
logger.warn("Ignoring exception during member departed listener notification", e);
}
}
}
private void notifyListenersProfileRemoved(Profile profile, boolean destroyed) {
for (ProfileListener profileListener : profileListeners.keySet()) {
(profileListener).profileRemoved(profile, destroyed);
}
}
private void notifyListenersProfileAdded(Profile profile) {
for (ProfileListener profileListener : profileListeners.keySet()) {
(profileListener).profileCreated(profile);
}
}
private void notifyListenersProfileUpdated(Profile profile) {
for (ProfileListener profileListener : profileListeners.keySet()) {
(profileListener).profileUpdated(profile);
}
}
/**
* Template method for sub-classes to override. Method is invoked after a new profile is
* created/added to profiles.
*
* @param profile the created profile
*/
protected void profileCreated(Profile profile) {}
/**
* Template method for sub-classes to override. Method is invoked after a profile is updated in
* profiles.
*
* @param profile the updated profile
*/
protected void profileUpdated(Profile profile) {}
/**
* Template method for sub-classes to override. Method is invoked after a profile is removed from
* profiles.
*
* @param profile the removed profile
*/
protected void profileRemoved(Profile profile) {}
/** All advise methods go through this method */
protected Set<InternalDistributedMember> adviseFilter(Filter f) {
initializationGate();
Set<InternalDistributedMember> recipients = null;
Profile[] locProfiles = profiles; // grab current profiles
for (Profile profile : locProfiles) {
if (f == null || f.include(profile)) {
if (recipients == null) {
recipients = new HashSet<>();
}
recipients.add(profile.getDistributedMember());
}
}
if (recipients == null) {
return Collections.emptySet();
} else {
return recipients;
}
}
/**
* This method calls filter->include on every profile until include returns true.
*
* @return false if all filter->include calls returns false; otherwise true.
**/
protected boolean satisfiesFilter(Filter f) {
initializationGate();
Profile[] locProfiles = profiles; // grab current profiles
for (Profile p : locProfiles) {
if (f.include(p)) {
return true;
}
}
return false;
}
/**
* A visitor interface for all the available profiles used by
* {@link DistributionAdvisor#accept(ProfileVisitor, Object)}. Unlike the {@link Filter} class
* this does not assume of two state visit of inclusion or exclusion rather allows manipulation of
* an arbitrary aggregator that has been passed to the {@link #visit} method. In addition this is
* public for use by other classes.
*/
public interface ProfileVisitor<T> {
/**
* Visit a given {@link Profile} accumulating the results in the given aggregate. Returns false
* when the visit has to be terminated.
*
* @param advisor the DistributionAdvisor that invoked this visitor
* @param profile the profile being visited
* @param profileIndex the index of current profile
* @param numProfiles the total number of profiles being visited
* @param aggregate result aggregated so far, if any
*
* @return false if the visit has to be terminated immediately and false otherwise
*/
boolean visit(DistributionAdvisor advisor, Profile profile, int profileIndex, int numProfiles,
T aggregate);
}
/**
* Invoke the given {@link ProfileVisitor} on all the {@link Profile}s exiting when the
* {@link ProfileVisitor#visit} method returns false. Unlike the {@link #adviseFilter(Filter)}
* method this does assume the return type to be a Set of qualifying members rather allows for
* population of an arbitrary aggregator passed as the argument to this method.
*
* @param <T> the type of object used for aggregation of results
* @param visitor the {@link ProfileVisitor} to use for the visit
* @param aggregate an aggregate object that will be used to for aggregation of results by the
* {@link ProfileVisitor#visit} method; this allows the {@link ProfileVisitor} to not
* maintain any state so that in many situations a global static object encapsulating the
* required behaviour will work
*
* @return true if all the profiles were visited and false if the {@link ProfileVisitor#visit} cut
* it short by returning false
*/
public <T> boolean accept(ProfileVisitor<T> visitor, T aggregate) {
initializationGate();
final Profile[] locProfiles = profiles; // grab current profiles
final int numProfiles = locProfiles.length;
Profile p;
for (int index = 0; index < numProfiles; ++index) {
p = locProfiles[index];
if (!visitor.visit(this, p, index, numProfiles, aggregate)) {
return false;
}
}
return true;
}
/**
* Get an unmodifiable list of the <code>Profile</code>s that match the given <code>Filter</code>.
*
* @since GemFire 5.7
*/
protected List<Profile> fetchProfiles(Filter f) {
initializationGate();
List<Profile> result = null;
Profile[] locProfiles = profiles;
for (Profile profile : locProfiles) {
if (f == null || f.include(profile)) {
if (result == null) {
result = new ArrayList<>(locProfiles.length);
}
result.add(profile);
}
}
if (result == null) {
result = Collections.emptyList();
} else {
result = Collections.unmodifiableList(result);
}
return result;
}
/** Provide recipients for profile update. */
public Set adviseProfileUpdate() {
return adviseGeneric();
}
/**
* Provide recipients for profile remove.
*
* @since GemFire 5.7
*/
public Set adviseProfileRemove() {
return adviseGeneric();
}
/**
* @return true if new profile added, false if already had profile (but profile is still replaced
* with new one)
*/
// must synchronize when modifying profile array
private synchronized boolean basicAddProfile(Profile p) {
// don't add more than once, but replace existing profile
// try {
int index = indexOfMemberId(p.getId());
if (index >= 0) {
Profile[] oldProfiles = profiles; // volatile read
oldProfiles[index] = p;
profiles = oldProfiles; // volatile write
return false;
}
// minimize volatile reads by copying ref to local var
Profile[] snap = profiles; // volatile read
Profile[] newProfiles = (Profile[]) ArrayUtils.insert(snap, snap.length, p);
Objects.requireNonNull(newProfiles);
profiles = newProfiles; // volatile write
setNumActiveProfiles(newProfiles.length);
return true;
}
// must synchronize when modifying profile array
/**
* Perform work of removing the given member from this advisor.
*/
private synchronized Profile basicRemoveMemberId(ProfileId id) {
// try {
int i = indexOfMemberId(id);
if (i >= 0) {
Profile profileRemoved = profiles[i];
basicRemoveIndex(i);
return profileRemoved;
} else
return null;
// } finally {
// Assert.assertTrue(-1 == indexOfMemberId(id));
// }
}
private int indexOfMemberId(ProfileId id) {
Assert.assertHoldsLock(this, true);
Profile[] profs = profiles; // volatile read
for (int i = 0; i < profs.length; i++) {
Profile p = profs[i];
if (id instanceof InternalDistributedMember) {
if (p.getDistributedMember().equals(id))
return i;
} else {
if (p.getId().equals(id))
return i;
}
}
return -1;
}
private void basicRemoveIndex(int index) {
Assert.assertHoldsLock(this, true);
// minimize volatile reads by copying ref to local var
Profile[] oldProfiles = profiles; // volatile read
Profile[] newProfiles = new Profile[oldProfiles.length - 1];
System.arraycopy(oldProfiles, 0, newProfiles, 0, index);
System.arraycopy(oldProfiles, index + 1, newProfiles, index, newProfiles.length - index);
profiles = newProfiles; // volatile write
if (numActiveProfiles > 0) {
numActiveProfiles--;
}
}
/** Filter interface */
protected interface Filter {
boolean include(Profile profile);
}
/**
* Marker interface to designate on object that serves and the unique id that identifies a
* Profile.
*/
public interface ProfileId {
}
/**
* Profile information for a remote counterpart.
*/
public static class Profile implements DataSerializableFixedID {
/** Member for whom this profile represents */
public InternalDistributedMember peerMemberId;
/** Serial number incremented every time profile is updated by memberId */
public int version;
public int serialNumber = ILLEGAL_SERIAL;
/**
* The DistributionAdvisor's membership version where this member was added
*
* @since GemFire 5.1
*/
public transient long initialMembershipVersion;
/** for internal use, required for DataSerializable.Helper.readObject */
public Profile() {}
public Profile(InternalDistributedMember memberId, int version) {
if (memberId == null) {
throw new IllegalArgumentException(
"memberId cannot be null");
}
peerMemberId = memberId;
this.version = version;
}
/**
* Return object that uniquely identifies this profile.
*
* @since GemFire 5.7
*/
public ProfileId getId() {
return peerMemberId;
}
public int getVersion() {
return version;
}
public int getSerialNumber() {
return serialNumber;
}
@Override
public int hashCode() {
return getId().hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj == this)
return true;
if (obj == null)
return false;
if (!getClass().equals(obj.getClass()))
return false;
return getId().equals(((Profile) obj).getId());
}
/**
* Return the DistributedMember associated with this profile
*
* @since GemFire 5.0
*/
public InternalDistributedMember getDistributedMember() {
return peerMemberId;
}
@Override
public int getDSFID() {
return DA_PROFILE;
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
InternalDataSerializer.invokeToData(peerMemberId, out);
out.writeInt(version);
out.writeInt(serialNumber);
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
peerMemberId = new InternalDistributedMember();
InternalDataSerializer.invokeFromData(peerMemberId, in);
version = in.readInt();
serialNumber = in.readInt();
}
/**
* Process add/remove/update of an incoming profile.
*/
public void processIncoming(ClusterDistributionManager dm, String adviseePath,
boolean removeProfile, boolean exchangeProfiles, final List<Profile> replyProfiles) {
// nothing by default; just log that nothing was done
if (logger.isDebugEnabled()) {
logger.debug("While processing UpdateAttributes message ignored incoming profile: {}",
this);
}
}
/**
* Attempts to process this message with the specified {@link DistributionAdvisee}. Also if
* exchange profiles then add the profile from {@link DistributionAdvisee} to reply.
*
* @param advisee the CacheDistributionAdvisee to apply this profile to
* @param removeProfile true to remove profile else add profile
* @param exchangeProfiles true to add the profile to reply
*/
protected void handleDistributionAdvisee(DistributionAdvisee advisee, boolean removeProfile,
boolean exchangeProfiles, final List<Profile> replyProfiles) {
final DistributionAdvisor da;
if (advisee != null && (da = advisee.getDistributionAdvisor()) != null) {
if (removeProfile) {
da.removeProfile(this);
} else {
da.putProfile(this);
}
if (exchangeProfiles) {
// assume non-null replyProfiles when exchangeProfiles is true
replyProfiles.add(advisee.getProfile());
}
}
}
@Override
public String toString() {
StringBuilder sb = getToStringHeader();
sb.append("@").append(System.identityHashCode(this)).append("(");
fillInToString(sb);
sb.append(")");
return sb.toString();
}
/**
* This will be get called when profile will be removed from advisor Do local cleanup in this
* thread, otherwsie spawn another thread to do cleanup
*/
public void cleanUp() {
}
public StringBuilder getToStringHeader() {
return new StringBuilder("Profile");
}
public void fillInToString(StringBuilder sb) {
sb.append("memberId=").append(peerMemberId);
sb.append("; version=").append(version);
sb.append("; serialNumber=").append(serialNumber);
sb.append("; initialMembershipVersion=").append(initialMembershipVersion);
}
@Override
public Version[] getSerializationVersions() {
return null;
}
}
private static class OperationMonitor {
private final DistributionAdvisor distributionAdvisor;
/**
* the version of the profile set
*/
private long membershipVersion;
/**
* the number of operations in-progress for previous versions of the profile set
*/
private long previousVersionOpCount;
/**
* the number of operations in-progress for the current version of the profile set
*/
private long currentVersionOpCount;
/**
* for debugging stalled state-flush operations we track threads performing operations
* and capture the state when startOperatiopn is invoked
*/
private boolean closed;
private OperationMonitor(DistributionAdvisor distributionAdvisor) {
this.distributionAdvisor = distributionAdvisor;
}
private synchronized void incrementMembershipVersion() {
membershipVersion++;
}
/**
* Create a new version of the membership profile set. This is used in flushing state out of the
* VM for previous versions of the set.
*
* @since GemFire 5.1
*/
synchronized void forceNewMembershipVersion() {
if (!closed) {
incrementMembershipVersion();
previousVersionOpCount += currentVersionOpCount;
currentVersionOpCount = 0;
membershipVersionChanged();
}
}
/**
* this method must be invoked at the start of every operation that can modify the state of
* resource. The return value must be recorded and sent to the advisor in an endOperation
* message when messages for the operation have been put in the DistributionManager's outgoing
* "queue".
*
* @return the current membership version for this advisor
* @since GemFire 5.1
*/
synchronized long startOperation() {
logNewOperation();
currentVersionOpCount++;
return membershipVersion;
}
/**
* This method must be invoked when messages for an operation have been put in the
* DistributionManager's outgoing queue.
*
* @param version The membership version returned by startOperation
* @since GemFire 5.1
*/
synchronized void endOperation(long version) {
if (version == membershipVersion) {
currentVersionOpCount--;
logEndOperation(true);
} else {
previousVersionOpCount--;
logEndOperation(false);
}
}
/**
* wait for the current operations being sent on views prior to the joining of the given member
* to be placed on communication channels before returning
*
* @since GemFire 5.1
*/
void waitForCurrentOperations() {
long timeout =
1000L * distributionAdvisor.getDistributionManager().getSystem().getConfig()
.getAckWaitThreshold();
waitForCurrentOperations(logger, timeout, timeout * 2L);
}
void waitForCurrentOperations(Logger alertLogger, long warnMS, long severeAlertMS) {
// this may wait longer than it should if the membership version changes, dumping
// more operations into the previousVersionOpCount
final long startTime = System.currentTimeMillis();
final long warnTime = startTime + warnMS;
final long severeAlertTime = startTime + severeAlertMS;
boolean warned = false;
boolean severeAlertIssued = false;
while (operationsAreInProgress()) {
// The advisor's close() method will set the pVOC to zero. This loop
// must not terminate due to cache closure until that happens.
try {
Thread.sleep(50);
} catch (InterruptedException e) {
throw new GemFireIOException("State flush interrupted");
}
long now = System.currentTimeMillis();
if ((!warned) && System.currentTimeMillis() >= warnTime) {
warned = true;
logWaitOnOperationsWarning(alertLogger, warnMS);
} else if (warned && !severeAlertIssued && (now >= severeAlertTime)) {
logWaitOnOperationsSevere(alertLogger, severeAlertMS);
severeAlertIssued = true;
}
}
if (warned) {
alertLogger.info("Wait for current operations completed");
}
}
synchronized boolean operationsAreInProgress() {
return previousVersionOpCount > 0;
}
synchronized void initNewProfile(Profile newProfile) {
membershipVersion++;
newProfile.initialMembershipVersion = membershipVersion;
previousVersionOpCount =
previousVersionOpCount + currentVersionOpCount;
currentVersionOpCount = 0;
membershipVersionChanged();
}
synchronized void close() {
previousVersionOpCount = 0;
currentVersionOpCount = 0;
closed = true;
}
void logNewOperation() {}
void logEndOperation(boolean newOperation) {}
void logWaitOnOperationsSevere(Logger alertLogger, long severeAlertMS) {
// OSProcess.printStacks(0);
alertLogger.fatal("This thread has been stalled for {} milliseconds "
+ "waiting for current operations to complete. Something may be blocking operations.",
severeAlertMS);
}
void logWaitOnOperationsWarning(Logger alertLogger, long warnMS) {
alertLogger.warn("This thread has been stalled for {} milliseconds waiting for "
+ "current operations to complete.", warnMS);
}
void membershipVersionChanged() {}
}
private static class ThreadTrackingOperationMonitor extends OperationMonitor {
/**
* for debugging stalled state-flush operations we track threads performing operations
* and capture the state when startOperatiopn is invoked
*/
private final Map<Thread, ExceptionWrapper> currentVersionOperationThreads;
private final Map<Thread, ExceptionWrapper> previousVersionOperationThreads;
private ThreadTrackingOperationMonitor(
DistributionAdvisor distributionAdvisor) {
super(distributionAdvisor);
currentVersionOperationThreads = new HashMap<>();
previousVersionOperationThreads = new HashMap<>();
}
@Override
void logNewOperation() {
currentVersionOperationThreads.put(Thread.currentThread(),
new ExceptionWrapper(new Exception("stack trace")));
}
@Override
void logEndOperation(boolean newOp) {
if (newOp) {
currentVersionOperationThreads.remove(Thread.currentThread());
} else {
previousVersionOperationThreads.remove(Thread.currentThread());
}
}
@Override
void logWaitOnOperationsWarning(Logger alertLogger, long warnMS) {
super.logWaitOnOperationsWarning(alertLogger, warnMS);
synchronized (this) {
logger
.debug("Waiting for these threads: {}", previousVersionOperationThreads);
logger
.debug("New version threads are {}", currentVersionOperationThreads);
}
}
@Override
void logWaitOnOperationsSevere(Logger alertLogger, long severeAlertMS) {
super.logWaitOnOperationsSevere(alertLogger, severeAlertMS);
synchronized (this) {
logger
.debug("Waiting for these threads: {}", previousVersionOperationThreads);
logger
.debug("New version threads are {}", currentVersionOperationThreads);
}
}
@Override
void membershipVersionChanged() {
super.membershipVersionChanged();
previousVersionOperationThreads
.putAll(currentVersionOperationThreads);
currentVersionOperationThreads.clear();
}
/**
* ExceptionWrapper is used in debugging hangs in waitForCurrentOperations(). It
* captures the call stack of a thread invoking startOperation().
*/
private static class ExceptionWrapper {
private Exception exception;
ExceptionWrapper(Exception exception) {
this.exception = exception;
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder(500);
OutputStream os = new OutputStream() {
@Override
public void write(int i) {
builder.append((char) i);
}
};
PrintStream stream = new PrintStream(os);
exception.printStackTrace(stream);
return builder.toString();
}
}
}
}