| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| |
| package org.apache.geode.internal.cache; |
| |
| import static org.apache.geode.internal.cache.CacheServerImpl.CACHE_SERVER_BIND_ADDRESS_NOT_AVAILABLE_EXCEPTION_MESSAGE; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.IOException; |
| import java.io.ObjectInputStream; |
| import java.io.Serializable; |
| import java.util.AbstractSet; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Queue; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelException; |
| import org.apache.geode.DataSerializer; |
| import org.apache.geode.annotations.Immutable; |
| import org.apache.geode.cache.RegionDestroyedException; |
| import org.apache.geode.cache.client.internal.locator.SerializationHelper; |
| import org.apache.geode.cache.partition.PartitionListener; |
| import org.apache.geode.cache.server.CacheServer; |
| import org.apache.geode.distributed.DistributedLockService; |
| import org.apache.geode.distributed.DistributedSystemDisconnectedException; |
| import org.apache.geode.distributed.LockNotHeldException; |
| import org.apache.geode.distributed.LockServiceDestroyedException; |
| import org.apache.geode.distributed.internal.DistributionConfig; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.MembershipListener; |
| import org.apache.geode.distributed.internal.ReplyProcessor21; |
| import org.apache.geode.distributed.internal.locks.DLockService; |
| import org.apache.geode.distributed.internal.locks.DistributedMemberLock; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.cache.partitioned.Bucket; |
| import org.apache.geode.internal.cache.partitioned.BucketProfileUpdateMessage; |
| import org.apache.geode.internal.cache.partitioned.DeposePrimaryBucketMessage; |
| import org.apache.geode.internal.cache.partitioned.DeposePrimaryBucketMessage.DeposePrimaryBucketResponse; |
| import org.apache.geode.internal.cache.partitioned.RegionAdvisor; |
| import org.apache.geode.internal.logging.log4j.LogMarker; |
| import org.apache.geode.internal.serialization.DeserializationContext; |
| import org.apache.geode.internal.serialization.SerializationContext; |
| import org.apache.geode.internal.util.StopWatch; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| import org.apache.geode.util.internal.GeodeGlossary; |
| |
| /** |
| * Specialized {@link CacheDistributionAdvisor} for {@link BucketRegion BucketRegions}. The |
| * <code>BucketAdvisor</code> is owned by a {@link ProxyBucketRegion} and may outlive a |
| * <code>BucketRegion</code>. |
| */ |
| @SuppressWarnings("synthetic-access") |
| public class BucketAdvisor extends CacheDistributionAdvisor { |
| private static final Logger logger = LogService.getLogger(); |
| |
| private static final boolean ENFORCE_SAFE_CLOSE = false; |
| // TODO: Boolean.getBoolean("gemfire.BucketAdvisor.debug.enforceSafeClose"); |
| |
| /** Reference to the InternalDistributedMember that is primary. */ |
| private final AtomicReference<InternalDistributedMember> primaryMember = new AtomicReference<>(); |
| |
| /** |
| * Advice requests for {@link #adviseProfileUpdate()} delegate to the partitioned region's |
| * <code>RegionAdvisor</code> to include members with {@link ProxyBucketRegion}s as well as real |
| * {@link BucketRegion}s. |
| */ |
| private final RegionAdvisor regionAdvisor; |
| |
| private final BucketRedundancyTracker redundancyTracker; |
| |
| /** |
| * The bucket primary will be holding this distributed lock. Protected by synchronized(this). |
| */ |
| private DistributedMemberLock primaryLock; |
| |
| // private static final byte MASK_HOSTING = 1; // 0001 |
| // private static final byte MASK_VOLUNTEERING = 2; // 0010 |
| // private static final byte MASK_OTHER_PRIMARY = 4; // 0100 |
| // private static final byte MASK_IS_PRIMARY = 8; // 1000 |
| |
| private static final byte NO_PRIMARY_NOT_HOSTING = 0; // 0000_0000 |
| private static final byte NO_PRIMARY_HOSTING = 1; // 0000_0001 |
| private static final byte OTHER_PRIMARY_NOT_HOSTING = 4; // 0000_0100 |
| private static final byte OTHER_PRIMARY_HOSTING = 5; // 0000_0101 |
| private static final byte VOLUNTEERING_HOSTING = 3; // 0000_0011 |
| private static final byte BECOMING_HOSTING = 15; // 0000_1111 |
| private static final byte IS_PRIMARY_HOSTING = 9; // 0000_1001 |
| private static final byte CLOSED = 16; // 0001_0000 |
| |
| /** |
| * The current state of this BucketAdvisor which tracks which member is primary and whether or not |
| * this member is hosting a real Bucket. |
| */ |
| private volatile byte primaryState = NO_PRIMARY_NOT_HOSTING; |
| |
| /** |
| * This delegate handles all volunteering for primary status. Lazily created. Protected by |
| * synchronization(this). |
| */ |
| private VolunteeringDelegate volunteeringDelegate; |
| |
| /** |
| * A random number generator |
| * |
| * @see #getPreferredNode() |
| */ |
| @Immutable |
| private static final Random myRand = new Random(); |
| |
| /** |
| * A read/write lock to prevent making this bucket not primary while a write is in progress on the |
| * bucket. |
| */ |
| private final ReadWriteLock primaryMoveReadWriteLock = new ReentrantReadWriteLock(); |
| private final Lock primaryMoveReadLock = primaryMoveReadWriteLock.readLock(); |
| private final Lock primaryMoveWriteLock = primaryMoveReadWriteLock.writeLock(); |
| |
| /** |
| * The advisor for the bucket region that we are colocated with, if this region is a colocated |
| * region. |
| */ |
| private BucketAdvisor parentAdvisor; |
| |
| /** |
| * The member that is responsible for choosing the primary for this bucket. While this field is |
| * set and this member exists, this bucket won't try to become primary. |
| */ |
| private volatile InternalDistributedMember primaryElector; |
| |
| private volatile BucketProfile localProfile; |
| |
| private volatile boolean everHadPrimary = false; |
| |
| private BucketAdvisor startingBucketAdvisor; |
| |
| private PartitionedRegion pRegion; |
| |
| final ConcurrentMap<String, Boolean> destroyedShadowBuckets = new ConcurrentHashMap<>(); |
| |
| /** |
| * Constructs a new BucketAdvisor for the Bucket owned by RegionAdvisor. |
| * |
| * @param bucket the bucket to provide metadata and advice for |
| * @param regionAdvisor advisor for the PartitionedRegion |
| */ |
| private BucketAdvisor(Bucket bucket, RegionAdvisor regionAdvisor) { |
| super(bucket); |
| this.regionAdvisor = regionAdvisor; |
| pRegion = this.regionAdvisor.getPartitionedRegion(); |
| redundancyTracker = |
| new BucketRedundancyTracker(pRegion.getRedundantCopies(), pRegion.getRedundancyTracker()); |
| resetParentAdvisor(bucket.getId()); |
| } |
| |
| public static BucketAdvisor createBucketAdvisor(Bucket bucket, RegionAdvisor regionAdvisor) { |
| BucketAdvisor advisor = new BucketAdvisor(bucket, regionAdvisor); |
| advisor.initialize(); |
| return advisor; |
| } |
| |
| private void resetParentAdvisor(int bucketId) { |
| PartitionedRegion colocatedRegion = ColocationHelper.getColocatedRegion(pRegion); |
| if (colocatedRegion != null) { |
| if (colocatedRegion.isFixedPartitionedRegion()) { |
| List<FixedPartitionAttributesImpl> fpas = colocatedRegion.getFixedPartitionAttributesImpl(); |
| if (fpas != null) { |
| for (FixedPartitionAttributesImpl fpa : fpas) { |
| if (fpa.hasBucket(bucketId)) { |
| parentAdvisor = |
| colocatedRegion.getRegionAdvisor().getBucketAdvisor(fpa.getStartingBucketID()); |
| break; |
| } |
| } |
| } |
| } else { |
| parentAdvisor = colocatedRegion.getRegionAdvisor().getBucketAdvisor(bucketId); |
| } |
| } else { |
| parentAdvisor = null; |
| } |
| } |
| |
| private void assignStartingBucketAdvisorIfFixedPartitioned() { |
| if (startingBucketAdvisor != null) { |
| // already assigned |
| return; |
| } |
| if (pRegion.isFixedPartitionedRegion()) { |
| List<FixedPartitionAttributesImpl> fpas = pRegion.getFixedPartitionAttributesImpl(); |
| if (fpas != null) { |
| int bucketId = getBucket().getId(); |
| for (FixedPartitionAttributesImpl fpa : fpas) { |
| if (fpa.hasBucket(bucketId) && bucketId != fpa.getStartingBucketID()) { |
| startingBucketAdvisor = regionAdvisor.getBucketAdvisor(fpa.getStartingBucketID()); |
| break; |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Returns the lock that prevents the primary from moving while active writes are in progress. |
| * This should be locked before checking if the local bucket is primary. |
| * |
| * @return the lock for in-progress write operations |
| */ |
| public Lock getPrimaryMoveReadLock() { |
| return primaryMoveReadLock; |
| } |
| |
| /** |
| * Returns the lock that prevents the parent's primary from moving while active writes are in |
| * progress. This should be locked before checking if the local bucket is primary. |
| * |
| * @return the lock for in-progress write operations |
| */ |
| Lock getParentPrimaryMoveReadLock() { |
| if (parentAdvisor != null) { |
| return parentAdvisor.getPrimaryMoveReadLock(); |
| } |
| return null; |
| } |
| |
| /** |
| * Try to lock the primary bucket to make sure no operation is on-going at current bucket. |
| * |
| */ |
| void tryLockIfPrimary() { |
| if (isPrimary()) { |
| try { |
| primaryMoveWriteLock.lock(); |
| } finally { |
| primaryMoveWriteLock.unlock(); |
| } |
| } |
| } |
| |
| /** |
| * Makes this <code>BucketAdvisor</code> give up being a primary and become a secondary. Does |
| * nothing if not currently the primary. |
| * |
| * @return true if this advisor has been deposed as primary |
| */ |
| public boolean deposePrimary() { |
| if (isPrimary()) { |
| primaryMoveWriteLock.lock(); |
| boolean needToSendProfileUpdate = false; |
| try { |
| removePrimary(getDistributionManager().getId()); |
| synchronized (this) { |
| if (!isPrimary()) { |
| // releasePrimaryLock(); |
| needToSendProfileUpdate = true; |
| return true; |
| } else { |
| return false; // failed for some reason |
| } |
| } |
| } finally { |
| primaryMoveWriteLock.unlock(); |
| if (needToSendProfileUpdate) { |
| sendProfileUpdate(); |
| } |
| } |
| } else { |
| sendProfileUpdate(); |
| return true; |
| } |
| } |
| |
| /** |
| * This calls deposePrimary on every colocated child that is directly colocated to this bucket's |
| * PR. Those each in turn do the same to their child buckets and so on before returning. Each |
| * depose will send a dlock release message to the grantor, wait for reply, and then also send a |
| * profile update. |
| * <p> |
| * Caller must synchronize on this BucketAdvisor. |
| * |
| */ |
| private void deposePrimaryForColocatedChildren() { |
| boolean deposedChildPrimaries = true; |
| List<PartitionedRegion> colocatedChildPRs = ColocationHelper.getColocatedChildRegions(pRegion); |
| if (colocatedChildPRs != null) { |
| for (PartitionedRegion pr : colocatedChildPRs) { |
| Bucket b = pr.getRegionAdvisor().getBucket(getBucket().getId()); |
| if (b != null) { |
| BucketAdvisor ba = b.getBucketAdvisor(); |
| deposedChildPrimaries = ba.deposePrimary() && deposedChildPrimaries; |
| if (b instanceof BucketRegionQueue) { |
| BucketRegionQueue brq = (BucketRegionQueue) b; |
| brq.decQueueSize(brq.size()); |
| brq.incSecondaryQueueSize(brq.size()); |
| } |
| } |
| } |
| } |
| } |
| |
| private void deposeOtherPrimaryBucketForFixedPartition() { |
| boolean deposedOtherPrimaries = true; |
| int bucketId = getBucket().getId(); |
| List<FixedPartitionAttributesImpl> fpas = pRegion.getFixedPartitionAttributesImpl(); |
| if (fpas != null) { |
| for (FixedPartitionAttributesImpl fpa : fpas) { |
| if (fpa.getStartingBucketID() == bucketId) { |
| for (int i = (bucketId + 1); i <= fpa.getLastBucketID(); i++) { |
| Bucket b = regionAdvisor.getBucket(i); |
| if (b != null) { |
| BucketAdvisor ba = b.getBucketAdvisor(); |
| deposedOtherPrimaries = ba.deposePrimary() && deposedOtherPrimaries; |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| void removeBucket() { |
| setHosting(false); |
| } |
| |
| /** |
| * Return (and possibly choose) a thread-sticky member from whose data store this bucket's values |
| * should be read |
| * |
| * @return member to use for reads, null if none available |
| */ |
| public InternalDistributedMember getPreferredNode() { |
| |
| if (isHosting()) { |
| getPartitionedRegionStats().incPreferredReadLocal(); |
| return getDistributionManager().getId(); |
| } |
| |
| Profile[] locProfiles = profiles; // volatile read |
| if (locProfiles.length == 0) { |
| return null; |
| } |
| |
| Profile selectedProfile = selectNotInitializingProfile(locProfiles); |
| |
| if (selectedProfile == null) { |
| return null; |
| } |
| |
| getPartitionedRegionStats().incPreferredReadRemote(); |
| return selectedProfile.peerMemberId; |
| } |
| |
| /** |
| * Get the random Profile that is not initializing BucketProfile. |
| * |
| */ |
| private Profile selectNotInitializingProfile(Profile[] inProfiles) { |
| int index = 0; |
| int offset = 0; |
| if (inProfiles.length > 1) { |
| // Pick random offset. |
| offset = myRand.nextInt(inProfiles.length); |
| } |
| |
| for (int i = 0; i < inProfiles.length; i++) { |
| index = (offset + i) % inProfiles.length; |
| BucketProfile bp = (BucketProfile) inProfiles[index]; |
| if (!bp.isInitializing) { |
| return inProfiles[index]; |
| } |
| } |
| return null; |
| } |
| |
| /** |
| * Returns the thread-safe queue of primary volunteering tasks for the parent Partitioned Region. |
| * |
| * @return the queue of primary volunteering tasks |
| */ |
| private Queue<Runnable> getVolunteeringQueue() { |
| return regionAdvisor.getVolunteeringQueue(); |
| } |
| |
| /** |
| * Returns the semaphore which controls the number of threads allowed to consume from the |
| * {@link #getVolunteeringQueue volunteering queue}. |
| * |
| * @return the semaphore which controls the number of volunteering threads |
| */ |
| private Semaphore getVolunteeringSemaphore() { |
| return regionAdvisor.getVolunteeringSemaphore(); |
| } |
| |
| /** |
| * Returns the PartitionedRegionStats. |
| * |
| * @return the PartitionedRegionStats |
| */ |
| private PartitionedRegionStats getPartitionedRegionStats() { |
| return regionAdvisor.getPartitionedRegionStats(); |
| } |
| |
| /** |
| * Concurrency: protected by synchronizing on *this* |
| */ |
| @Override |
| protected void profileCreated(Profile profile) { |
| regionAdvisor.incrementBucketCount(profile); |
| super.profileCreated(profile); |
| if (updateRedundancy() > 0) { |
| // wake up any threads in waitForRedundancy or waitForPrimary |
| notifyAll(); |
| } |
| regionAdvisor.updateBucketStatus(getBucket().getId(), profile.peerMemberId, false); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Profile added {} Profile : {}", getBucket().getFullPath(), profile); |
| } |
| synchronized (this) { |
| updateServerBucketProfile(); |
| } |
| } |
| |
| /** |
| * Concurrency: protected by synchronizing on *this* |
| */ |
| @Override |
| protected void profileUpdated(Profile profile) { |
| super.profileUpdated(profile); |
| if (updateRedundancy() > 0) { |
| // wake up any threads in waitForRedundancy or waitForPrimary |
| notifyAll(); |
| } |
| regionAdvisor.updateBucketStatus(getBucket().getId(), profile.peerMemberId, false); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Profile updated {} Profile : {}", getBucket().getFullPath(), profile); |
| } |
| synchronized (this) { |
| updateServerBucketProfile(); |
| } |
| } |
| |
| /** |
| * Concurrency: protected by synchronizing on *this* |
| */ |
| @Override |
| protected void profileRemoved(Profile profile) { |
| if (profile != null) { |
| regionAdvisor.updateBucketStatus(getBucket().getId(), |
| profile.getDistributedMember(), true); |
| regionAdvisor.decrementsBucketCount(profile); |
| } |
| updateRedundancy(); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Profile removed {} the member lost {} Profile : {}", getBucket().getFullPath(), |
| profile != null ? profile.getDistributedMember() : null, profile); |
| } |
| synchronized (this) { |
| updateServerBucketProfile(); |
| } |
| } |
| |
| @Override |
| public boolean shouldSyncForCrashedMember(InternalDistributedMember id) { |
| BucketProfile profile = (BucketProfile) getProfile(id); |
| return (profile != null) && (profile.isPrimary); |
| } |
| |
| @Override |
| public DistributedRegion getRegionForDeltaGII() { |
| DistributedRegion result = super.getRegionForDeltaGII(); |
| if (result == null && getAdvisee() instanceof ProxyBucketRegion) { |
| result = ((ProxyBucketRegion) getAdvisee()).getHostedBucketRegion(); |
| } |
| return result; |
| } |
| |
| |
| |
| /** |
| * Called by the RegionAdvisor.profileRemoved, this method tests to see if the missing member is |
| * the primary elector for this bucket. |
| * |
| * We can't call this method from BucketAdvisor.profileRemoved, because the primaryElector may not |
| * actually host the bucket. |
| * |
| */ |
| public void checkForLostPrimaryElector(Profile profile) { |
| // If the member that went away was in the middle of creating |
| // the bucket, finish the bucket creation. |
| ProfileId elector = primaryElector; |
| if (elector != null && elector.equals(profile.getDistributedMember())) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "Bucket {} lost the member responsible for electing the primary. Finishing bucket creation", |
| getBucket().getFullPath()); |
| } |
| primaryElector = getBucket().getDistributionManager().getId(); |
| getBucket().getDistributionManager().getExecutors().getWaitingThreadPool().execute( |
| () -> getBucket().getPartitionedRegion().getRedundancyProvider() |
| .finishIncompleteBucketCreation(getBucket().getId())); |
| } |
| } |
| |
| /** |
| * Only allows profiles that actually hosting this bucket. If the profile is primary, then |
| * primaryMember will be set to that member but only if we are not already the primary. |
| * |
| * @param profile the profile to add (must be a BucketProfile) |
| * @param forceProfile true will force profile to be added even if member is not in distributed |
| * view |
| * |
| * @see #adviseProfileUpdate() |
| */ |
| @Override |
| public synchronized boolean putProfile(Profile profile, boolean forceProfile) { |
| assert profile instanceof BucketProfile; |
| BucketProfile bp = (BucketProfile) profile; |
| |
| // Only hosting buckets will be initializing, the isInitializing boolean is to |
| // allow for early entry into the advisor for GII purposes |
| if (!bp.isHosting && !bp.isInitializing) { |
| if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE)) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, "BucketAdvisor#putProfile early out"); |
| } |
| return false; // Do not allow introduction of proxy profiles, they don't provide anything |
| // useful |
| // isHosting = false, isInitializing = false |
| } |
| if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE)) { |
| logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, |
| "BucketAdvisor#putProfile profile=<{}> force={}; profile = {}", profile, forceProfile, |
| bp); |
| } |
| // isHosting = false, isInitializing = true |
| // isHosting = true, isInitializing = false |
| // isHosting = true, isInitializing = true... (false state) |
| |
| final boolean applied; |
| synchronized (this) { |
| // force new membership version in the advisor so that the |
| // state flush mechanism can capture any updates to the bucket |
| // MIN_VALUE is intended as a somewhat unique value for potential debug purposes |
| profile.initialMembershipVersion = Long.MIN_VALUE; |
| applied = super.putProfile(profile, forceProfile); |
| // skip following block if isPrimary to avoid race where we process late |
| // arriving OTHER_PRIMARY profile after we've already become primary |
| if (applied && !isPrimary()) { |
| if (bp.isPrimary) { |
| setPrimaryMember(bp.getDistributedMember()); |
| } else { |
| notPrimary(bp.getDistributedMember()); |
| } |
| } // if: !isPrimary |
| |
| } // synchronized |
| return applied; |
| } |
| |
| private static <E> Set<E> newSetFromMap(Map<E, Boolean> map) { |
| if (map.isEmpty()) { |
| return new SetFromMap<>(map); |
| } |
| throw new IllegalArgumentException(); |
| } |
| |
| @SuppressWarnings("NullableProblems") |
| private static class SetFromMap<E> extends AbstractSet<E> implements Serializable { |
| private static final long serialVersionUID = 2454657854757543876L; |
| |
| // must named as it, to pass serialization compatibility test. |
| private Map<E, Boolean> m; |
| |
| private transient Set<E> backingSet; |
| |
| SetFromMap(final Map<E, Boolean> map) { |
| super(); |
| m = map; |
| backingSet = map.keySet(); |
| } |
| |
| @Override |
| public boolean equals(Object object) { |
| return backingSet.equals(object); |
| } |
| |
| @Override |
| public int hashCode() { |
| return backingSet.hashCode(); |
| } |
| |
| @Override |
| public boolean add(E object) { |
| return m.put(object, Boolean.TRUE) == null; |
| } |
| |
| @Override |
| public void clear() { |
| m.clear(); |
| } |
| |
| @Override |
| public String toString() { |
| return backingSet.toString(); |
| } |
| |
| @Override |
| public boolean contains(Object object) { |
| return backingSet.contains(object); |
| } |
| |
| @Override |
| public boolean containsAll(Collection<?> collection) { |
| return backingSet.containsAll(collection); |
| } |
| |
| @Override |
| public boolean isEmpty() { |
| return m.isEmpty(); |
| } |
| |
| @Override |
| public boolean remove(Object object) { |
| return m.remove(object) != null; |
| } |
| |
| @Override |
| public boolean retainAll(Collection<?> collection) { |
| return backingSet.retainAll(collection); |
| } |
| |
| @Override |
| public Object[] toArray() { |
| return backingSet.toArray(); |
| } |
| |
| @Override |
| public <T> T[] toArray(T[] contents) { |
| return backingSet.toArray(contents); |
| } |
| |
| @Override |
| public Iterator<E> iterator() { |
| return backingSet.iterator(); |
| } |
| |
| @Override |
| public int size() { |
| return m.size(); |
| } |
| |
| private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException { |
| stream.defaultReadObject(); |
| backingSet = m == null ? Collections.emptySet() : m.keySet(); |
| } |
| } |
| |
| /** |
| * repopulates the RegionAdvisor's location information for this bucket |
| */ |
| private void updateServerBucketProfile() { |
| int bucketId = getBucket().getId(); |
| Set<ServerBucketProfile> serverProfiles = |
| newSetFromMap(new HashMap<>()); |
| for (Profile p : profiles) { |
| if (p instanceof ServerBucketProfile) { |
| serverProfiles.add((ServerBucketProfile) p); |
| } |
| } |
| regionAdvisor.setClientBucketProfiles(bucketId, serverProfiles); |
| } |
| |
| /** |
| * Only for local profile. |
| * |
| */ |
| public synchronized void updateServerBucketProfile(BucketProfile p) { |
| localProfile = p; |
| } |
| |
| public BucketProfile getLocalProfile() { |
| return localProfile; |
| } |
| |
| @Override |
| public boolean removeId(ProfileId memberId, boolean crashed, boolean destroyed, |
| boolean fromMembershipListener) { |
| boolean hadBucketRegion = super.removeId(memberId, crashed, destroyed, fromMembershipListener); |
| if (hadBucketRegion) { |
| // do NOT call notPrimary under synchronization |
| try { |
| notPrimary((InternalDistributedMember) memberId); |
| } catch (CancelException e) { |
| // must be closing the cache - no need to try to become primary |
| } |
| } |
| return hadBucketRegion; |
| } |
| |
| /** |
| * Removes the profile for the specified member. If that profile is marked as primary, this will |
| * call {@link #notPrimary(InternalDistributedMember)}. |
| * |
| * @param memberId the member to remove the profile for |
| * @param serialNum specific serial number to remove |
| * @return true if a matching profile for the member was found |
| */ |
| @Override |
| public boolean removeIdWithSerial(InternalDistributedMember memberId, int serialNum, |
| boolean regionDestroyed) { |
| boolean hadBucketRegion = super.removeIdWithSerial(memberId, serialNum, regionDestroyed); |
| if (hadBucketRegion) { |
| // do NOT call notPrimary under synchronization |
| notPrimary(memberId); |
| } |
| forceNewMembershipVersion(); |
| return hadBucketRegion; |
| } |
| |
| @Override |
| public Set<InternalDistributedMember> adviseProfileExchange() { |
| // delegate up to RegionAdvisor to include members that might have |
| // ProxyBucketRegion without a real BucketRegion |
| Assert.assertTrue(regionAdvisor.isInitialized()); |
| return regionAdvisor.adviseBucketProfileExchange(); |
| } |
| |
| @Override |
| public Set<InternalDistributedMember> adviseProfileUpdate() { |
| // delegate up to RegionAdvisor to include members that might have |
| // ProxyBucketRegion without a real BucketRegion |
| return regionAdvisor.adviseGeneric(); |
| } |
| |
| /** |
| * Sets hosting to false and returns without closing. Calling closeAdvisor will actually close |
| * this advisor. |
| */ |
| @Override |
| public void close() { |
| // ok, so BucketRegion extends DistributedRegion which calls close on the |
| // advisor, BUT the ProxyBucketRegion truly owns the advisor and only it |
| // should be able to close the advisor... |
| // |
| // if a BucketRegion closes, it will call this method and we want to change |
| // our state to NOT_HOSTING |
| // |
| // see this.closeAdvisor() |
| setHosting(false); |
| } |
| |
| /** |
| * Blocks until there is a known primary and return that member, but only if there are real bucket |
| * regions that exist. If there are no real bucket regions within the distribution config's |
| * member-timeout setting * 3 (time required to eject a member) + 15000, then this returns null. |
| * |
| * kbanks: reworked this method to avoid JIT issue #40639 |
| * |
| * @return the member who is primary for this bucket |
| */ |
| public InternalDistributedMember getPrimary() { |
| InternalDistributedMember primary = getExistingPrimary(); |
| if (primary == null) { |
| primary = waitForNewPrimary(); |
| } |
| return primary; |
| } |
| |
| /** |
| * This method was split out from getPrimary() due to bug #40639 and is only intended to be called |
| * from within that method. |
| * |
| * @see #getPrimary() |
| * @return the existing primary (if it is still in the view) otherwise null |
| */ |
| private InternalDistributedMember getExistingPrimary() { |
| return basicGetPrimaryMember(); |
| } |
| |
| /** |
| * This method was split out from getPrimary() due to bug #40639 and is only intended to be called |
| * from within that method. |
| * |
| * @see #getPrimary() |
| * @return the new primary |
| */ |
| private InternalDistributedMember waitForNewPrimary() { |
| DistributionManager dm = regionAdvisor.getDistributionManager(); |
| DistributionConfig config = dm.getConfig(); |
| // failure detection period |
| long timeout = config.getMemberTimeout() * 3L; |
| // plus time for a new member to become primary |
| timeout += Long.getLong(GeodeGlossary.GEMFIRE_PREFIX + "BucketAdvisor.getPrimaryTimeout", |
| 15000L); |
| return waitForPrimaryMember(timeout); |
| } |
| |
| /** |
| * Marks member as not primary. Initiates volunteerForPrimary if this member is hosting a real |
| * bucket. This method does nothing if the member parameter is the current member. |
| * |
| * @param member the member who is not primary |
| */ |
| public void notPrimary(InternalDistributedMember member) { |
| // Fix for 43569. Only the deposePrimary call should |
| // make the local member drop the primary lock. |
| if (!member.equals(getDistributionManager().getId())) { |
| removePrimary(member); |
| } |
| } |
| |
| /** |
| * Marks member as not primary. Initiates volunteerForPrimary if this member is hosting a real |
| * bucket. |
| * |
| * @param member the member who is not primary |
| */ |
| private void removePrimary(InternalDistributedMember member) { |
| boolean needToVolunteerForPrimary = false; |
| if (!isClosed()) { // hole: requestPrimaryState not hosting |
| initializationGate(); |
| } |
| boolean lostPrimary = false; |
| try { |
| synchronized (this) { |
| boolean wasPrimary = isPrimary() && getDistributionManager().getId().equals(member); |
| final InternalDistributedMember currentPrimary = primaryMember.get(); |
| if (currentPrimary != null && currentPrimary.equals(member)) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("[BucketAdvisor.notPrimary] {} for {}", member, this); |
| } |
| primaryMember.set(null); |
| } else { |
| return; |
| } |
| |
| if (isClosed()) { |
| // possibly closed if caller comes from outside this advisor |
| return; // return quietly |
| } |
| // member is primary... need to change state to NO_PRIMARY_xxx |
| if (isHosting()) { |
| requestPrimaryState(NO_PRIMARY_HOSTING); |
| if (pRegion.isFixedPartitionedRegion()) { |
| InternalDistributedMember primaryMember = |
| regionAdvisor.adviseFixedPrimaryPartitionDataStore(getBucket().getId()); |
| needToVolunteerForPrimary = primaryMember == null || primaryMember.equals(member); |
| } else { |
| needToVolunteerForPrimary = true; |
| } |
| } else { |
| requestPrimaryState(NO_PRIMARY_NOT_HOSTING); |
| } |
| if (wasPrimary) { |
| lostPrimary = true; |
| } |
| |
| // try to fix yet another cause of bug 36881 |
| findAndSetPrimaryMember(); |
| } |
| } finally { |
| if (lostPrimary) { |
| invokeAfterSecondaryInPartitionListeners(); |
| Bucket br = regionAdvisor.getBucket(getBucket().getId()); |
| if (br instanceof BucketRegion) { |
| ((BucketRegion) br).beforeReleasingPrimaryLockDuringDemotion(); |
| } |
| |
| releasePrimaryLock(); |
| // this was a deposePrimary call so we need to depose children as well |
| deposePrimaryForColocatedChildren(); |
| if (pRegion.isFixedPartitionedRegion()) { |
| deposeOtherPrimaryBucketForFixedPartition(); |
| } |
| } |
| } |
| |
| if (needToVolunteerForPrimary) { |
| volunteerForPrimary(); |
| } |
| } |
| |
| /** |
| * Returns the ProxyBucketRegion which owns this advisor. |
| * |
| * @return the ProxyBucketRegion which owns this advisor |
| */ |
| public ProxyBucketRegion getProxyBucketRegion() { |
| return (ProxyBucketRegion) getAdvisee(); |
| } |
| |
| /** |
| * Actually close this advisor for real. Called by ProxyBucketRegion only. Calling this method |
| * actually closes this advisor whereas {@link #close()} only sets hosting to false. |
| */ |
| void closeAdvisor() { |
| boolean wasPrimary; |
| synchronized (this) { |
| if (isClosed()) { |
| return; |
| } |
| wasPrimary = isPrimary(); |
| super.close(); |
| requestPrimaryState(CLOSED); |
| redundancyTracker.closeBucket(); |
| localProfile = null; |
| } |
| if (wasPrimary) { |
| releasePrimaryLock(); |
| } |
| } |
| |
| /** |
| * Returns true if this advisor has been closed. |
| * |
| * @return true if this advisor has been closed |
| */ |
| protected boolean isClosed() { |
| return primaryState == CLOSED; |
| } |
| |
| /** |
| * Returns true if this member is currently marked as primary. |
| * |
| * @return true if this member is currently marked as primary |
| */ |
| public boolean isPrimary() { |
| return primaryState == IS_PRIMARY_HOSTING; |
| } |
| |
| /** |
| * Returns true if this member is currently volunteering for primary. |
| * |
| * @return true if this member is currently volunteering for primary |
| */ |
| private boolean isVolunteering() { |
| return primaryState == VOLUNTEERING_HOSTING; |
| } |
| |
| /** |
| * Returns true if this member is currently attempting to become primary. |
| * |
| * @return true if this member is currently attempting to become primary |
| */ |
| private boolean isBecomingPrimary() { |
| synchronized (this) { |
| return primaryState == BECOMING_HOSTING && volunteeringDelegate != null |
| && volunteeringDelegate.isAggressive(); |
| } |
| } |
| |
| /** |
| * Returns true if this member is currently hosting real bucket. |
| * |
| * @return true if this member is currently hosting real bucket |
| */ |
| public boolean isHosting() { |
| final byte primaryState = this.primaryState; |
| return primaryState == NO_PRIMARY_HOSTING || primaryState == OTHER_PRIMARY_HOSTING |
| || primaryState == VOLUNTEERING_HOSTING || primaryState == BECOMING_HOSTING |
| || primaryState == IS_PRIMARY_HOSTING; |
| } |
| |
| /** |
| * Attempt to acquire lock for primary until a primary exists. Caller hands off responsibility to |
| * an executor (waiting pool) and returns early. |
| */ |
| public void volunteerForPrimary() { |
| InternalDistributedMember elector = primaryElector; |
| if (elector != null && regionAdvisor.hasPartitionedRegion(elector)) { |
| // another server will determine the primary node |
| return; |
| } |
| |
| primaryElector = null; |
| |
| initializationGate(); |
| |
| synchronized (this) { |
| if (isVolunteering() || isClosed() || !isHosting()) { |
| // only one thread should be attempting to volunteer at one time |
| return; |
| } |
| |
| if (volunteeringDelegate == null) { |
| setVolunteeringDelegate(new VolunteeringDelegate()); |
| } |
| volunteeringDelegate.volunteerForPrimary(); |
| |
| } |
| } |
| |
| protected void setVolunteeringDelegate(VolunteeringDelegate delegate) { |
| volunteeringDelegate = delegate; |
| } |
| |
| /** |
| * Makes this <code>BucketAdvisor</code> become the primary if it is already a secondary. |
| * |
| * @param isRebalance true if directed to become primary by rebalancing |
| * @return true if this advisor succeeds in becoming the primary |
| */ |
| public boolean becomePrimary(boolean isRebalance) { |
| initializationGate(); |
| |
| long startTime = getPartitionedRegionStats().startPrimaryTransfer(isRebalance); |
| try { |
| long waitTime = 2000; // time each iteration will wait |
| while (!isPrimary()) { |
| getAdvisee().getCancelCriterion().checkCancelInProgress(null); |
| boolean attemptToBecomePrimary = false; |
| boolean attemptToDeposePrimary = false; |
| |
| if (Thread.currentThread().isInterrupted()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Breaking from becomePrimary loop due to thread interrupt flag being set"); |
| } |
| break; |
| } |
| if (isClosed() || !isHosting()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Breaking from becomePrimary loop because {} is closed or not hosting", |
| this); |
| } |
| break; |
| } |
| |
| VolunteeringDelegate vDelegate = null; |
| synchronized (this) { |
| if (isVolunteering()) { |
| // standard volunteering attempt already in progress... |
| if (logger.isDebugEnabled()) { |
| logger.debug("Waiting for volunteering thread {}. Time left: {} ms", this, waitTime); |
| } |
| wait(waitTime); // spurious wakeup ok |
| continue; |
| |
| } else if (isBecomingPrimary()) { |
| // reattempt to depose otherPrimary... |
| attemptToDeposePrimary = true; |
| |
| } else { |
| // invoke becomePrimary AFTER sync is released in this thread... |
| vDelegate = volunteeringDelegate; |
| if (vDelegate == null) { |
| vDelegate = new VolunteeringDelegate(); |
| volunteeringDelegate = vDelegate; |
| } |
| } // else |
| } // synchronized |
| |
| if (vDelegate != null) { |
| // Use the snapshot 'vDelegate' instead of 'this.volunteeringDelegate' since we are not |
| // synced here. |
| attemptToBecomePrimary = vDelegate.reserveForBecomePrimary(); // no sync! |
| } |
| |
| // release synchronization and then call becomePrimary |
| if (attemptToBecomePrimary) { |
| synchronized (this) { |
| if (volunteeringDelegate == null) { |
| volunteeringDelegate = new VolunteeringDelegate(); |
| } |
| volunteeringDelegate.volunteerForPrimary(); |
| attemptToDeposePrimary = true; |
| } // synchronized |
| Thread.sleep(10); |
| } // attemptToBecomePrimary |
| |
| // RACE: slight race condition with thread that's actually requesting the lock |
| |
| if (attemptToDeposePrimary) { |
| InternalDistributedMember otherPrimary = getPrimary(); |
| if (otherPrimary != null && !getDistributionManager().getId().equals(otherPrimary)) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Attempting to depose primary on {} for {}", otherPrimary, this); |
| } |
| DeposePrimaryBucketResponse response = |
| DeposePrimaryBucketMessage.send(otherPrimary, pRegion, getBucket().getId()); |
| if (response != null) { |
| response.waitForRepliesUninterruptibly(); |
| if (logger.isDebugEnabled()) { |
| logger.debug("Deposed primary on {}", otherPrimary); |
| } |
| } |
| } |
| Thread.sleep(10); |
| } // attemptToDeposePrimary |
| |
| } // while |
| |
| } catch (InterruptedException e) { |
| // abort and return null |
| Thread.currentThread().interrupt(); |
| |
| } finally { |
| getPartitionedRegionStats().endPrimaryTransfer(startTime, isPrimary(), isRebalance); |
| } |
| |
| return isPrimary(); |
| } |
| |
| /** |
| * Check the primary member shortcut. Does not query the advisor. Should only be used when the |
| * advisor should not be consulted directly. |
| * |
| * @return the member or null if no primary exists |
| */ |
| public InternalDistributedMember basicGetPrimaryMember() { |
| return primaryMember.get(); |
| } |
| |
| /** |
| * Invoked when the primary lock has been acquired by this VM. |
| * |
| * @return true if successfully changed state to IS_PRIMARY |
| */ |
| private boolean acquiredPrimaryLock() { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Acquired primary lock for BucketID {} PR : {}", getBucket().getId(), |
| regionAdvisor.getPartitionedRegion().getFullPath()); |
| } |
| boolean changedStateToIsPrimary = false; |
| // Hold the primary move lock until we send a |
| // profile update. This will prevent writes |
| // from occurring until all members know that |
| // this member is now the primary. |
| boolean shouldInvokeListeners = false; |
| primaryMoveWriteLock.lock(); |
| try { |
| synchronized (this) { |
| if (isHosting() && (isVolunteering() || isBecomingPrimary())) { |
| Bucket br = regionAdvisor.getBucket(getBucket().getId()); |
| if (br instanceof BucketRegion) { |
| ((BucketRegion) br).beforeAcquiringPrimaryState(); |
| } |
| if (requestPrimaryState(IS_PRIMARY_HOSTING)) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Acquired primary lock for setting primary now BucketID {} PR : {}", |
| getBucket().getId(), regionAdvisor.getPartitionedRegion().getFullPath()); |
| } |
| setPrimaryMember(getDistributionManager().getId()); |
| changedStateToIsPrimary = true; |
| if (hasPrimary() && isPrimary()) { |
| shouldInvokeListeners = true; |
| } |
| } |
| } |
| } |
| |
| if (shouldInvokeListeners) { |
| invokePartitionListeners(); |
| } |
| return changedStateToIsPrimary; |
| } finally { |
| try { |
| if (changedStateToIsPrimary) { |
| // send profile update AFTER releasing sync |
| sendProfileUpdate(); |
| |
| Bucket br = regionAdvisor.getBucket(getBucket().getId()); |
| if (br instanceof BucketRegion) { |
| ((BucketRegion) br).processPendingSecondaryExpires(); |
| } |
| if (br instanceof BucketRegionQueue) { // Shouldn't it be AbstractBucketRegionQueue |
| BucketRegionQueue brq = (BucketRegionQueue) br; |
| brq.incQueueSize(brq.size()); |
| brq.decSecondaryQueueSize(brq.size()); |
| } |
| if (br instanceof BucketRegion) { |
| ((BucketRegion) br).afterAcquiringPrimaryState(); |
| } |
| } else { |
| // release primary lock AFTER releasing sync |
| releasePrimaryLock(); |
| } |
| } finally { |
| primaryMoveWriteLock.unlock(); |
| } |
| } |
| } |
| |
| private void invokePartitionListeners() { |
| PartitionListener[] listeners = pRegion.getPartitionListeners(); |
| if (listeners == null || listeners.length == 0) { |
| return; |
| } |
| for (PartitionListener listener : listeners) { |
| if (listener != null) { |
| listener.afterPrimary(getBucket().getId()); |
| } |
| } |
| } |
| |
| private void invokeAfterSecondaryInPartitionListeners() { |
| PartitionListener[] listeners = pRegion.getPartitionListeners(); |
| if (listeners == null || listeners.length == 0) { |
| return; |
| } |
| for (PartitionListener listener : listeners) { |
| if (listener != null) { |
| listener.afterSecondary(getBucket().getId()); |
| } |
| } |
| } |
| |
| /** |
| * Lazily gets the lock for acquiring primary lock. Caller must handle null. If DLS, Cache, or |
| * DistributedSystem are shutting down then null will be returned. If DLS does not yet exist and |
| * createDLS is false then null will be returned. |
| * |
| * @param createDLS true will create DLS if it does not exist |
| * @return distributed lock indicating primary member or null |
| */ |
| private DistributedMemberLock getPrimaryLock(boolean createDLS) { |
| synchronized (this) { |
| if (primaryLock == null) { |
| DistributedLockService dls = DistributedLockService |
| .getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME); |
| if (dls == null) { |
| if (!createDLS || getProxyBucketRegion().getCache().isClosed()) { |
| return null; // cache closure has destroyed the DLS |
| } |
| try { // TODO: call GemFireCache#getPartitionedRegionLockService |
| dls = DLockService.create(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME, |
| getAdvisee().getSystem(), true /* distributed */, true /* destroyOnDisconnect */, |
| true /* automateFreeResources */); |
| } catch (IllegalArgumentException e) { |
| // indicates that the DLS is already created |
| dls = DistributedLockService |
| .getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME); |
| if (dls == null) { |
| // another thread destroyed DLS after this thread called create |
| return null; // ok, caller will loop if necessary |
| } |
| } |
| // TODO: we need a good NotConnectedException to replace |
| // IllegalStateException and ShutdownException |
| // perhaps: DistributedSystemUnavailableException |
| catch (IllegalStateException | DistributedSystemDisconnectedException e) { |
| // create still throws IllegalStateException if isDisconnecting is true |
| return null; |
| } // this would certainly prevent us from creating a DLS... messy |
| |
| } |
| primaryLock = new DistributedMemberLock(dls, getAdvisee().getName(), |
| DistributedMemberLock.NON_EXPIRING_LEASE, |
| DistributedMemberLock.LockReentryPolicy.PREVENT_SILENTLY); |
| } |
| return primaryLock; |
| } |
| } |
| |
| private void acquirePrimaryRecursivelyForColocated() { |
| final List<PartitionedRegion> colocatedWithList = |
| ColocationHelper.getColocatedChildRegions(regionAdvisor.getPartitionedRegion()); |
| if (colocatedWithList != null) { |
| for (PartitionedRegion childPR : colocatedWithList) { |
| Bucket b = childPR.getRegionAdvisor().getBucket(getBucket().getId()); |
| BucketAdvisor childBA = b.getBucketAdvisor(); |
| Assert.assertHoldsLock(childBA, false); |
| boolean acquireForChild = false; |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "BucketAdvisor.acquirePrimaryRecursivelyForColocated: about to take lock for bucket: {} of PR: {} with isHosting={}", |
| getBucket().getId(), childPR.getFullPath(), childBA.isHosting()); |
| } |
| childBA.primaryMoveWriteLock.lock(); |
| try { |
| if (childBA.isHosting()) { |
| if (isPrimary()) { |
| if (!childBA.isPrimary()) { |
| childBA.setVolunteering(); |
| boolean acquired = childBA.acquiredPrimaryLock(); |
| acquireForChild = true; |
| if (acquired && pRegion.isFixedPartitionedRegion()) { |
| childBA.acquirePrimaryForRestOfTheBucket(); |
| } |
| } else { |
| acquireForChild = true; |
| } |
| } |
| } // if isHosting |
| if (acquireForChild) { |
| childBA.acquirePrimaryRecursivelyForColocated(); |
| } |
| } finally { |
| childBA.primaryMoveWriteLock.unlock(); |
| } |
| } |
| } |
| } |
| |
| private void acquirePrimaryForRestOfTheBucket() { |
| List<FixedPartitionAttributesImpl> fpas = pRegion.getFixedPartitionAttributesImpl(); |
| if (fpas != null) { |
| int bucketId = getBucket().getId(); |
| for (FixedPartitionAttributesImpl fpa : fpas) { |
| if (fpa.getStartingBucketID() == bucketId) { |
| for (int i = bucketId + 1; i <= fpa.getLastBucketID();) { |
| Bucket b = regionAdvisor.getBucket(i++); |
| if (b != null) { |
| BucketAdvisor ba = b.getBucketAdvisor(); |
| ba.primaryMoveWriteLock.lock(); |
| try { |
| if (ba.isHosting()) { |
| if (!ba.isPrimary()) { |
| ba.setVolunteering(); |
| ba.acquiredPrimaryLock(); |
| } |
| } |
| } finally { |
| ba.primaryMoveWriteLock.unlock(); |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Sets volunteering to true. Returns true if the state of volunteering was changed. Returns false |
| * if voluntering was already equal to true. Caller should do nothing if false is returned. |
| */ |
| private boolean setVolunteering() { |
| synchronized (this) { |
| return requestPrimaryState(VOLUNTEERING_HOSTING); |
| } |
| } |
| |
| /** |
| * Sets becoming primary to true. Returns true if the state of becoming was changed. Returns false |
| * if becoming was already equal to true. Caller should do nothing if false is returned. |
| */ |
| private boolean setBecoming() { |
| synchronized (this) { |
| return requestPrimaryState(BECOMING_HOSTING); |
| } |
| } |
| |
| /** |
| * Wait briefly for a primary member to be identified. |
| * |
| * @param timeout time in milliseconds to wait for a primary |
| * @return the primary bucket host |
| */ |
| private InternalDistributedMember waitForPrimaryMember(long timeout) { |
| synchronized (this) { |
| // let's park this thread and wait for a primary! |
| StopWatch timer = new StopWatch(true); |
| long warnTime = getDistributionManager().getConfig().getAckWaitThreshold() * 1000L; |
| boolean loggedWarning = false; |
| try { |
| for (;;) { |
| // bail out if the system starts closing |
| getAdvisee().getCancelCriterion().checkCancelInProgress(null); |
| final InternalCache cache = getBucket().getCache(); |
| if (cache != null && cache.isCacheAtShutdownAll()) { |
| throw cache.getCacheClosedException("Cache is shutting down"); |
| } |
| |
| if (getBucketRedundancy() == -1) { |
| // there are no real buckets in other vms... no reason to wait |
| return null; |
| } |
| getProxyBucketRegion().getPartitionedRegion().checkReadiness(); |
| if (isClosed()) { |
| break; |
| } |
| long elapsed = timer.elapsedTimeMillis(); |
| long timeLeft = timeout - elapsed; |
| if (timeLeft <= 0) { |
| break; |
| } |
| if (getBucketRedundancy() == -1 || isClosed()) { |
| break; // early out... all bucket regions are gone or we closed |
| } |
| InternalDistributedMember primary = basicGetPrimaryMember(); |
| if (primary != null) { |
| return primary; |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Waiting for bucket {}. Time left :{} ms", this, timeLeft); |
| } |
| |
| // Log a warning if we have waited for the ack wait threshold time. |
| if (!loggedWarning) { |
| long timeUntilWarning = warnTime - elapsed; |
| if (timeUntilWarning <= 0) { |
| logger |
| .warn( |
| "{} secs have elapsed waiting for a primary for bucket {}. Current bucket owners {}", |
| new Object[] {warnTime / 1000L, this, adviseInitialized()}); |
| // log a warning; |
| loggedWarning = true; |
| } else { |
| timeLeft = timeLeft > timeUntilWarning ? timeUntilWarning : timeLeft; |
| } |
| } |
| wait(timeLeft); // spurious wakeup ok |
| } |
| } catch (InterruptedException e) { |
| // abort and return null |
| Thread.currentThread().interrupt(); |
| } finally { |
| if (loggedWarning) { |
| logger.info( |
| "Wait for primary completed"); |
| } |
| } |
| return null; |
| } |
| } |
| |
| /** |
| * How long to wait, in millisecs, for redundant buckets to exist |
| */ |
| private static final long BUCKET_REDUNDANCY_WAIT = 15000L; // 15 seconds |
| |
| /** |
| * Wait the desired redundancy to be met. |
| * |
| * @param minRedundancy the amount of desired redundancy. |
| * @return true if desired redundancy is detected |
| */ |
| public boolean waitForRedundancy(int minRedundancy) { |
| synchronized (this) { |
| // let's park this thread and wait for redundancy! |
| StopWatch timer = new StopWatch(true); |
| try { |
| for (;;) { |
| if (getBucketRedundancy() >= minRedundancy) { |
| return true; |
| } |
| getProxyBucketRegion().getPartitionedRegion().checkReadiness(); |
| if (isClosed()) { |
| return false; |
| } |
| long timeLeft = BUCKET_REDUNDANCY_WAIT - timer.elapsedTimeMillis(); |
| if (timeLeft <= 0) { |
| return false; |
| } |
| if (logger.isDebugEnabled()) { |
| logger.debug("Waiting for bucket {}", this); |
| } |
| wait(timeLeft); // spurious wakeup ok |
| } |
| } catch (InterruptedException e) { |
| // abort and return null |
| Thread.currentThread().interrupt(); |
| } |
| return false; |
| } |
| } |
| |
| synchronized void clearPrimaryElector() { |
| primaryElector = null; |
| } |
| |
| synchronized void setPrimaryElector(InternalDistributedMember newPrimaryElector) { |
| // Only set the new primary elector if we have not yet seen |
| // a primary for this bucket. |
| if (primaryElector != null) { |
| if (newPrimaryElector != null && !regionAdvisor.hasPartitionedRegion(newPrimaryElector)) { |
| // no longer a participant - don't use it |
| primaryElector = null; |
| } else { |
| primaryElector = newPrimaryElector; |
| } |
| } |
| } |
| |
| |
| public synchronized void initializePrimaryElector(InternalDistributedMember newPrimaryElector) { |
| // For child buckets, we want the parent bucket to take care' |
| // of finishing an incomplete bucket creation, so only set the elector for |
| // the leader region. |
| if (parentAdvisor == null) { |
| if (newPrimaryElector != null && !regionAdvisor.hasPartitionedRegion(newPrimaryElector)) { |
| // no longer a participant - don't use it |
| primaryElector = null; |
| } else { |
| primaryElector = newPrimaryElector; |
| } |
| } |
| } |
| |
| /** |
| * Invoked when real bucket is created for hosting in this VM. |
| * |
| * @param value true to begin hosting; false to end hosting |
| */ |
| protected void setHosting(boolean value) { |
| // boolean needToNotPrimarySelf = false; |
| boolean needToVolunteerForPrimary = false; |
| boolean wasPrimary; |
| synchronized (this) { |
| wasPrimary = isPrimary(); |
| if (isClosed()) { |
| return; |
| } |
| if (value) { // setting to HOSTING... |
| if (hasPrimary()) { |
| requestPrimaryState(OTHER_PRIMARY_HOSTING); |
| } else { |
| requestPrimaryState(NO_PRIMARY_HOSTING); |
| needToVolunteerForPrimary = true; |
| } |
| } |
| |
| else { // setting to NOT_HOSTING... |
| if (hasPrimary()) { // has primary... |
| if (isPrimary()) { |
| requestPrimaryState(NO_PRIMARY_NOT_HOSTING); |
| primaryMember.set(null); |
| findAndSetPrimaryMember(); |
| } else { |
| requestPrimaryState(OTHER_PRIMARY_NOT_HOSTING); |
| } |
| } else { // no primary... |
| // acquiredPrimaryLock will check isHosting and release if not hosting |
| requestPrimaryState(NO_PRIMARY_NOT_HOSTING); |
| } |
| } |
| volunteeringDelegate = null; |
| |
| // Note - checkRedundancy has the side effect that it updates the stats. |
| // We need to invoke checkRedundancy here, regardless of whether we |
| // need this notify. |
| if (updateRedundancy() > 0 && isHosting()) { |
| // wake up any threads in waitForRedundancy or waitForPrimary |
| notifyAll(); |
| } |
| } |
| if (wasPrimary) { |
| releasePrimaryLock(); |
| } |
| |
| if (logger.isTraceEnabled()) { |
| logger.trace("setHosting: {} needToVolunteerForPrimary={} primaryElector: {}", this, |
| needToVolunteerForPrimary, primaryElector); |
| } |
| /* |
| * if (needToNotPrimarySelf) { notPrimary(getAdvisee().getDistributionManager().getId()); } |
| */ |
| if (needToVolunteerForPrimary) { |
| volunteerForPrimary(); |
| } |
| |
| sendProfileUpdate(); |
| } |
| |
| /** |
| * Sends updated profile for this member to every member with the <code>PartitionedRegion</code>. |
| * <p> |
| * Never call this method while synchronized on this BucketAdvisor. This will result in |
| * distributed deadlocks. |
| */ |
| private void sendProfileUpdate() { |
| if (getDistributionManager().getSystem().isLoner()) { |
| // no one to send the profile update... return to prevent bug 39760 |
| return; |
| } |
| // make sure caller is not synchronized or we'll deadlock |
| Assert.assertTrue(!Thread.holdsLock(this), |
| "Attempting to sendProfileUpdate while synchronized may result in deadlock"); |
| // NOTE: if this assert fails, you COULD use the WaitingThreadPool in DM |
| |
| final int partitionedRegionId = pRegion.getPRId(); |
| final int bucketId = ((ProxyBucketRegion) getAdvisee()).getBucketId(); |
| |
| BucketProfile bp = (BucketProfile) createProfile(); |
| updateServerBucketProfile(bp); |
| InternalDistributedMember primary = basicGetPrimaryMember(); |
| HashSet<InternalDistributedMember> hostsAndProxyMembers = new HashSet<>(); |
| if (primary != null && !primary.equals(getDistributionManager().getId())) { |
| hostsAndProxyMembers.add(primary); // Add the primary |
| } |
| hostsAndProxyMembers.addAll(adviseGeneric()); // Add all members hosting the bucket |
| hostsAndProxyMembers.addAll(adviseProfileUpdate()); // Add all proxy instances that could use |
| // the bucket |
| ReplyProcessor21 reply = BucketProfileUpdateMessage.send(hostsAndProxyMembers, |
| getDistributionManager(), partitionedRegionId, bucketId, bp, true); |
| if (reply != null) { |
| reply.waitForRepliesUninterruptibly(); |
| } |
| } |
| |
| /** |
| * Returns true if the a primary is known. |
| */ |
| private boolean hasPrimary() { |
| final byte primaryState = this.primaryState; |
| return primaryState == OTHER_PRIMARY_NOT_HOSTING || primaryState == OTHER_PRIMARY_HOSTING |
| || primaryState == IS_PRIMARY_HOSTING; |
| } |
| |
| @Override |
| protected Profile instantiateProfile(InternalDistributedMember memberId, int version) { |
| if (!pRegion.isShadowPR()) { |
| Set<BucketServerLocation66> serverLocations = getBucketServerLocations(version); |
| if (!serverLocations.isEmpty()) { |
| return new ServerBucketProfile(memberId, version, getBucket(), (HashSet) serverLocations); |
| } |
| } |
| return new BucketProfile(memberId, version, getBucket()); |
| } |
| |
| protected Set<BucketServerLocation66> getBucketServerLocations(int version) { |
| InternalCache cache = getProxyBucketRegion().getCache(); |
| List<CacheServer> servers = cache.getCacheServers(); |
| if (servers.isEmpty()) { |
| return Collections.emptySet(); |
| } |
| HashSet<BucketServerLocation66> serverLocations = new HashSet<>(); |
| for (CacheServer cacheServer : servers) { |
| CacheServerImpl server = (CacheServerImpl) cacheServer; |
| try { |
| String serverExternalAddress; |
| if (server.isRunning() && ((serverExternalAddress = server.getExternalAddress()) != null)) { |
| BucketServerLocation66 location = new BucketServerLocation66(getBucket().getId(), |
| server.getPort(), serverExternalAddress, getBucket().isPrimary(), |
| Integer.valueOf(version).byteValue(), server.getCombinedGroups()); |
| serverLocations.add(location); |
| } |
| } catch (IllegalStateException e) { |
| if (!(e.getMessage() != null |
| && e.getMessage().equals(CACHE_SERVER_BIND_ADDRESS_NOT_AVAILABLE_EXCEPTION_MESSAGE))) { |
| throw e; |
| } |
| } |
| } |
| return serverLocations; |
| } |
| |
| /** |
| * Sets primaryMember and notifies all. Caller must be synced on this. |
| * |
| * @param id the member to use as primary for this bucket |
| */ |
| private void setPrimaryMember(InternalDistributedMember id) { |
| if (!getDistributionManager().getId().equals(id)) { |
| // volunteerForPrimary handles primary state change if its our id |
| if (isHosting()) { |
| requestPrimaryState(OTHER_PRIMARY_HOSTING); |
| } else { |
| requestPrimaryState(OTHER_PRIMARY_NOT_HOSTING); |
| } |
| } |
| primaryMember.set(id); |
| everHadPrimary = true; |
| |
| if (id != null && id.equals(primaryElector)) { |
| primaryElector = null; |
| } |
| notifyAll(); // wake up any threads in waitForPrimaryMember |
| } |
| |
| void setHadPrimary() { |
| everHadPrimary = true; |
| } |
| |
| boolean getHadPrimary() { |
| return everHadPrimary; |
| } |
| |
| public InternalDistributedMember getPrimaryElector() { |
| return primaryElector; |
| } |
| |
| /** |
| * Get the current number of bucket hosts and update the redundancy statistics for the region |
| * |
| * @return number of current bucket hosts |
| */ |
| private int updateRedundancy() { |
| int numBucketHosts = getNumInitializedBuckets(); |
| if (!isClosed()) { |
| redundancyTracker.updateStatistics(numBucketHosts); |
| } |
| return numBucketHosts; |
| } |
| |
| /** |
| * Returns all {@link InternalDistributedMember}s currently flagged as primary. |
| * <p> |
| * Since profile messages may arrive out of order from different members, more than one member may |
| * temporarily be flagged as primary. |
| * <p> |
| * The user of this BucketAdvisor should simply assume that the first profile is primary until the |
| * dust settles, leaving only one primary profile. |
| * |
| * @return zero or greater array of primary members |
| */ |
| private InternalDistributedMember[] findPrimaryMembers() { |
| Set<InternalDistributedMember> primaryMembers = adviseFilter(profile -> { |
| assert profile instanceof BucketProfile; |
| BucketProfile srp = (BucketProfile) profile; |
| return srp.isPrimary; |
| }); |
| if (primaryMembers.size() > 1 && logger.isDebugEnabled()) { |
| logger.debug("[findPrimaryProfiles] found the following primary members for {}: {}", |
| getAdvisee().getName(), primaryMembers); |
| } |
| return primaryMembers.toArray(new InternalDistributedMember[0]); |
| } |
| |
| /** |
| * Searches through profiles to find first profile that is flagged as primary and sets |
| * {@link #primaryMember} to it. Caller must synchronize on this BucketAdvisor. |
| * |
| * @return true if a primary member was found and used |
| * @see #findAndSetPrimaryMember() |
| */ |
| private boolean findAndSetPrimaryMember() { |
| if (isPrimary()) { |
| setPrimaryMember(getDistributionManager().getDistributionManagerId()); |
| return true; |
| } |
| InternalDistributedMember[] primaryMembers = findPrimaryMembers(); |
| if (primaryMembers.length > 0) { |
| setPrimaryMember(primaryMembers[0]); |
| return true; |
| } else { |
| return false; |
| } |
| } |
| |
| /** |
| * Returns the current redundancy of the this bucket, including the locally hosted bucket if it |
| * exists. |
| * |
| * @return current number of hosts of this bucket ; -1 if there are no hosts |
| */ |
| public int getBucketRedundancy() { |
| return redundancyTracker.getCurrentRedundancy(); |
| } |
| |
| Set<InternalDistributedMember> adviseInitialized() { |
| return adviseFilter(profile -> { |
| assert profile instanceof BucketProfile; |
| BucketProfile bucketProfile = (BucketProfile) profile; |
| return bucketProfile.isHosting; |
| }); |
| |
| } |
| |
| Set<InternalDistributedMember> adviseRecoveredFromDisk() { |
| return regionAdvisor.adviseInitializedDataStore(); |
| } |
| |
| /** |
| * Get the number of members that are hosting the bucket, and have finished initialization. |
| * |
| * This method is currently only used to check the bucket redundancy just before creating the |
| * bucket. If it is used more frequently, it might be better to cache this count. |
| */ |
| private int getNumInitializedBuckets() { |
| Profile[] locProfiles = profiles; // grab current profiles |
| int count = 0; |
| for (Profile profile : locProfiles) { |
| BucketProfile bucketProfile = (BucketProfile) profile; |
| if (bucketProfile.isHosting) { |
| count++; |
| } |
| } |
| if (isHosting()) { |
| count++; |
| } |
| return count; |
| } |
| |
| private Bucket getBucket() { |
| return (Bucket) getAdvisee(); |
| } |
| |
| /** |
| * Releases the primary lock for this bucket. |
| */ |
| private void releasePrimaryLock() { |
| // We don't have a lock if we have a parent advisor |
| if (parentAdvisor != null) { |
| return; |
| } |
| assignStartingBucketAdvisorIfFixedPartitioned(); |
| if (startingBucketAdvisor != null) { |
| return; |
| } |
| try { |
| DistributedMemberLock thePrimaryLock = getPrimaryLock(false); |
| if (thePrimaryLock != null) { |
| thePrimaryLock.unlock(); |
| } |
| } catch (LockNotHeldException e) { |
| Assert.assertTrue(!isHosting(), "Got LockNotHeldException for Bucket = " + this); |
| } catch (LockServiceDestroyedException e) { |
| Assert.assertTrue(isClosed(), |
| "BucketAdvisor was not closed before destroying PR lock service"); |
| } |
| } |
| |
| private String primaryStateToString() { |
| return primaryStateToString(primaryState); |
| } |
| |
| /** |
| * Returns string representation of the primary state value. |
| * |
| * @param value the primary state to return string for |
| * @return string representation of primaryState |
| */ |
| private String primaryStateToString(byte value) { |
| switch (value) { |
| case NO_PRIMARY_NOT_HOSTING: |
| return "NO_PRIMARY_NOT_HOSTING"; |
| case NO_PRIMARY_HOSTING: |
| return "NO_PRIMARY_HOSTING"; |
| case OTHER_PRIMARY_NOT_HOSTING: |
| return "OTHER_PRIMARY_NOT_HOSTING"; |
| case OTHER_PRIMARY_HOSTING: |
| return "OTHER_PRIMARY_HOSTING"; |
| case VOLUNTEERING_HOSTING: |
| return "VOLUNTEERING_HOSTING"; |
| case BECOMING_HOSTING: |
| return "BECOMING_HOSTING"; |
| case IS_PRIMARY_HOSTING: |
| return "IS_PRIMARY_HOSTING"; |
| case CLOSED: |
| return "CLOSED"; |
| default: |
| return "<unhandled primaryState " + value + " >"; |
| } |
| } |
| |
| /** |
| * Requests change to the requested primary state. Controls all state changes pertaining to |
| * primary state. Caller must be synchronized on this. |
| * |
| * @param requestedState primaryState to change to |
| * @return true if the requestedState change was completed |
| * @throws IllegalStateException if an illegal state change was attempted |
| */ |
| private boolean requestPrimaryState(byte requestedState) { |
| final byte fromState = primaryState; |
| switch (fromState) { |
| case NO_PRIMARY_NOT_HOSTING: |
| switch (requestedState) { |
| case NO_PRIMARY_NOT_HOSTING: |
| // race condition ok, return false |
| return false; |
| case NO_PRIMARY_HOSTING: |
| primaryState = requestedState; |
| break; |
| case OTHER_PRIMARY_NOT_HOSTING: |
| primaryState = requestedState; |
| break; |
| case OTHER_PRIMARY_HOSTING: |
| primaryState = requestedState; |
| break; |
| case BECOMING_HOSTING: |
| // race condition during close is ok, return false |
| return false; |
| case VOLUNTEERING_HOSTING: |
| // race condition during close is ok, return false |
| return false; |
| case CLOSED: |
| primaryState = requestedState; |
| break; |
| default: |
| throw new IllegalStateException(String.format("Cannot change from %s to %s", |
| primaryStateToString(), primaryStateToString(requestedState))); |
| } |
| break; |
| case NO_PRIMARY_HOSTING: |
| switch (requestedState) { |
| case NO_PRIMARY_NOT_HOSTING: |
| primaryState = requestedState; |
| break; |
| // case OTHER_PRIMARY_NOT_HOSTING: -- enable for bucket migration |
| // this.primaryState = requestedState; |
| // break; |
| case NO_PRIMARY_HOSTING: |
| // race condition ok, return false |
| return false; |
| case VOLUNTEERING_HOSTING: |
| primaryState = requestedState; { |
| PartitionedRegionStats stats = getPartitionedRegionStats(); |
| stats.putStartTime(this, stats.startVolunteering()); |
| } |
| break; |
| case BECOMING_HOSTING: |
| primaryState = requestedState; { |
| PartitionedRegionStats stats = getPartitionedRegionStats(); |
| stats.putStartTime(this, stats.startVolunteering()); |
| } |
| break; |
| case OTHER_PRIMARY_HOSTING: |
| primaryState = requestedState; |
| break; |
| case CLOSED: |
| primaryState = requestedState; |
| break; |
| default: |
| throw new IllegalStateException(String.format("Cannot change from %s to %s", |
| primaryStateToString(), |
| primaryStateToString(requestedState))); |
| } |
| break; |
| case OTHER_PRIMARY_NOT_HOSTING: |
| switch (requestedState) { |
| case NO_PRIMARY_NOT_HOSTING: |
| primaryState = requestedState; |
| break; |
| case OTHER_PRIMARY_NOT_HOSTING: |
| // race condition ok, return false |
| return false; |
| case OTHER_PRIMARY_HOSTING: |
| primaryState = requestedState; |
| break; |
| case BECOMING_HOSTING: |
| // race condition during close is ok, return false |
| return false; |
| case VOLUNTEERING_HOSTING: |
| // race condition during close is ok, return false |
| return false; |
| case CLOSED: |
| primaryState = requestedState; |
| break; |
| default: |
| throw new IllegalStateException(String.format("Cannot change from %s to %s", |
| primaryStateToString(), |
| primaryStateToString(requestedState))); |
| } |
| break; |
| case OTHER_PRIMARY_HOSTING: |
| switch (requestedState) { |
| // case NO_PRIMARY_NOT_HOSTING: -- enable for bucket migration |
| // this.primaryState = requestedState; |
| // break; |
| case OTHER_PRIMARY_NOT_HOSTING: |
| // May occur when setHosting(false) is called |
| primaryState = requestedState; |
| break; |
| case OTHER_PRIMARY_HOSTING: |
| // race condition ok, return false |
| return false; |
| case NO_PRIMARY_HOSTING: |
| primaryState = requestedState; |
| break; |
| case CLOSED: |
| primaryState = requestedState; |
| break; |
| case VOLUNTEERING_HOSTING: |
| // race condition ok, return false to abort volunteering |
| return false; |
| case BECOMING_HOSTING: |
| primaryState = requestedState; { |
| PartitionedRegionStats stats = getPartitionedRegionStats(); |
| stats.putStartTime(this, stats.startVolunteering()); |
| } |
| break; |
| case IS_PRIMARY_HOSTING: |
| // race condition ok, probably race in HA where other becomes |
| // primary and immediately leaves while we have try-lock message |
| // enroute to grantor |
| primaryState = requestedState; |
| break; |
| default: |
| throw new IllegalStateException(String.format("Cannot change from %s to %s", |
| primaryStateToString(), |
| primaryStateToString(requestedState))); |
| } |
| break; |
| case VOLUNTEERING_HOSTING: |
| switch (requestedState) { |
| case NO_PRIMARY_NOT_HOSTING: |
| // May occur when setHosting(false) is called |
| primaryState = requestedState; { |
| PartitionedRegionStats stats = getPartitionedRegionStats(); |
| stats.endVolunteeringClosed(stats.removeStartTime(this)); |
| } |
| break; |
| case OTHER_PRIMARY_NOT_HOSTING: |
| // May occur when setHosting(false) is called |
| // Profile update for other primary may have slipped in |
| primaryState = requestedState; { |
| PartitionedRegionStats stats = getPartitionedRegionStats(); |
| stats.endVolunteeringClosed(stats.removeStartTime(this)); |
| } |
| break; |
| case NO_PRIMARY_HOSTING: |
| // race condition occurred, return false and stay in volunteering |
| return false; |
| case IS_PRIMARY_HOSTING: |
| primaryState = requestedState; { |
| PartitionedRegionStats stats = getPartitionedRegionStats(); |
| stats.incPrimaryBucketCount(1); |
| stats.endVolunteeringBecamePrimary(stats.removeStartTime(this)); |
| } |
| break; |
| case OTHER_PRIMARY_HOSTING: |
| primaryState = requestedState; { |
| PartitionedRegionStats stats = getPartitionedRegionStats(); |
| stats.endVolunteeringOtherPrimary(stats.removeStartTime(this)); |
| } |
| break; |
| case VOLUNTEERING_HOSTING: |
| // race condition ok, return false to abort volunteering |
| return false; |
| case BECOMING_HOSTING: |
| // race condition ok, return false to abort volunteering |
| return false; |
| case CLOSED: |
| primaryState = requestedState; { |
| PartitionedRegionStats stats = getPartitionedRegionStats(); |
| stats.endVolunteeringClosed(stats.removeStartTime(this)); |
| } |
| break; |
| default: |
| throw new IllegalStateException(String.format("Cannot change from %s to %s", |
| primaryStateToString(), |
| primaryStateToString(requestedState))); |
| } |
| break; |
| case BECOMING_HOSTING: |
| switch (requestedState) { |
| case NO_PRIMARY_NOT_HOSTING: |
| // May occur when setHosting(false) is called |
| primaryState = requestedState; { |
| PartitionedRegionStats stats = getPartitionedRegionStats(); |
| stats.endVolunteeringClosed(stats.removeStartTime(this)); |
| } |
| break; |
| case OTHER_PRIMARY_NOT_HOSTING: |
| // May occur when setHosting(false) is called |
| // Profile update for other primary may have slipped in |
| primaryState = requestedState; { |
| PartitionedRegionStats stats = getPartitionedRegionStats(); |
| stats.endVolunteeringClosed(stats.removeStartTime(this)); |
| } |
| break; |
| case NO_PRIMARY_HOSTING: |
| // race condition occurred, return false and stay in volunteering |
| return false; |
| case IS_PRIMARY_HOSTING: |
| primaryState = requestedState; { |
| PartitionedRegionStats stats = getPartitionedRegionStats(); |
| stats.incPrimaryBucketCount(1); |
| stats.endVolunteeringBecamePrimary(stats.removeStartTime(this)); |
| } |
| break; |
| case OTHER_PRIMARY_HOSTING: |
| return false; |
| case VOLUNTEERING_HOSTING: |
| // race condition ok, return false to abort volunteering |
| return false; |
| case BECOMING_HOSTING: |
| // race condition ok, return false to abort volunteering |
| return false; |
| case CLOSED: |
| primaryState = requestedState; { |
| PartitionedRegionStats stats = getPartitionedRegionStats(); |
| stats.endVolunteeringClosed(stats.removeStartTime(this)); |
| } |
| break; |
| default: |
| throw new IllegalStateException(String.format("Cannot change from %s to %s", |
| primaryStateToString(), |
| primaryStateToString(requestedState))); |
| } |
| break; |
| case IS_PRIMARY_HOSTING: |
| switch (requestedState) { |
| case NO_PRIMARY_HOSTING: |
| // rebalancing must have moved the primary |
| changeFromPrimaryTo(requestedState); |
| break; |
| // case OTHER_PRIMARY_HOSTING: -- enable for bucket migration |
| // // rebalancing must have moved the primary |
| // changeFromPrimaryTo(requestedState); |
| // break; |
| case OTHER_PRIMARY_NOT_HOSTING: |
| // rebalancing must have moved the primary and primary |
| changeFromPrimaryTo(requestedState); |
| break; |
| case NO_PRIMARY_NOT_HOSTING: |
| // May occur when setHosting(false) is called due to closing |
| changeFromPrimaryTo(requestedState); |
| break; |
| case VOLUNTEERING_HOSTING: |
| // race condition ok, return false to abort volunteering |
| return false; |
| case BECOMING_HOSTING: |
| // race condition ok, return false to abort volunteering |
| return false; |
| case CLOSED: |
| changeFromPrimaryTo(requestedState); |
| break; |
| default: |
| throw new IllegalStateException("Cannot change from " + primaryStateToString() |
| + " to " + primaryStateToString(requestedState)); |
| } |
| break; |
| case CLOSED: |
| switch (requestedState) { |
| case CLOSED: |
| Exception e = new Exception( |
| "Attempted to close BucketAdvisor that is already CLOSED"); |
| logger.warn("Attempted to close BucketAdvisor that is already CLOSED", |
| e); |
| break; |
| case VOLUNTEERING_HOSTING: |
| // race condition ok, return false to abort volunteering |
| return false; |
| case BECOMING_HOSTING: |
| // race condition ok, return false to abort volunteering |
| return false; |
| case IS_PRIMARY_HOSTING: |
| // Commonly occurs when closing and volunteering thread is still running |
| return false; |
| case OTHER_PRIMARY_NOT_HOSTING: |
| // Commonly occurs when a putProfile occurs during closure |
| return false; |
| default: |
| throw new IllegalStateException( |
| String.format("Cannot change from %s to %s for bucket %s", |
| primaryStateToString(), |
| primaryStateToString(requestedState), getAdvisee().getName())); |
| } |
| } |
| return primaryState == requestedState; |
| } |
| |
| private void changeFromPrimaryTo(byte requestedState) { |
| try { |
| primaryState = requestedState; |
| } finally { |
| getPartitionedRegionStats().incPrimaryBucketCount(-1); |
| } |
| } |
| |
| @Override |
| public Set adviseDestroyRegion() { |
| // fix for bug 37604 - tell all owners of the pr that the bucket is being |
| // destroyed. This is needed when bucket cleanup is performed |
| return regionAdvisor.adviseAllPRNodes(); |
| } |
| |
| /** |
| * returns the set of all the members in the system which require both DistributedCacheOperation |
| * messages and notification-only partition messages |
| * |
| * @return a set of recipients requiring both cache-op and notification messages |
| * @since GemFire 5.7 |
| */ |
| public Set<InternalDistributedMember> adviseRequiresTwoMessages() { |
| return adviseNotInitialized(); |
| } |
| |
| |
| private Set<InternalDistributedMember> adviseNotInitialized() { |
| return adviseFilter(profile -> { |
| assert profile instanceof CacheProfile; |
| CacheProfile cp = (CacheProfile) profile; |
| return !cp.regionInitialized; |
| }); |
| } |
| |
| |
| @Override |
| public Set adviseNetWrite() { |
| return regionAdvisor.adviseNetWrite(); |
| } |
| |
| @Override |
| public String toString() { |
| final StringBuilder sb = new StringBuilder("[BucketAdvisor ").append(getAdvisee().getFullPath()) |
| .append(':').append(getAdvisee().getSerialNumber()).append(": "); |
| sb.append("state=").append(primaryStateToString()); |
| sb.append("]"); |
| return sb.toString(); |
| } |
| |
| // A listener for events on this bucket, and also for the entire PR |
| @Override |
| public void addMembershipAndProxyListener(MembershipListener listener) { |
| super.addMembershipAndProxyListener(listener); |
| regionAdvisor.addMembershipListener(listener); |
| } |
| |
| @Override |
| public void removeMembershipAndProxyListener(MembershipListener listener) { |
| regionAdvisor.removeMembershipListener(listener); |
| super.removeMembershipAndProxyListener(listener); |
| } |
| |
| /** |
| * Called from endBucket creation. We send out a profile to notify others that the persistence is |
| * initialized. |
| */ |
| void endBucketCreation() { |
| sendProfileUpdate(); |
| } |
| |
| /** |
| * Profile information for a remote bucket counterpart. |
| */ |
| public static class BucketProfile extends CacheProfile { |
| /** True if this profile's member is attempting to initialize the bucket */ |
| public boolean isInitializing; |
| /** True if this profile's member is the primary for this bucket */ |
| public boolean isPrimary; |
| /** |
| * True if the profile is coming from a real BucketRegion acceptible states hosting = false, |
| * init = true hosting = true, init = false hosting = false, init = false unacceptible states |
| * hosting = true, init = true |
| */ |
| public boolean isHosting; |
| |
| |
| /** |
| * True if the bucket has been removed from this host. ( |
| */ |
| public boolean removed; |
| |
| public BucketProfile() {} |
| |
| public BucketProfile(InternalDistributedMember memberId, int version, Bucket bucket) { |
| super(memberId, version); |
| isPrimary = bucket.isPrimary(); |
| isHosting = bucket.isHosting(); |
| } |
| |
| @Override |
| public StringBuilder getToStringHeader() { |
| return new StringBuilder("BucketAdvisor.BucketProfile"); |
| } |
| |
| @Override |
| public void fillInToString(StringBuilder sb) { |
| super.fillInToString(sb); |
| sb.append("; isPrimary=").append(isPrimary); |
| sb.append("; isHosting=").append(isHosting); |
| sb.append("; isInitializing=").append(isInitializing); |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| super.fromData(in, context); |
| isPrimary = in.readBoolean(); |
| isHosting = in.readBoolean(); |
| isInitializing = in.readBoolean(); |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| super.toData(out, context); |
| out.writeBoolean(isPrimary); |
| out.writeBoolean(isHosting); |
| out.writeBoolean(isInitializing); |
| } |
| |
| @Override |
| public int getDSFID() { |
| return BUCKET_PROFILE; |
| } |
| } |
| /** |
| * Profile information for a remote bucket hosted by cache servers. |
| */ |
| public static class ServerBucketProfile extends BucketProfile { |
| |
| private Set<BucketServerLocation66> bucketServerLocations; |
| |
| private int bucketId; |
| |
| public ServerBucketProfile() {} |
| |
| public ServerBucketProfile(InternalDistributedMember memberId, int version, Bucket bucket, |
| HashSet<BucketServerLocation66> serverLocations) { |
| super(memberId, version, bucket); |
| bucketId = bucket.getId(); |
| bucketServerLocations = serverLocations; |
| } |
| |
| @Override |
| public StringBuilder getToStringHeader() { |
| return new StringBuilder("BucketAdvisor.ServerBucketProfile"); |
| } |
| |
| @Override |
| public void fillInToString(StringBuilder sb) { |
| super.fillInToString(sb); |
| for (BucketServerLocation66 location : bucketServerLocations) { |
| sb.append("; hostName=").append(location.getHostName()); |
| sb.append("; port=").append(location.getPort()); |
| } |
| } |
| |
| @Override |
| public void fromData(DataInput in, |
| DeserializationContext context) throws IOException, ClassNotFoundException { |
| super.fromData(in, context); |
| bucketServerLocations = SerializationHelper.readBucketServerLocationSet(in); |
| bucketId = DataSerializer.readPrimitiveInt(in); |
| } |
| |
| @Override |
| public void toData(DataOutput out, |
| SerializationContext context) throws IOException { |
| super.toData(out, context); |
| SerializationHelper.writeBucketServerLocationSet(bucketServerLocations, out); |
| DataSerializer.writePrimitiveInt(bucketId, out); |
| } |
| |
| public Set<BucketServerLocation66> getBucketServerLocations() { |
| return bucketServerLocations; |
| } |
| |
| @Override |
| public int getDSFID() { |
| return SERVER_BUCKET_PROFILE; |
| } |
| |
| @Override |
| public int hashCode() { |
| BucketServerLocation66 sl = (BucketServerLocation66) bucketServerLocations.toArray()[0]; |
| return 31 * bucketId + sl.getPort(); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (this == obj) |
| return true; |
| if (obj == null) |
| return false; |
| if (!(obj instanceof ServerBucketProfile)) |
| return false; |
| final ServerBucketProfile other = (ServerBucketProfile) obj; |
| if (other.bucketId != bucketId) { |
| return false; |
| } |
| if (other.bucketServerLocations.size() != bucketServerLocations.size()) { |
| return false; |
| } |
| return other.bucketServerLocations.containsAll(bucketServerLocations); |
| } |
| } |
| |
| /** |
| * Handles the actual volunteering to become primary bucket. Ensures that only one thread is ever |
| * volunteering at one time. |
| * |
| */ |
| class VolunteeringDelegate { |
| /** |
| * Reference to the Thread that is currently volunteering. Protected by |
| * synchronized(volunteeringLock). |
| */ |
| private Thread volunteeringThread; |
| |
| private boolean aggressive = false; |
| |
| /** |
| * Returns true if this delegate is aggressively trying to become the primary even if another |
| * member is already the primary. |
| * |
| * @return true if this aggressively trying to become the primary |
| */ |
| boolean isAggressive() { |
| synchronized (BucketAdvisor.this) { |
| return aggressive; |
| } |
| } |
| |
| /** |
| * Initiates volunteering for primary. Repeated calls are harmless. Invoked by the |
| * BucketAdvisor. Caller must be synchronized on BucketAdvisor. |
| */ |
| void volunteerForPrimary() { |
| boolean handedOff = false; |
| while (!handedOff) { |
| getAdvisee().getCancelCriterion().checkCancelInProgress(null); |
| boolean interrupted = Thread.interrupted(); |
| try { |
| execute(this::doVolunteerForPrimary); |
| handedOff = true; |
| } catch (InterruptedException e) { |
| interrupted = true; |
| getAdvisee().getCancelCriterion().checkCancelInProgress(e); |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Reserves this delegate for the current thread to call becomePrimary. Necessary because caller |
| * of doVolunteerForPrimary must not be synchronized on BucketAdvisor. |
| * |
| * @return true if successfully reserved for becomePrimary |
| */ |
| boolean reserveForBecomePrimary() { |
| synchronized (BucketAdvisor.this) { |
| if (volunteeringThread != null) { |
| return false; |
| } |
| aggressive = true; |
| return true; |
| } |
| } |
| |
| /** |
| * Invoked by the thread that performs the actual volunteering work. |
| */ |
| void doVolunteerForPrimary() { |
| if (!beginVolunteering()) { |
| return; |
| } |
| boolean dlsDestroyed = false; |
| try { |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Begin volunteerForPrimary for {}", BucketAdvisor.this); |
| } |
| DistributedMemberLock thePrimaryLock = null; |
| while (continueVolunteering()) { |
| // Fix for 41865 - We can't send out profiles while holding the |
| // sync on this advisor, because that will cause a deadlock. |
| // Holding the primaryMoveWriteLock here instead prevents any |
| // operations from being performed on this primary until the child regions |
| // are synced up. It also prevents a depose from happening until then. |
| BucketAdvisor parentBA = parentAdvisor; |
| primaryMoveWriteLock.lock(); |
| try { |
| boolean acquiredLock; |
| getAdvisee().getCancelCriterion().checkCancelInProgress(null); |
| // Check our parent advisor and set our state |
| // accordingly |
| if (parentBA != null) { |
| // Fix for 44350 - we don't want to get a primary move lock on |
| // the advisor, because that might deadlock with a user thread. |
| // However, since all depose/elect operations on the parent bucket |
| // cascade to the child bucket and get the child bucket move lock, |
| // if should be safe to check this without the lock here. |
| if (parentBA.isPrimary() && !isPrimary()) { |
| acquiredLock = acquiredPrimaryLock(); |
| } else { |
| return; |
| } |
| } else { |
| // we're not colocated, need to get the dlock |
| assignStartingBucketAdvisorIfFixedPartitioned(); |
| if (startingBucketAdvisor != null) { |
| Assert.assertHoldsLock(this, false); |
| synchronized (startingBucketAdvisor) { |
| if (startingBucketAdvisor.isPrimary() && !isPrimary()) { |
| acquiredLock = acquiredPrimaryLock(); |
| } else { |
| return; |
| } |
| } |
| } else { |
| if (thePrimaryLock == null) { |
| thePrimaryLock = getPrimaryLock(true); |
| if (thePrimaryLock == null) { |
| // InternalDistributedSystem.isDisconnecting probably |
| // prevented us from |
| // creating the DLS... hope there's a thread closing this |
| // advisor but |
| // it's probably not safe to assert that it already happened |
| return; |
| } |
| } |
| Assert.assertTrue(!thePrimaryLock.holdsLock()); |
| if (isAggressive()) { |
| acquiredLock = thePrimaryLock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS); |
| } else { |
| acquiredLock = thePrimaryLock.tryLock(); |
| } |
| if (acquiredLock) { |
| acquiredLock = acquiredPrimaryLock(); |
| } |
| } // parentAdvisor == null |
| } |
| if (acquiredLock) { |
| // if the lock has been acquired then try to do the same for colocated PR's too |
| // Here if somehow a bucket can't acquire a lock |
| // we assume that it is in the process of becoming primary through |
| // BucketAdvisor.volunteerForPrimary() |
| // Either way we have to guarantee that the bucket becomes primary for sure.(How to |
| // guarantee?) |
| acquirePrimaryRecursivelyForColocated(); |
| acquirePrimaryForRestOfTheBucket(); |
| return; |
| } |
| } finally { |
| primaryMoveWriteLock.unlock(); |
| } |
| // else: acquiredPrimaryLock released thePrimaryLock |
| |
| if (!continueVolunteering()) { |
| // this avoids calling the wait below... |
| return; |
| } |
| |
| waitIfNoPrimaryMemberFound(); |
| } // while |
| } catch (LockServiceDestroyedException e) { |
| dlsDestroyed = true; |
| handleException(e, true); |
| } catch (RegionDestroyedException | CancelException e) { |
| handleException(e, false); |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| handleException(e, false); |
| } finally { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Exit volunteerForPrimary for {}; dlsDestroyed={}", BucketAdvisor.this, |
| dlsDestroyed); |
| } |
| endVolunteering(); |
| // if (isPrimary()) { |
| // Bucket bucket = getBucket(); |
| // if (bucket instanceof ProxyBucketRegion) { |
| // bucket = ((ProxyBucketRegion)bucket).getHostedBucketRegion(); |
| // } |
| // } |
| } |
| } |
| |
| /** |
| * Called from catch blocks in {@link #doVolunteerForPrimary()}. Handles the exception properly |
| * based on advisor settings and shutdown condition. |
| * |
| * @param e the RuntimeException that was caught while volunteering |
| * @param loggit true if message should be logged if shutdown condition is not met |
| */ |
| private void handleException(Exception e, boolean loggit) { |
| boolean safe = isClosed() || getAdvisee().getCancelCriterion().isCancelInProgress(); |
| if (!safe) { |
| if (ENFORCE_SAFE_CLOSE) { |
| Assert.assertTrue(safe, |
| "BucketAdvisor was not closed properly."); |
| } else if (loggit) { |
| logger.warn("BucketAdvisor was not closed properly.", e); |
| } |
| } |
| } |
| |
| private boolean beginVolunteering() { |
| synchronized (BucketAdvisor.this) { |
| if (Thread.currentThread().equals(volunteeringThread)) { |
| return true; // this thread is already volunteering or reserved |
| } |
| if (volunteeringThread != null) { |
| // another thread is already volunteering |
| return false; |
| } |
| volunteeringThread = Thread.currentThread(); |
| boolean changedState = false; |
| try { |
| if (isAggressive()) { |
| changedState = setBecoming(); |
| } else { |
| changedState = setVolunteering(); |
| } |
| return changedState; |
| } finally { |
| if (!changedState) { |
| aggressive = false; |
| volunteeringThread = null; |
| } |
| } |
| } |
| } |
| |
| private boolean continueVolunteering() { |
| synchronized (BucketAdvisor.this) { |
| // false if caller is not the volunteeringThread |
| if (!Thread.currentThread().equals(volunteeringThread)) { |
| return false; |
| } |
| if (!isVolunteering() && !isBecomingPrimary()) { |
| return false; |
| } |
| |
| // false if primaryMember is not null |
| if (!isAggressive() && basicGetPrimaryMember() != null) { |
| return false; |
| } |
| // false if this member is already primary |
| if (isPrimary()) { |
| return false; |
| } |
| // false if closed |
| if (isClosed()) { |
| return false; |
| } |
| // false if no longer hosting |
| return isHosting(); |
| |
| // must be true... need to continue volunteering |
| } |
| } |
| |
| private void endVolunteering() { |
| if (Thread.currentThread().equals(volunteeringThread)) { |
| volunteeringThread = null; |
| aggressive = false; |
| } |
| } |
| |
| private void waitIfNoPrimaryMemberFound() { |
| synchronized (BucketAdvisor.this) { |
| if (basicGetPrimaryMember() == null) { |
| waitForPrimaryMember(100); |
| if (basicGetPrimaryMember() == null) { |
| findAndSetPrimaryMember(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Executes the primary volunteering task after queuing it in the |
| * {@link BucketAdvisor#getVolunteeringQueue()}. A number of threads equal to |
| * {@link RegionAdvisor#VOLUNTEERING_THREAD_COUNT} are permitted to consume from the queue by |
| * acquiring permits from {@link BucketAdvisor#getVolunteeringSemaphore()}. |
| * |
| * @param volunteeringTask the task to queue and then execute in waiting thread pool |
| * |
| */ |
| private void execute(Runnable volunteeringTask) throws InterruptedException { |
| // @todo: instead of having a semaphore and queue on RegionAdvisor |
| // we should have an executor which limits its max threads to |
| // VOLUNTEERING_THREAD_COUNT. |
| if (Thread.interrupted()) |
| throw new InterruptedException(); |
| Queue<Runnable> volunteeringQueue = getVolunteeringQueue(); |
| synchronized (volunteeringQueue) { |
| // add the volunteering task |
| volunteeringQueue.add(volunteeringTask); |
| if (getVolunteeringSemaphore().tryAcquire()) { |
| // ensure there is a thread consuming the queue |
| boolean handedOff = false; |
| try { |
| getDistributionManager().getExecutors().getWaitingThreadPool().execute(consumeQueue()); |
| handedOff = true; |
| } finally { |
| if (!handedOff) { |
| getVolunteeringSemaphore().release(); |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Returns the runnable used to consume the volunteering queue. The executing thread(s) will |
| * consume from the queue until it is empty. |
| * |
| * @return runnable for consuming the volunteering queue |
| */ |
| private Runnable consumeQueue() { |
| return () -> { |
| getPartitionedRegionStats().incVolunteeringThreads(1); |
| boolean releaseSemaphore = true; |
| try { |
| Queue<Runnable> volunteeringQueue = getVolunteeringQueue(); |
| Runnable queuedWork; |
| while (true) { |
| // SystemFailure.checkFailure(); |
| getAdvisee().getCancelCriterion().checkCancelInProgress(null); |
| synchronized (volunteeringQueue) { |
| // synchronized volunteeringQueue for coordination between threads adding |
| // work to the queue and checking for a consuming thread and the existing |
| // consuming thread to determine if it can exit since the queue is empty. |
| queuedWork = volunteeringQueue.poll(); |
| if (queuedWork == null) { |
| // the queue is empty... no more work... so return |
| // @todo why release the semaphore here are sync'ed? |
| // we could just let the finally block do it. |
| getVolunteeringSemaphore().release(); |
| releaseSemaphore = false; |
| return; |
| } |
| // still more work in the queue so let's run it |
| } |
| try { |
| queuedWork.run(); |
| } catch (CancelException e) { |
| return; |
| } catch (RuntimeException e) { |
| // log and continue consuming queue |
| logger.error(e.getMessage(), e); |
| } |
| } |
| } finally { |
| getPartitionedRegionStats().incVolunteeringThreads(-1); |
| if (releaseSemaphore) { |
| // Clean up, just in case |
| getVolunteeringSemaphore().release(); |
| } |
| } |
| }; |
| } |
| } |
| |
| void markAllShadowBucketsAsNonDestroyed() { |
| destroyedShadowBuckets.clear(); |
| } |
| |
| void markAllShadowBucketsAsDestroyed() { |
| destroyedShadowBuckets.forEach((k, v) -> destroyedShadowBuckets.put(k, true)); |
| } |
| |
| void markShadowBucketAsDestroyed(String shadowBucketPath) { |
| destroyedShadowBuckets.put(shadowBucketPath, true); |
| } |
| |
| public boolean isShadowBucketDestroyed(String shadowBucketPath) { |
| return destroyedShadowBuckets.getOrDefault(shadowBucketPath, false); |
| } |
| } |