| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.distributed.internal; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.io.PrintStream; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelException; |
| import org.apache.geode.GemFireIOException; |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.InternalDataSerializer; |
| import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile; |
| import org.apache.geode.internal.cache.DistributedRegion; |
| import org.apache.geode.internal.cache.UpdateAttributesProcessor; |
| import org.apache.geode.internal.cache.persistence.PersistentMemberID; |
| import org.apache.geode.internal.cache.versions.VersionSource; |
| import org.apache.geode.internal.logging.LogService; |
| import org.apache.geode.internal.logging.log4j.LogMarker; |
| import org.apache.geode.internal.serialization.DataSerializableFixedID; |
| import org.apache.geode.internal.serialization.DeserializationContext; |
| import org.apache.geode.internal.serialization.SerializationContext; |
| import org.apache.geode.internal.serialization.Version; |
| import org.apache.geode.internal.util.ArrayUtils; |
| |
| /** |
| * Provides advice on sending distribution messages. For a given operation, this advisor will |
| * provide a list of recipients that a message should be sent to, and other information depending on |
| * the operation. Each distributed entity that can have remote counterparts maintains an instance of |
| * <code>DistributionAdvisor</code> and maintains it by giving it a <code>Profile</code> for each of |
| * its remote counterparts, and telling it to delete a profile when that counterpart no longer |
| * exists. |
| * <p> |
| * Provides <code>advise</code> methods for each type of operation that requires specialized |
| * decision making based on the profiles. For all other operations that do not require specialized |
| * decision making, the {@link #adviseGeneric} method is provided. |
| * <p> |
| * A primary design goal of this class is scalability: the footprint must be kept to a minimum as |
| * the number of instances grows across a growing number of members in the distributed system. |
| * |
| * @since GemFire 3.0 |
| */ |
| public class DistributionAdvisor { |
| |
| private static final Logger logger = LogService.getLogger(); |
| |
| /** |
| * Specifies the starting version number for the profileVersionSequencer. |
| */ |
| private static final int START_VERSION_NUMBER = Integer |
| .getInteger(DistributionConfig.GEMFIRE_PREFIX + "DistributionAdvisor.startVersionNumber", 1); |
| |
| /** |
| * Specifies the starting serial number for the serialNumberSequencer. |
| */ |
| private static final int START_SERIAL_NUMBER = |
| Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + "Cache.startSerialNumber", 1 |
| // Integer.MAX_VALUE-10 |
| ); |
| |
| /** |
| * Incrementing serial number used to identify order of resource creation |
| */ |
| @MakeNotStatic |
| private static final AtomicInteger serialNumberSequencer = new AtomicInteger(START_SERIAL_NUMBER); |
| |
| /** |
| * This serial number indicates a "missing" serial number. |
| */ |
| public static final int ILLEGAL_SERIAL = -1; |
| |
| /** |
| * Used to compare profile versioning numbers against {@link Integer#MAX_VALUE} and |
| * {@link Integer#MIN_VALUE} to determine if a rollover has occurred. |
| */ |
| private static final int ROLLOVER_THRESHOLD = Integer |
| .getInteger(DistributionConfig.GEMFIRE_PREFIX + "CacheDistributionAdvisor.rolloverThreshold", |
| 1000); |
| |
| /** |
| * {@link Integer#MAX_VALUE} minus {@link #ROLLOVER_THRESHOLD} determines the upper threshold for |
| * rollover comparison. |
| */ |
| private static final int ROLLOVER_THRESHOLD_UPPER = Integer.MAX_VALUE - ROLLOVER_THRESHOLD; |
| |
| /** |
| * {@link Integer#MIN_VALUE} plus {@link #ROLLOVER_THRESHOLD} determines the lower threshold for |
| * rollover comparison. |
| */ |
| private static final int ROLLOVER_THRESHOLD_LOWER = Integer.MIN_VALUE + ROLLOVER_THRESHOLD; |
| |
| /** |
| * Incrementing serial number used to identify order of region creation |
| * |
| * @see Profile#getVersion() |
| */ |
| private final AtomicInteger profileVersionSequencer = new AtomicInteger(START_VERSION_NUMBER); |
| |
| /** |
| * The operationMonitor tracks in-progress cache operations and holds the profile set |
| * version number |
| */ |
| private final OperationMonitor operationMonitor = |
| logger.isDebugEnabled() ? new ThreadTrackingOperationMonitor(this) |
| : new OperationMonitor(this); |
| |
| |
| /** |
| * Indicates whether this advisor is has been initialized. This will be false when a shared region |
| * is mapped into the cache but there has been no distributed operations done on it yet. |
| */ |
| private volatile boolean initialized = false; |
| |
| /** |
| * Synchronization lock used for controlling access to initialization. We do not synchronize on |
| * this advisor itself because we use that synchronization for putProfile and we can not lock out |
| * putProfile while we are doing initialization |
| */ |
| private final Object initializeLock = new Object(); |
| |
| /** |
| * whether membership ops are closed (because the DA's been closed). Access under synchronization |
| * on (this) |
| */ |
| private boolean membershipClosed; |
| |
| /** |
| * Hold onto removed profiles to compare to late-processed profiles. Fix for bug 36881. Protected |
| * by synchronizing on this DistributionAdvisor. guarded.By this DistributionAdvisor |
| */ |
| private final Map<ProfileId, Integer> removedProfiles = new HashMap<>(); |
| |
| /** |
| * My database of Profiles |
| */ |
| protected volatile Profile[] profiles = new Profile[0]; |
| |
| /** |
| * Number of active profiles |
| */ |
| private int numActiveProfiles = 0; |
| |
| /** |
| * Profiles version number |
| */ |
| protected volatile long profilesVersion = 0; |
| |
| |
| /** |
| * A collection of MembershipListeners that want to be notified when a profile is added to or |
| * removed from this DistributionAdvisor. The keys are membership listeners and the values are |
| * Boolean.TRUE. |
| */ |
| private ConcurrentMap<MembershipListener, Boolean> membershipListeners = |
| new ConcurrentHashMap<>(); |
| |
| /** |
| * A collection of listeners for changes to profiles. These listeners are notified if a profile is |
| * added, removed, or updated. |
| */ |
| private ConcurrentMap<ProfileListener, Boolean> profileListeners = new ConcurrentHashMap<>(); |
| |
| private volatile InitializationListener initializationListener; |
| |
| /** |
| * The resource getting advise from this. |
| */ |
| private final DistributionAdvisee advisee; |
| /** |
| * The membership listener registered with the dm. |
| */ |
| private final MembershipListener membershipListener; |
| |
| protected DistributionAdvisor(DistributionAdvisee advisee) { |
| this.advisee = advisee; |
| membershipListener = new MembershipListener() { |
| |
| @Override |
| public void memberJoined(DistributionManager distributionManager, |
| InternalDistributedMember id) { |
| // Ignore |
| } |
| |
| @Override |
| public void quorumLost(DistributionManager distributionManager, |
| Set<InternalDistributedMember> failures, List<InternalDistributedMember> remaining) {} |
| |
| @Override |
| @SuppressWarnings("synthetic-access") |
| public void memberDeparted(DistributionManager distributionManager, |
| final InternalDistributedMember id, boolean crashed) { |
| boolean shouldSync = crashed && shouldSyncForCrashedMember(id); |
| final Profile profile = getProfile(id); |
| boolean removed = |
| removeId(id, crashed, false/* destroyed */, true/* fromMembershipListener */); |
| // if concurrency checks are enabled and this was a crash we may need to |
| // sync with other members in case an update was lost. We do this in the |
| // waiting thread pool so as not to block other membership listeners |
| if (removed && shouldSync) { |
| syncForCrashedMember(id, profile); |
| } |
| } |
| |
| @Override |
| public void memberSuspect(DistributionManager distributionManager, |
| InternalDistributedMember id, InternalDistributedMember whoSuspected, String reason) {} |
| |
| }; |
| } |
| |
| public static DistributionAdvisor createDistributionAdvisor(DistributionAdvisee advisee) { |
| DistributionAdvisor advisor = new DistributionAdvisor(advisee); |
| advisor.initialize(); |
| return advisor; |
| } |
| |
| protected void initialize() { |
| subInit(); |
| getDistributionManager().addMembershipListener(membershipListener); |
| } |
| |
| private void subInit() { |
| // override for any additional initialization specific to subclass |
| } |
| |
| /** |
| * determine whether a delta-gii synchronization should be performed for this lost member |
| * |
| * @return true if a delta-gii should be performed |
| */ |
| public boolean shouldSyncForCrashedMember(InternalDistributedMember id) { |
| return (advisee instanceof DistributedRegion) |
| && ((DistributedRegion) advisee).shouldSyncForCrashedMember(id); |
| } |
| |
| /** perform a delta-GII for the given lost member */ |
| public void syncForCrashedMember(final InternalDistributedMember id, final Profile profile) { |
| final DistributedRegion dr = getRegionForDeltaGII(); |
| if (dr == null) { |
| return; |
| } |
| |
| final boolean isDebugEnabled = logger.isDebugEnabled(); |
| if (isDebugEnabled) { |
| logger.debug("da.syncForCrashedMember will sync region in cache's timer for region: {}", dr); |
| } |
| CacheProfile cacheProfile = (CacheProfile) profile; |
| PersistentMemberID persistentId = getPersistentID(cacheProfile); |
| VersionSource lostVersionID; |
| if (persistentId != null) { |
| lostVersionID = persistentId.getVersionMember(); |
| } else { |
| lostVersionID = id; |
| } |
| // schedule the synchronization for execution in the future based on the client health monitor |
| // interval. This allows client caches to retry an operation that might otherwise be recovered |
| // through the sync operation. Without associated event information this could cause the |
| // retried operation to be mishandled. See GEODE-5505 |
| final long delay = getDelay(dr); |
| |
| if (dr.getDataPolicy().withPersistence() && persistentId == null) { |
| // Fix for GEODE-6886 (#46704). The lost member may be an empty accessor |
| // of a persistent replicate region. We don't need to do a synchronization |
| // in that case, because those members send their writes to a persistent member. |
| // Only a persistent member can generate the version. |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "da.syncForCrashedMember skipping sync because crashed member is not persistent: {}", |
| id); |
| } |
| return; |
| } |
| dr.scheduleSynchronizeForLostMember(id, lostVersionID, delay); |
| if (dr.getConcurrencyChecksEnabled()) { |
| dr.setRegionSynchronizeScheduled(lostVersionID); |
| } |
| } |
| |
| PersistentMemberID getPersistentID(CacheProfile cp) { |
| return cp.persistentID; |
| } |
| |
| long getDelay(DistributedRegion dr) { |
| return dr.getGemFireCache().getCacheServers().stream() |
| .mapToLong(CacheServer::getMaximumTimeBetweenPings).max().orElse(0L); |
| } |
| |
| /** find the region for a delta-gii operation (synch) */ |
| public DistributedRegion getRegionForDeltaGII() { |
| if (advisee instanceof DistributedRegion) { |
| return (DistributedRegion) advisee; |
| } |
| return null; |
| } |
| |
| protected String toStringWithProfiles() { |
| final StringBuilder sb = new StringBuilder(toString()); |
| sb.append(" with profiles=("); |
| Profile[] profs = profiles; // volatile read |
| for (int i = 0; i < profs.length; i++) { |
| if (i > 0) { |
| sb.append(", "); |
| } |
| sb.append(profs[i]); |
| } |
| sb.append(")"); |
| return sb.toString(); |
| } |
| |
| /** |
| * Increment and get next profile version from {@link #profileVersionSequencer}. |
| * |
| * @return next profile version number |
| */ |
| protected int incrementAndGetVersion() { |
| // NOTE: int should rollover if value is Integer.MAX_VALUE |
| return profileVersionSequencer.incrementAndGet(); |
| } |
| |
| /** |
| * Generates a serial number for identifying a logical resource. Later instances of the same |
| * logical resource will have a greater serial number than earlier instances. This number |
| * increments statically throughout the life of this JVM. Rollover to negative is allowed. |
| * |
| * @see #ILLEGAL_SERIAL |
| * @return the new serial number |
| */ |
| public static int createSerialNumber() { |
| for (;;) { |
| // NOTE: AtomicInteger should rollover if value is Integer.MAX_VALUE |
| int result = serialNumberSequencer.incrementAndGet(); |
| if (result != ILLEGAL_SERIAL) { |
| return result; |
| } |
| } |
| } |
| |
| public DistributionManager getDistributionManager() { |
| return getAdvisee().getDistributionManager(); |
| } |
| |
| /** |
| * Like getDistributionManager but does not check |
| * that the DistributedSystem is still connected |
| */ |
| private DistributionManager getDistributionManagerWithNoCheck() { |
| return getAdvisee().getSystem().getDM(); |
| } |
| |
| public DistributionAdvisee getAdvisee() { |
| return advisee; |
| } |
| |
| /** |
| * Free up resources used by this advisor once it is no longer being used. |
| * |
| * @since GemFire 3.5 |
| */ |
| public void close() { |
| try { |
| synchronized (this) { |
| membershipClosed = true; |
| operationMonitor.close(); |
| } |
| getDistributionManager().removeMembershipListener(membershipListener); |
| } catch (CancelException e) { |
| // if distribution has stopped, above is a no-op. |
| } catch (IllegalArgumentException ignore) { |
| // this is thrown if the listener is no longer registered |
| } |
| } |
| |
| |
| /** |
| * Atomically add listener to the list to receive notification when a *new* profile is added or a |
| * profile is removed, and return adviseGeneric(). This ensures that no membership listener calls |
| * are missed, but there is no guarantee that there won't be redundant listener calls. |
| */ |
| public Set<InternalDistributedMember> addMembershipListenerAndAdviseGeneric( |
| MembershipListener listener) { |
| initializationGate(); // exchange profiles before acquiring lock on membershipListeners |
| membershipListeners.putIfAbsent(listener, Boolean.TRUE); |
| return adviseGeneric(); |
| } |
| |
| |
| /** |
| * Add listener to the list to receive notification when a profile is added or removed. Note that |
| * there is no guarantee that the listener will not get redundant calls, but the listener is |
| * guaranteed to get a call. |
| */ |
| public void addMembershipListener(MembershipListener listener) { |
| membershipListeners.putIfAbsent(listener, Boolean.TRUE); |
| } |
| |
| public interface InitializationListener { |
| /** |
| * Called after this DistributionAdvisor has been initialized. |
| */ |
| void initialized(); |
| } |
| |
| public void setInitializationListener(InitializationListener listener) { |
| this.initializationListener = listener; |
| } |
| |
| public boolean addProfileChangeListener(ProfileListener listener) { |
| return null == profileListeners.putIfAbsent(listener, Boolean.TRUE); |
| } |
| |
| public void removeProfileChangeListener(ProfileListener listener) { |
| profileListeners.remove(listener); |
| } |
| |
| /** |
| * Remove listener from the list to receive notification when a provile is added or removed. |
| * |
| * @return true if listener was in the list |
| */ |
| public boolean removeMembershipListener(MembershipListener listener) { |
| return membershipListeners.remove(listener) != null; |
| } |
| |
| /** Called by CreateRegionProcessor after it does its own profile exchange */ |
| public void setInitialized() { |
| synchronized (initializeLock) { |
| initialized = true; |
| } |
| } |
| |
| /** Return true if exchanged profiles */ |
| public boolean initializationGate() { |
| if (initialized) { |
| return false; |
| } |
| boolean exchangedProfiles = false; |
| try { |
| synchronized (initializeLock) { |
| if (!initialized) { |
| exchangedProfiles = true; |
| exchangeProfiles(); |
| return true; |
| } |
| } |
| } finally { |
| if (exchangedProfiles) { |
| if (this.initializationListener != null) { |
| // this needs to be done outside the initializeLock |
| this.initializationListener.initialized(); |
| } |
| } |
| } |
| return false; |
| } |
| |
| // wait for pending profile exchange to complete before returning |
| public boolean isInitialized() { |
| synchronized (initializeLock) { |
| return initialized; |
| } |
| } |
| |
| /** |
| * Polls the isInitialized state. Unlike {@link #isInitialized} it will not wait for it to become |
| * initialized if it is in the middle of being initialized. |
| * |
| * @since GemFire 5.7 |
| */ |
| public boolean pollIsInitialized() { |
| return initialized; |
| } |
| |
| /** |
| * Dumps out all profiles in this advisor. |
| * |
| * @param infoMsg prefix message to log |
| */ |
| |
| public void dumpProfiles(String infoMsg) { |
| Profile[] profs = profiles; |
| final StringBuilder buf = new StringBuilder(2000); |
| if (infoMsg != null) { |
| buf.append(infoMsg); |
| buf.append(": "); |
| } |
| buf.append("FYI, DUMPING PROFILES IN "); |
| buf.append(toString()); |
| buf.append(":\n"); |
| |
| buf.append("My Profile="); |
| buf.append(getAdvisee().getProfile()); |
| buf.append("\nOther Profiles:\n"); |
| |
| for (Profile prof : profs) { |
| buf.append("\t"); |
| buf.append(prof.toString()); |
| buf.append("\n"); |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug(buf.toString()); |
| } |
| } |
| |
| /** |
| * Create or update a profile for a remote counterpart. |
| * |
| * @param profile the profile, referenced by this advisor after this method returns. |
| */ |
| public boolean putProfile(Profile profile) { |
| return putProfile(profile, false); |
| } |
| |
| public synchronized boolean putProfile(Profile newProfile, boolean forceProfile) { |
| try { |
| return doPutProfile(newProfile, forceProfile); |
| } finally { |
| if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE)) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, "putProfile exiting {}", |
| toStringWithProfiles()); |
| } |
| } |
| } |
| |
| /** |
| * Return true if the memberId on the specified Profile is a current member of the distributed |
| * system. |
| * |
| * @since GemFire 5.7 |
| */ |
| protected boolean isCurrentMember(Profile p) { |
| return getDistributionManager().isCurrentMember(p.getDistributedMember()); |
| } |
| |
| /** |
| * Update the Advisor with profiles describing remote instances of the |
| * {@link DistributionAdvisor#getAdvisee()}. Profile information is versioned via |
| * {@link Profile#getVersion()} and may be ignored if an older version is received after newer |
| * versions. |
| * |
| * @param newProfile the profile to add |
| * @param forceProfile true will force profile to be added even if member is not in distributed |
| * view (should only ever be true for tests that need to inject a bad profile) |
| * |
| * @return true if the profile was applied, false if the profile was ignored |
| */ |
| private synchronized boolean doPutProfile(Profile newProfile, boolean forceProfile) { |
| assert newProfile != null; |
| // prevent putting of profile that is gone from the view |
| if (!forceProfile) { |
| // ensure member is in distributed system view |
| if (!isCurrentMember(newProfile)) { |
| if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE)) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, |
| "putProfile: ignoring {}; not in current view for {}", |
| newProfile.getDistributedMember(), getAdvisee().getFullPath()); |
| } |
| |
| // member is no longer in system so do nothing |
| return false; |
| } |
| } |
| |
| // prevent putting of profile for which we already received removal msg |
| Integer removedSerialNumber = removedProfiles.get(newProfile.getId()); |
| if (removedSerialNumber != null |
| && !isNewerSerialNumber(newProfile.getSerialNumber(), removedSerialNumber)) { |
| // removedProfile exists and newProfile is NOT newer so do nothing |
| if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE)) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, |
| "putProfile: Skipping putProfile: {} is not newer than serialNumber {} for {}", |
| newProfile, removedSerialNumber, getAdvisee().getFullPath()); |
| } |
| return false; |
| } |
| |
| // compare newProfile to oldProfile if one is found |
| Profile oldProfile = getProfile(newProfile.getId()); |
| final boolean isTraceEnabled_DistributionAdvisor = |
| logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE); |
| if (isTraceEnabled_DistributionAdvisor) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, |
| "putProfile: Updating existing profile: {} with new profile: {} for {}", oldProfile, |
| newProfile, getAdvisee().getFullPath()); |
| } |
| if (oldProfile != null && !isNewerProfile(newProfile, oldProfile)) { |
| // oldProfile exists and newProfile is NOT newer so do nothing |
| if (isTraceEnabled_DistributionAdvisor) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, |
| "putProfile: Ignoring {} because it's older than or same as {} for {}", newProfile, |
| oldProfile, getAdvisee().getFullPath()); |
| } |
| return false; |
| } |
| |
| // handle membershipVersion for state flush |
| if (newProfile.initialMembershipVersion == 0) { |
| if (oldProfile != null) { |
| newProfile.initialMembershipVersion = oldProfile.initialMembershipVersion; |
| } else { |
| if (!membershipClosed) { |
| operationMonitor.initNewProfile(newProfile); |
| } |
| } |
| } else { |
| operationMonitor.forceNewMembershipVersion(); |
| } |
| |
| if (isTraceEnabled_DistributionAdvisor) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, |
| "DistributionAdvisor ({}) putProfile: {}", this, newProfile); |
| } |
| boolean doAddOrUpdate = evaluateProfiles(newProfile, oldProfile); |
| if (!doAddOrUpdate) { |
| return false; |
| } |
| if (basicAddProfile(newProfile)) { |
| profileCreated(newProfile); |
| notifyListenersProfileAdded(newProfile); |
| notifyListenersMemberAdded(newProfile.getDistributedMember()); |
| } else { |
| notifyListenersProfileUpdated(newProfile); |
| profileUpdated(newProfile); |
| } |
| return true; |
| } |
| |
| /** |
| * A callback to sub-classes for extra validation logic |
| * |
| * @return true if the change from old to new is valid |
| */ |
| protected boolean evaluateProfiles(Profile newProfile, Profile oldProfile) { |
| return true; |
| } |
| |
| /** |
| * Returns true if newProfile is newer than oldProfile. This is determined by comparing |
| * {@link Profile#getSerialNumber()} and {@link Profile#getVersion()}. If the old versioning |
| * number being compared is above {@link #ROLLOVER_THRESHOLD_UPPER} and the new versioning number |
| * is below {@link #ROLLOVER_THRESHOLD_LOWER} then a rollover is assumed to have occurred, which |
| * means the new versioning number is newer. |
| * |
| * @param newProfile the newer profile |
| * @param oldProfile the older profile |
| * @return true if newProfile is newer than oldProfile |
| */ |
| private boolean isNewerProfile(Profile newProfile, Profile oldProfile) { |
| Assert.assertHoldsLock(this, true); |
| boolean isNewer = true; |
| |
| // force version comparison |
| int oldSerial = oldProfile.getSerialNumber(); |
| int newSerial = newProfile.getSerialNumber(); |
| // boolean serialRolled = oldSerial > 0 && newSerial < 0; |
| boolean serialRolled = |
| oldSerial > ROLLOVER_THRESHOLD_UPPER && newSerial < ROLLOVER_THRESHOLD_LOWER; |
| |
| int oldVersion = oldProfile.getVersion(); |
| int newVersion = newProfile.getVersion(); |
| // boolean versionRolled = oldVersion > 0 && newVersion < 0; |
| boolean versionRolled = |
| oldVersion > ROLLOVER_THRESHOLD_UPPER && newVersion < ROLLOVER_THRESHOLD_LOWER; |
| |
| boolean newIsNewer; |
| if (oldSerial == newSerial) { |
| // if region serial is same, compare versions |
| newIsNewer = versionRolled || oldVersion < newVersion; |
| } else { |
| // compare region serial |
| newIsNewer = serialRolled || oldSerial < newSerial; |
| } |
| if (!newIsNewer) { |
| isNewer = false; |
| } |
| return isNewer; |
| } |
| |
| /** |
| * Compare two serial numbers |
| * |
| * @return return true if the first serial number (newSerialNumber) is more recent |
| */ |
| public static boolean isNewerSerialNumber(int newSerialNumber, int oldSerialNumber) { |
| boolean serialRolled = |
| oldSerialNumber > ROLLOVER_THRESHOLD_UPPER && newSerialNumber < ROLLOVER_THRESHOLD_LOWER; |
| return serialRolled || oldSerialNumber < newSerialNumber; |
| } |
| |
| /** |
| * Create a new version of the membership profile set. This is used in flushing state out of the |
| * VM for previous versions of the set. |
| * |
| * @since GemFire 5.1 |
| */ |
| public void forceNewMembershipVersion() { |
| operationMonitor.forceNewMembershipVersion(); |
| } |
| |
| /** |
| * this method must be invoked at the start of every operation that can modify the state of |
| * resource. The return value must be recorded and sent to the advisor in an endOperation message |
| * when messages for the operation have been put in the DistributionManager's outgoing "queue". |
| * |
| * @return the current membership version for this advisor |
| * @since GemFire 5.1 |
| */ |
| public long startOperation() { |
| return operationMonitor.startOperation(); |
| } |
| |
| /** |
| * This method must be invoked when messages for an operation have been put in the |
| * DistributionManager's outgoing queue. |
| * |
| * @param version The membership version returned by startOperation |
| * @since GemFire 5.1 |
| */ |
| public void endOperation(long version) { |
| operationMonitor.endOperation(version); |
| } |
| |
| /** |
| * wait for the current operations being sent on views prior to the joining of the given member to |
| * be placed on communication channels before returning |
| * |
| * @since GemFire 5.1 |
| */ |
| public void waitForCurrentOperations() { |
| operationMonitor.waitForCurrentOperations(); |
| } |
| |
| public void waitForCurrentOperations(Logger alertLogger, long warnMS, long severeAlertMS) { |
| // this may wait longer than it should if the membership version changes, dumping |
| // more operations into the previousVersionOpCount |
| operationMonitor.waitForCurrentOperations(alertLogger, warnMS, severeAlertMS); |
| } |
| |
| /** |
| * Bypass the distribution manager and ask the membership manager directly if a given member is |
| * still in the view. |
| * |
| * We need this because we're asking membership questions from within listeners, and we don't know |
| * whether the DM's membership listener fires before or after our own. |
| * |
| * @param id member we are asking about |
| * @return true if we are still in the JGroups view (must return false if id == null) |
| */ |
| protected boolean stillInView(ProfileId id) { |
| if (id instanceof InternalDistributedMember) { |
| InternalDistributedMember memberId = (InternalDistributedMember) id; |
| return getDistributionManager().getViewMembers().contains(memberId); |
| } else { |
| // if id is not a InternalDistributedMember then return false |
| return false; |
| } |
| } |
| |
| /** |
| * Given member is no longer pertinent to this advisor; remove it. |
| * |
| * This is often overridden in subclasses, but they need to defer to their superclass at some |
| * point in their re-implementation. |
| * |
| * @param memberId the member to remove |
| * @param crashed true if the member did not leave normally |
| * @return true if it was being tracked |
| */ |
| private boolean basicRemoveId(ProfileId memberId, boolean crashed, boolean destroyed) { |
| final boolean isDebugEnabled = logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE); |
| if (isDebugEnabled) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, "DistributionAdvisor ({}) removeId {}", |
| this, memberId); |
| } |
| |
| Profile profileRemoved = basicRemoveMemberId(memberId); |
| if (profileRemoved == null) { |
| if (isDebugEnabled) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, |
| "DistributionAdvisor.removeId: no profile to remove for {}", memberId); |
| } |
| return false; |
| } |
| if (isDebugEnabled) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, |
| "DistributionAdvisor.removeId: removed profile for {}", memberId); |
| } |
| profileRemoved(profileRemoved); |
| notifyListenersProfileRemoved(profileRemoved, destroyed); |
| notifyListenersMemberRemoved(profileRemoved.getDistributedMember(), crashed); |
| profileRemoved.cleanUp(); |
| return true; |
| } |
| |
| /** |
| * Removes the specified profile if it is registered with this advisor. |
| * |
| * @since GemFire 5.7 |
| */ |
| private void removeProfile(Profile profile) { |
| removeId(profile.getId(), false, false, false); |
| } |
| |
| /** |
| * Removes the profile for the given member. This method is meant to be overriden by subclasses. |
| * |
| * @param memberId the member whose profile should be removed |
| * @param crashed true if the member crashed |
| * @param fromMembershipListener true if this call is a result of MembershipEvent invocation |
| * (fixes #42000) |
| * @return true when the profile was removed, false otherwise |
| */ |
| public boolean removeId(final ProfileId memberId, boolean crashed, boolean destroyed, |
| boolean fromMembershipListener) { |
| boolean result; |
| try { |
| result = doRemoveId(memberId, crashed, destroyed, fromMembershipListener); |
| } finally { |
| if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE)) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, "removeId {} exiting {}", memberId, |
| toStringWithProfiles()); |
| } |
| } |
| return result; |
| } |
| |
| private boolean doRemoveId(ProfileId memberId, boolean crashed, boolean destroyed, |
| boolean fromMembershipListener) { |
| final boolean isDebugEnabled_DA = logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE); |
| if (isDebugEnabled_DA) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, |
| "removeId: removing member {} from resource {}", memberId, getAdvisee().getFullPath()); |
| } |
| synchronized (this) { |
| // If the member has disappeared, completely remove |
| if (!fromMembershipListener) { |
| boolean result = false; |
| // Assert.assertTrue(!crashed); // should not get here :-) |
| |
| // Is there an existing profile? If so, add it to list of those removed. |
| Profile profileToRemove = getProfile(memberId); |
| while (profileToRemove != null) { |
| result = true; |
| if (isDebugEnabled_DA) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, "removeId: tracking removal of {}", |
| profileToRemove); |
| } |
| removedProfiles.put(profileToRemove.getDistributedMember(), |
| profileToRemove.getSerialNumber()); |
| basicRemoveId(profileToRemove.getId(), crashed, destroyed); |
| profileToRemove = getProfile(memberId); |
| } |
| return result; |
| } else { |
| // Garbage collect; this profile is no longer pertinent |
| removedProfiles.remove(memberId); |
| boolean result = basicRemoveId(memberId, crashed, destroyed); |
| while (basicRemoveId(memberId, crashed, destroyed)) { |
| // keep removing profiles that match until we have no more |
| } |
| return result; |
| } |
| } |
| } |
| |
| /** |
| * Removes the profile for the specified member and serial number |
| * |
| * @param memberId the member to remove the profile for |
| * @param serialNum specific serial number to remove |
| * @return true if a matching profile for the member was found |
| */ |
| public boolean removeIdWithSerial(InternalDistributedMember memberId, int serialNum, |
| boolean regionDestroyed) { |
| if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE)) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, |
| "removeIdWithSerial: removing member {} with serial {} from resource {}", memberId, |
| serialNum, getAdvisee().getName()); |
| } |
| Assert.assertTrue(serialNum != ILLEGAL_SERIAL); |
| return updateRemovedProfiles(memberId, serialNum, regionDestroyed); |
| } |
| |
| /** |
| * Update the list of removed profiles based on given serial number, and ensure that the given |
| * member is no longer in the list of bucket owners. |
| * |
| * @param memberId member to remove |
| * @param serialNum serial number |
| * @return true if this member was an owner |
| */ |
| private synchronized boolean updateRemovedProfiles(InternalDistributedMember memberId, |
| int serialNum, boolean regionDestroyed) { |
| final boolean isDebugEnabled_DA = logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE); |
| if (isDebugEnabled_DA) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, |
| "updateRemovedProfiles: ensure member {} with serial {} is removed from region {}", |
| memberId, serialNum, getAdvisee().getFullPath()); |
| } |
| |
| boolean removedId = false; |
| if (stillInView(memberId)) { |
| boolean isNews = true; |
| |
| // If existing profile is newer, just return |
| Profile profileToRemove = getProfile(memberId); |
| if (profileToRemove != null) { |
| if (isNewerSerialNumber(profileToRemove.serialNumber, serialNum)) { |
| if (isDebugEnabled_DA) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, |
| "updateRemovedProfiles: member {} has profile {} which is newer than serial {}", |
| memberId, profileToRemove, serialNum); |
| } |
| // We have a current profile for this member, but its serial number |
| // is more recent than the removal that was requested. |
| |
| // Do not remove our existing profile, and do not update |
| // removedProfiles. |
| isNews = false; |
| } |
| } |
| |
| if (isNews) { |
| // Is this a more recent removal than we have recorded? |
| // If not, do not remove any existing profile, and do not |
| // update removedProfiles |
| Integer oldSerial = removedProfiles.get(memberId); |
| if (oldSerial != null && isNewerSerialNumber(oldSerial, serialNum)) { |
| if (isDebugEnabled_DA) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, |
| "updateRemovedProfiles: member {} sent removal of serial {} but we hae already removed {}", |
| memberId, serialNum, oldSerial); |
| } |
| isNews = false; |
| } |
| } |
| |
| if (isNews) { |
| if (isDebugEnabled_DA) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, |
| "updateRemovedProfiles: adding serial {} for member {} to removedProfiles", serialNum, |
| memberId); |
| } |
| // The member is still in the system, and the removal message is |
| // a new one. Remember this removal, and ensure that its profile |
| // is removed from this bucket. |
| removedProfiles.put(memberId, serialNum); |
| |
| // Only remove profile if this removal is more recent than our |
| // current state |
| removedId = basicRemoveId(memberId, false, regionDestroyed); |
| } |
| } // isCurrentMember |
| else { |
| // If the member has disappeared, completely remove (garbage collect) |
| if (isDebugEnabled_DA) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, |
| "updateRemovedProfiles: garbage collecting member {}", memberId); |
| } |
| removedProfiles.remove(memberId); |
| |
| // Always make sure that this member is removed from the advisor |
| removedId = basicRemoveId(memberId, false, regionDestroyed); |
| } |
| |
| if (isDebugEnabled_DA) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, "updateRemovedProfiles: removedId = {}", |
| removedId); |
| } |
| |
| return removedId; |
| } |
| |
| /** |
| * Indicate whether given member is being tracked |
| * |
| * @param memberId the member |
| * @return true if the member was being tracked |
| */ |
| public synchronized boolean containsId(InternalDistributedMember memberId) { |
| return (indexOfMemberId(memberId) > -1); |
| } |
| |
| |
| public synchronized int getNumProfiles() { |
| return numActiveProfiles; |
| } |
| |
| /** |
| * Caller must be synchronized on this. Overridden in BucketAdvisor. |
| */ |
| private void setNumActiveProfiles(int newValue) { |
| numActiveProfiles = newValue; |
| } |
| |
| public Profile getProfile(ProfileId id) { |
| Profile[] allProfiles = profiles; // volatile read |
| boolean isIDM = (id instanceof InternalDistributedMember); |
| for (Profile allProfile : allProfiles) { |
| if (isIDM) { |
| if (allProfile.getDistributedMember().equals(id)) { |
| return allProfile; |
| } |
| } else { |
| if (allProfile.getId().equals(id)) { |
| return allProfile; |
| } |
| } |
| } |
| return null; |
| } |
| |
| /** exchange profiles to initialize this advisor */ |
| public void exchangeProfiles() { |
| Assert.assertHoldsLock(this, false); // causes deadlock |
| Assert.assertHoldsLock(initializeLock, true); |
| new UpdateAttributesProcessor(getAdvisee()).distribute(true); |
| setInitialized(); |
| } |
| |
| /** Creates the current distribution profile for this member */ |
| public Profile createProfile() { |
| Profile newProfile = |
| instantiateProfile(getDistributionManager().getId(), incrementAndGetVersion()); |
| getAdvisee().fillInProfile(newProfile); |
| return newProfile; |
| } |
| |
| /** Instantiate new distribution profile for this member */ |
| protected Profile instantiateProfile(InternalDistributedMember memberId, int version) { |
| return new Profile(memberId, version); |
| } |
| |
| /** |
| * Provide recipient information for any other operation. Returns the set of members that have |
| * remote counterparts. |
| * |
| * @return Set of Serializable members; no reference to Set kept by advisor so caller is free to |
| * modify it |
| */ |
| public Set<InternalDistributedMember> adviseGeneric() { |
| return adviseFilter(null); |
| } |
| |
| /** |
| * Provide recipients for profile exchange, called by UpdateAttributesProcessor and |
| * CreateRegionProcessor. Can not be initialized at this point because it is only called in the |
| * following scenarios: 1) We're doing a lazy initialization and synchronization on initializeLock |
| * prevents other threads from causing initialization on this advisor. 2) We're creating a new |
| * region and doing profile exchange as part of region initialization, in which case no other |
| * threads have access to the region or this advisor. |
| */ |
| public Set adviseProfileExchange() { |
| // Get the list of recipients from the nearest initialized advisor |
| // in the parent chain |
| Assert.assertTrue(!isInitialized()); |
| DistributionAdvisor advisor; |
| DistributionAdvisee advisee = getAdvisee(); |
| do { |
| advisee = advisee.getParentAdvisee(); |
| if (advisee == null) |
| return getDefaultDistributionMembers(); |
| advisor = advisee.getDistributionAdvisor(); |
| } while (!advisor.isInitialized()); |
| // do not call adviseGeneric because we don't want to trigger another |
| // profile exchange on the parent |
| return advisor.adviseFilter(null); |
| } |
| |
| /** |
| * Returns a set of the members this advisor should distribute to by default |
| * |
| * @since GemFire 5.7 |
| */ |
| private Set<InternalDistributedMember> getDefaultDistributionMembers() { |
| if (!useAdminMembersForDefault()) { |
| return getDistributionManager().getOtherDistributionManagerIds(); |
| } else { |
| return getDistributionManager().getAllOtherMembers(); |
| } |
| } |
| |
| /** |
| * Returns true if all members including ADMIN are required for distribution of update attributes |
| * message by {@link #getDefaultDistributionMembers()}. |
| */ |
| public boolean useAdminMembersForDefault() { |
| return false; |
| } |
| |
| private void notifyListenersMemberAdded(InternalDistributedMember member) { |
| for (MembershipListener membershipListener : membershipListeners.keySet()) { |
| try { |
| membershipListener.memberJoined(getDistributionManagerWithNoCheck(), member); |
| } catch (Exception e) { |
| logger.warn("Ignoring exception during member joined listener notification", e); |
| } |
| } |
| } |
| |
| private void notifyListenersMemberRemoved(InternalDistributedMember member, boolean crashed) { |
| for (MembershipListener membershipListener : membershipListeners.keySet()) { |
| try { |
| membershipListener.memberDeparted(getDistributionManagerWithNoCheck(), member, |
| crashed); |
| } catch (Exception e) { |
| logger.warn("Ignoring exception during member departed listener notification", e); |
| } |
| } |
| } |
| |
| private void notifyListenersProfileRemoved(Profile profile, boolean destroyed) { |
| for (ProfileListener profileListener : profileListeners.keySet()) { |
| (profileListener).profileRemoved(profile, destroyed); |
| } |
| } |
| |
| private void notifyListenersProfileAdded(Profile profile) { |
| for (ProfileListener profileListener : profileListeners.keySet()) { |
| (profileListener).profileCreated(profile); |
| } |
| } |
| |
| private void notifyListenersProfileUpdated(Profile profile) { |
| for (ProfileListener profileListener : profileListeners.keySet()) { |
| (profileListener).profileUpdated(profile); |
| } |
| } |
| |
| /** |
| * Template method for sub-classes to override. Method is invoked after a new profile is |
| * created/added to profiles. |
| * |
| * @param profile the created profile |
| */ |
| protected void profileCreated(Profile profile) {} |
| |
| /** |
| * Template method for sub-classes to override. Method is invoked after a profile is updated in |
| * profiles. |
| * |
| * @param profile the updated profile |
| */ |
| protected void profileUpdated(Profile profile) {} |
| |
| /** |
| * Template method for sub-classes to override. Method is invoked after a profile is removed from |
| * profiles. |
| * |
| * @param profile the removed profile |
| */ |
| protected void profileRemoved(Profile profile) {} |
| |
| /** All advise methods go through this method */ |
| protected Set<InternalDistributedMember> adviseFilter(Filter f) { |
| initializationGate(); |
| Set<InternalDistributedMember> recipients = null; |
| Profile[] locProfiles = profiles; // grab current profiles |
| for (Profile profile : locProfiles) { |
| if (f == null || f.include(profile)) { |
| if (recipients == null) { |
| recipients = new HashSet<>(); |
| } |
| recipients.add(profile.getDistributedMember()); |
| } |
| } |
| if (recipients == null) { |
| return Collections.emptySet(); |
| } else { |
| return recipients; |
| } |
| } |
| |
| /** |
| * This method calls filter->include on every profile until include returns true. |
| * |
| * @return false if all filter->include calls returns false; otherwise true. |
| **/ |
| protected boolean satisfiesFilter(Filter f) { |
| initializationGate(); |
| Profile[] locProfiles = profiles; // grab current profiles |
| for (Profile p : locProfiles) { |
| if (f.include(p)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| /** |
| * A visitor interface for all the available profiles used by |
| * {@link DistributionAdvisor#accept(ProfileVisitor, Object)}. Unlike the {@link Filter} class |
| * this does not assume of two state visit of inclusion or exclusion rather allows manipulation of |
| * an arbitrary aggregator that has been passed to the {@link #visit} method. In addition this is |
| * public for use by other classes. |
| */ |
| public interface ProfileVisitor<T> { |
| |
| /** |
| * Visit a given {@link Profile} accumulating the results in the given aggregate. Returns false |
| * when the visit has to be terminated. |
| * |
| * @param advisor the DistributionAdvisor that invoked this visitor |
| * @param profile the profile being visited |
| * @param profileIndex the index of current profile |
| * @param numProfiles the total number of profiles being visited |
| * @param aggregate result aggregated so far, if any |
| * |
| * @return false if the visit has to be terminated immediately and false otherwise |
| */ |
| boolean visit(DistributionAdvisor advisor, Profile profile, int profileIndex, int numProfiles, |
| T aggregate); |
| } |
| |
| /** |
| * Invoke the given {@link ProfileVisitor} on all the {@link Profile}s exiting when the |
| * {@link ProfileVisitor#visit} method returns false. Unlike the {@link #adviseFilter(Filter)} |
| * method this does assume the return type to be a Set of qualifying members rather allows for |
| * population of an arbitrary aggregator passed as the argument to this method. |
| * |
| * @param <T> the type of object used for aggregation of results |
| * @param visitor the {@link ProfileVisitor} to use for the visit |
| * @param aggregate an aggregate object that will be used to for aggregation of results by the |
| * {@link ProfileVisitor#visit} method; this allows the {@link ProfileVisitor} to not |
| * maintain any state so that in many situations a global static object encapsulating the |
| * required behaviour will work |
| * |
| * @return true if all the profiles were visited and false if the {@link ProfileVisitor#visit} cut |
| * it short by returning false |
| */ |
| public <T> boolean accept(ProfileVisitor<T> visitor, T aggregate) { |
| initializationGate(); |
| final Profile[] locProfiles = profiles; // grab current profiles |
| final int numProfiles = locProfiles.length; |
| Profile p; |
| for (int index = 0; index < numProfiles; ++index) { |
| p = locProfiles[index]; |
| if (!visitor.visit(this, p, index, numProfiles, aggregate)) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * Get an unmodifiable list of the <code>Profile</code>s that match the given <code>Filter</code>. |
| * |
| * @since GemFire 5.7 |
| */ |
| protected List<Profile> fetchProfiles(Filter f) { |
| initializationGate(); |
| List<Profile> result = null; |
| Profile[] locProfiles = profiles; |
| for (Profile profile : locProfiles) { |
| if (f == null || f.include(profile)) { |
| if (result == null) { |
| result = new ArrayList<>(locProfiles.length); |
| } |
| result.add(profile); |
| } |
| } |
| |
| if (result == null) { |
| result = Collections.emptyList(); |
| } else { |
| result = Collections.unmodifiableList(result); |
| } |
| |
| return result; |
| } |
| |
| /** Provide recipients for profile update. */ |
| public Set adviseProfileUpdate() { |
| return adviseGeneric(); |
| } |
| |
| /** |
| * Provide recipients for profile remove. |
| * |
| * @since GemFire 5.7 |
| */ |
| public Set adviseProfileRemove() { |
| return adviseGeneric(); |
| } |
| |
| /** |
| * @return true if new profile added, false if already had profile (but profile is still replaced |
| * with new one) |
| */ |
| // must synchronize when modifying profile array |
| private synchronized boolean basicAddProfile(Profile p) { |
| // don't add more than once, but replace existing profile |
| profilesVersion++; |
| int index = indexOfMemberId(p.getId()); |
| if (index >= 0) { |
| Profile[] oldProfiles = profiles; // volatile read |
| oldProfiles[index] = p; |
| profiles = oldProfiles; // volatile write |
| return false; |
| } |
| |
| // minimize volatile reads by copying ref to local var |
| Profile[] snap = profiles; // volatile read |
| Profile[] newProfiles = (Profile[]) ArrayUtils.insert(snap, snap.length, p); |
| Objects.requireNonNull(newProfiles); |
| |
| profiles = newProfiles; // volatile write |
| setNumActiveProfiles(newProfiles.length); |
| |
| return true; |
| } |
| |
| |
| // must synchronize when modifying profile array |
| /** |
| * Perform work of removing the given member from this advisor. |
| */ |
| private synchronized Profile basicRemoveMemberId(ProfileId id) { |
| |
| int i = indexOfMemberId(id); |
| if (i >= 0) { |
| profilesVersion++; |
| Profile profileRemoved = profiles[i]; |
| basicRemoveIndex(i); |
| return profileRemoved; |
| } else |
| return null; |
| |
| } |
| |
| private int indexOfMemberId(ProfileId id) { |
| Assert.assertHoldsLock(this, true); |
| Profile[] profs = profiles; // volatile read |
| for (int i = 0; i < profs.length; i++) { |
| Profile p = profs[i]; |
| if (id instanceof InternalDistributedMember) { |
| if (p.getDistributedMember().equals(id)) |
| return i; |
| } else { |
| if (p.getId().equals(id)) |
| return i; |
| } |
| } |
| return -1; |
| } |
| |
| private void basicRemoveIndex(int index) { |
| Assert.assertHoldsLock(this, true); |
| // minimize volatile reads by copying ref to local var |
| Profile[] oldProfiles = profiles; // volatile read |
| Profile[] newProfiles = new Profile[oldProfiles.length - 1]; |
| System.arraycopy(oldProfiles, 0, newProfiles, 0, index); |
| System.arraycopy(oldProfiles, index + 1, newProfiles, index, newProfiles.length - index); |
| profiles = newProfiles; // volatile write |
| if (numActiveProfiles > 0) { |
| numActiveProfiles--; |
| } |
| } |
| |
| |
| /** Filter interface */ |
| protected interface Filter { |
| boolean include(Profile profile); |
| } |
| |
| /** |
| * Marker interface to designate on object that serves and the unique id that identifies a |
| * Profile. |
| */ |
| public interface ProfileId { |
| } |
| /** |
| * Profile information for a remote counterpart. |
| */ |
| public static class Profile implements DataSerializableFixedID { |
| /** Member for whom this profile represents */ |
| public InternalDistributedMember peerMemberId; |
| /** Serial number incremented every time profile is updated by memberId */ |
| public int version; |
| public int serialNumber = ILLEGAL_SERIAL; |
| /** |
| * The DistributionAdvisor's membership version where this member was added |
| * |
| * @since GemFire 5.1 |
| */ |
| public transient long initialMembershipVersion; |
| |
| /** for internal use, required for DataSerializable.Helper.readObject */ |
| public Profile() {} |
| |
| public Profile(InternalDistributedMember memberId, int version) { |
| if (memberId == null) { |
| throw new IllegalArgumentException( |
| "memberId cannot be null"); |
| } |
| peerMemberId = memberId; |
| this.version = version; |
| } |
| |
| /** |
| * Return object that uniquely identifies this profile. |
| * |
| * @since GemFire 5.7 |
| */ |
| public ProfileId getId() { |
| return peerMemberId; |
| } |
| |
| public int getVersion() { |
| return version; |
| } |
| |
| public int getSerialNumber() { |
| return serialNumber; |
| } |
| |
| @Override |
| public int hashCode() { |
| return getId().hashCode(); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (obj == this) |
| return true; |
| if (obj == null) |
| return false; |
| if (!getClass().equals(obj.getClass())) |
| return false; |
| return getId().equals(((Profile) obj).getId()); |
| } |
| |
| /** |
| * Return the DistributedMember associated with this profile |
| * |
| * @since GemFire 5.0 |
| */ |
| public InternalDistributedMember getDistributedMember() { |
| return peerMemberId; |
| } |
| |
| @Override |
| public int getDSFID() { |
| return DA_PROFILE; |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| InternalDataSerializer.invokeToData(peerMemberId, out); |
| out.writeInt(version); |
| out.writeInt(serialNumber); |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| peerMemberId = new InternalDistributedMember(); |
| InternalDataSerializer.invokeFromData(peerMemberId, in); |
| version = in.readInt(); |
| serialNumber = in.readInt(); |
| } |
| |
| /** |
| * Process add/remove/update of an incoming profile. |
| */ |
| public void processIncoming(ClusterDistributionManager dm, String adviseePath, |
| boolean removeProfile, boolean exchangeProfiles, final List<Profile> replyProfiles) { |
| // nothing by default; just log that nothing was done |
| if (logger.isDebugEnabled()) { |
| logger.debug("While processing UpdateAttributes message ignored incoming profile: {}", |
| this); |
| } |
| } |
| |
| /** |
| * Attempts to process this message with the specified {@link DistributionAdvisee}. Also if |
| * exchange profiles then add the profile from {@link DistributionAdvisee} to reply. |
| * |
| * @param advisee the CacheDistributionAdvisee to apply this profile to |
| * @param removeProfile true to remove profile else add profile |
| * @param exchangeProfiles true to add the profile to reply |
| */ |
| protected void handleDistributionAdvisee(DistributionAdvisee advisee, boolean removeProfile, |
| boolean exchangeProfiles, final List<Profile> replyProfiles) { |
| final DistributionAdvisor da; |
| if (advisee != null && (da = advisee.getDistributionAdvisor()) != null) { |
| if (removeProfile) { |
| da.removeProfile(this); |
| } else { |
| da.putProfile(this); |
| } |
| if (exchangeProfiles) { |
| // assume non-null replyProfiles when exchangeProfiles is true |
| replyProfiles.add(advisee.getProfile()); |
| } |
| } |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder sb = getToStringHeader(); |
| sb.append("@").append(System.identityHashCode(this)).append("("); |
| fillInToString(sb); |
| sb.append(")"); |
| return sb.toString(); |
| } |
| |
| /** |
| * This will be get called when profile will be removed from advisor Do local cleanup in this |
| * thread, otherwsie spawn another thread to do cleanup |
| */ |
| public void cleanUp() { |
| |
| } |
| |
| public StringBuilder getToStringHeader() { |
| return new StringBuilder("Profile"); |
| } |
| |
| public void fillInToString(StringBuilder sb) { |
| sb.append("memberId=").append(peerMemberId); |
| sb.append("; version=").append(version); |
| sb.append("; serialNumber=").append(serialNumber); |
| sb.append("; initialMembershipVersion=").append(initialMembershipVersion); |
| } |
| |
| @Override |
| public Version[] getSerializationVersions() { |
| return null; |
| } |
| } |
| |
| |
| |
| private static class OperationMonitor { |
| private final DistributionAdvisor distributionAdvisor; |
| |
| /** |
| * the version of the profile set |
| */ |
| private long membershipVersion; |
| |
| /** |
| * the number of operations in-progress for previous versions of the profile set |
| */ |
| private long previousVersionOpCount; |
| /** |
| * the number of operations in-progress for the current version of the profile set |
| */ |
| private long currentVersionOpCount; |
| |
| /** |
| * for debugging stalled state-flush operations we track threads performing operations |
| * and capture the state when startOperatiopn is invoked |
| */ |
| private boolean closed; |
| |
| private OperationMonitor(DistributionAdvisor distributionAdvisor) { |
| this.distributionAdvisor = distributionAdvisor; |
| } |
| |
| private synchronized void incrementMembershipVersion() { |
| membershipVersion++; |
| } |
| |
| /** |
| * Create a new version of the membership profile set. This is used in flushing state out of the |
| * VM for previous versions of the set. |
| * |
| * @since GemFire 5.1 |
| */ |
| synchronized void forceNewMembershipVersion() { |
| if (!closed) { |
| incrementMembershipVersion(); |
| previousVersionOpCount += currentVersionOpCount; |
| currentVersionOpCount = 0; |
| membershipVersionChanged(); |
| } |
| } |
| |
| /** |
| * this method must be invoked at the start of every operation that can modify the state of |
| * resource. The return value must be recorded and sent to the advisor in an endOperation |
| * message when messages for the operation have been put in the DistributionManager's outgoing |
| * "queue". |
| * |
| * @return the current membership version for this advisor |
| * @since GemFire 5.1 |
| */ |
| synchronized long startOperation() { |
| logNewOperation(); |
| currentVersionOpCount++; |
| return membershipVersion; |
| } |
| |
| /** |
| * This method must be invoked when messages for an operation have been put in the |
| * DistributionManager's outgoing queue. |
| * |
| * @param version The membership version returned by startOperation |
| * @since GemFire 5.1 |
| */ |
| synchronized void endOperation(long version) { |
| if (version == membershipVersion) { |
| currentVersionOpCount--; |
| logEndOperation(true); |
| } else { |
| previousVersionOpCount--; |
| logEndOperation(false); |
| } |
| } |
| |
| /** |
| * wait for the current operations being sent on views prior to the joining of the given member |
| * to be placed on communication channels before returning |
| * |
| * @since GemFire 5.1 |
| */ |
| void waitForCurrentOperations() { |
| long timeout = |
| 1000L * distributionAdvisor.getDistributionManager().getSystem().getConfig() |
| .getAckWaitThreshold(); |
| waitForCurrentOperations(logger, timeout, timeout * 2L); |
| } |
| |
| void waitForCurrentOperations(Logger alertLogger, long warnMS, long severeAlertMS) { |
| // this may wait longer than it should if the membership version changes, dumping |
| // more operations into the previousVersionOpCount |
| final long startTime = System.currentTimeMillis(); |
| final long warnTime = startTime + warnMS; |
| final long severeAlertTime = startTime + severeAlertMS; |
| boolean warned = false; |
| boolean severeAlertIssued = false; |
| while (operationsAreInProgress()) { |
| // The advisor's close() method will set the pVOC to zero. This loop |
| // must not terminate due to cache closure until that happens. |
| try { |
| Thread.sleep(50); |
| } catch (InterruptedException e) { |
| throw new GemFireIOException("State flush interrupted"); |
| } |
| long now = System.currentTimeMillis(); |
| if ((!warned) && System.currentTimeMillis() >= warnTime) { |
| warned = true; |
| logWaitOnOperationsWarning(alertLogger, warnMS); |
| } else if (warned && !severeAlertIssued && (now >= severeAlertTime)) { |
| logWaitOnOperationsSevere(alertLogger, severeAlertMS); |
| severeAlertIssued = true; |
| } |
| } |
| if (warned) { |
| alertLogger.info("Wait for current operations completed"); |
| } |
| } |
| |
| synchronized boolean operationsAreInProgress() { |
| return previousVersionOpCount > 0; |
| } |
| |
| synchronized void initNewProfile(Profile newProfile) { |
| membershipVersion++; |
| newProfile.initialMembershipVersion = membershipVersion; |
| previousVersionOpCount = |
| previousVersionOpCount + currentVersionOpCount; |
| currentVersionOpCount = 0; |
| membershipVersionChanged(); |
| } |
| |
| synchronized void close() { |
| previousVersionOpCount = 0; |
| currentVersionOpCount = 0; |
| closed = true; |
| } |
| |
| void logNewOperation() {} |
| |
| void logEndOperation(boolean newOperation) {} |
| |
| void logWaitOnOperationsSevere(Logger alertLogger, long severeAlertMS) { |
| // OSProcess.printStacks(0); |
| alertLogger.fatal("This thread has been stalled for {} milliseconds " |
| + "waiting for current operations to complete. Something may be blocking operations.", |
| severeAlertMS); |
| } |
| |
| void logWaitOnOperationsWarning(Logger alertLogger, long warnMS) { |
| alertLogger.warn("This thread has been stalled for {} milliseconds waiting for " |
| + "current operations to complete.", warnMS); |
| } |
| |
| void membershipVersionChanged() {} |
| |
| } |
| |
| private static class ThreadTrackingOperationMonitor extends OperationMonitor { |
| |
| /** |
| * for debugging stalled state-flush operations we track threads performing operations |
| * and capture the state when startOperatiopn is invoked |
| */ |
| private final Map<Thread, ExceptionWrapper> currentVersionOperationThreads; |
| private final Map<Thread, ExceptionWrapper> previousVersionOperationThreads; |
| |
| private ThreadTrackingOperationMonitor( |
| DistributionAdvisor distributionAdvisor) { |
| super(distributionAdvisor); |
| currentVersionOperationThreads = new HashMap<>(); |
| previousVersionOperationThreads = new HashMap<>(); |
| } |
| |
| @Override |
| void logNewOperation() { |
| currentVersionOperationThreads.put(Thread.currentThread(), |
| new ExceptionWrapper(new Exception("stack trace"))); |
| } |
| |
| @Override |
| void logEndOperation(boolean newOp) { |
| if (newOp) { |
| currentVersionOperationThreads.remove(Thread.currentThread()); |
| } else { |
| previousVersionOperationThreads.remove(Thread.currentThread()); |
| } |
| } |
| |
| @Override |
| void logWaitOnOperationsWarning(Logger alertLogger, long warnMS) { |
| super.logWaitOnOperationsWarning(alertLogger, warnMS); |
| synchronized (this) { |
| logger |
| .debug("Waiting for these threads: {}", previousVersionOperationThreads); |
| logger |
| .debug("New version threads are {}", currentVersionOperationThreads); |
| } |
| } |
| |
| @Override |
| void logWaitOnOperationsSevere(Logger alertLogger, long severeAlertMS) { |
| super.logWaitOnOperationsSevere(alertLogger, severeAlertMS); |
| synchronized (this) { |
| logger |
| .debug("Waiting for these threads: {}", previousVersionOperationThreads); |
| logger |
| .debug("New version threads are {}", currentVersionOperationThreads); |
| } |
| } |
| |
| @Override |
| void membershipVersionChanged() { |
| super.membershipVersionChanged(); |
| previousVersionOperationThreads |
| .putAll(currentVersionOperationThreads); |
| currentVersionOperationThreads.clear(); |
| } |
| |
| |
| /** |
| * ExceptionWrapper is used in debugging hangs in waitForCurrentOperations(). It |
| * captures the call stack of a thread invoking startOperation(). |
| */ |
| private static class ExceptionWrapper { |
| private Exception exception; |
| |
| ExceptionWrapper(Exception exception) { |
| this.exception = exception; |
| } |
| |
| @Override |
| public String toString() { |
| StringBuilder builder = new StringBuilder(500); |
| OutputStream os = new OutputStream() { |
| @Override |
| public void write(int i) { |
| builder.append((char) i); |
| } |
| }; |
| PrintStream stream = new PrintStream(os); |
| exception.printStackTrace(stream); |
| return builder.toString(); |
| } |
| } |
| |
| } |
| |
| } |