blob: 3a55af97341c3ab69d31ce79a0dd52dbc2014a76 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import static org.apache.geode.internal.cache.CacheServerImpl.CACHE_SERVER_BIND_ADDRESS_NOT_AVAILABLE_EXCEPTION_MESSAGE;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.AbstractSet;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelException;
import org.apache.geode.DataSerializer;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.client.internal.locator.SerializationHelper;
import org.apache.geode.cache.partition.PartitionListener;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.DistributedSystemDisconnectedException;
import org.apache.geode.distributed.LockNotHeldException;
import org.apache.geode.distributed.LockServiceDestroyedException;
import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.locks.DLockService;
import org.apache.geode.distributed.internal.locks.DistributedMemberLock;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.partitioned.Bucket;
import org.apache.geode.internal.cache.partitioned.BucketProfileUpdateMessage;
import org.apache.geode.internal.cache.partitioned.DeposePrimaryBucketMessage;
import org.apache.geode.internal.cache.partitioned.DeposePrimaryBucketMessage.DeposePrimaryBucketResponse;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.serialization.DeserializationContext;
import org.apache.geode.internal.serialization.SerializationContext;
import org.apache.geode.internal.util.StopWatch;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* Specialized {@link CacheDistributionAdvisor} for {@link BucketRegion BucketRegions}. The
* <code>BucketAdvisor</code> is owned by a {@link ProxyBucketRegion} and may outlive a
* <code>BucketRegion</code>.
*/
@SuppressWarnings("synthetic-access")
public class BucketAdvisor extends CacheDistributionAdvisor {
private static final Logger logger = LogService.getLogger();
private static final boolean ENFORCE_SAFE_CLOSE = false;
// TODO: Boolean.getBoolean("gemfire.BucketAdvisor.debug.enforceSafeClose");
/** Reference to the InternalDistributedMember that is primary. */
private final AtomicReference<InternalDistributedMember> primaryMember = new AtomicReference<>();
/**
* Advice requests for {@link #adviseProfileUpdate()} delegate to the partitioned region's
* <code>RegionAdvisor</code> to include members with {@link ProxyBucketRegion}s as well as real
* {@link BucketRegion}s.
*/
private final RegionAdvisor regionAdvisor;
private final BucketRedundancyTracker redundancyTracker;
/**
* The bucket primary will be holding this distributed lock. Protected by synchronized(this).
*/
private DistributedMemberLock primaryLock;
// private static final byte MASK_HOSTING = 1; // 0001
// private static final byte MASK_VOLUNTEERING = 2; // 0010
// private static final byte MASK_OTHER_PRIMARY = 4; // 0100
// private static final byte MASK_IS_PRIMARY = 8; // 1000
private static final byte NO_PRIMARY_NOT_HOSTING = 0; // 0000_0000
private static final byte NO_PRIMARY_HOSTING = 1; // 0000_0001
private static final byte OTHER_PRIMARY_NOT_HOSTING = 4; // 0000_0100
private static final byte OTHER_PRIMARY_HOSTING = 5; // 0000_0101
private static final byte VOLUNTEERING_HOSTING = 3; // 0000_0011
private static final byte BECOMING_HOSTING = 15; // 0000_1111
private static final byte IS_PRIMARY_HOSTING = 9; // 0000_1001
private static final byte CLOSED = 16; // 0001_0000
/**
* The current state of this BucketAdvisor which tracks which member is primary and whether or not
* this member is hosting a real Bucket.
*/
private volatile byte primaryState = NO_PRIMARY_NOT_HOSTING;
/**
* This delegate handles all volunteering for primary status. Lazily created. Protected by
* synchronization(this).
*/
private VolunteeringDelegate volunteeringDelegate;
/**
* A random number generator
*
* @see #getPreferredNode()
*/
@Immutable
private static final Random myRand = new Random();
/**
* A read/write lock to prevent making this bucket not primary while a write is in progress on the
* bucket.
*/
private final ReadWriteLock primaryMoveReadWriteLock = new ReentrantReadWriteLock();
private final Lock primaryMoveReadLock = primaryMoveReadWriteLock.readLock();
private final Lock primaryMoveWriteLock = primaryMoveReadWriteLock.writeLock();
/**
* The advisor for the bucket region that we are colocated with, if this region is a colocated
* region.
*/
private BucketAdvisor parentAdvisor;
/**
* The member that is responsible for choosing the primary for this bucket. While this field is
* set and this member exists, this bucket won't try to become primary.
*/
private volatile InternalDistributedMember primaryElector;
private volatile BucketProfile localProfile;
private volatile boolean everHadPrimary = false;
private BucketAdvisor startingBucketAdvisor;
private PartitionedRegion pRegion;
private volatile boolean shadowBucketDestroyed;
/**
* Constructs a new BucketAdvisor for the Bucket owned by RegionAdvisor.
*
* @param bucket the bucket to provide metadata and advice for
* @param regionAdvisor advisor for the PartitionedRegion
*/
private BucketAdvisor(Bucket bucket, RegionAdvisor regionAdvisor) {
super(bucket);
this.regionAdvisor = regionAdvisor;
pRegion = this.regionAdvisor.getPartitionedRegion();
redundancyTracker =
new BucketRedundancyTracker(pRegion.getRedundantCopies(), pRegion.getRedundancyTracker());
resetParentAdvisor(bucket.getId());
}
public static BucketAdvisor createBucketAdvisor(Bucket bucket, RegionAdvisor regionAdvisor) {
BucketAdvisor advisor = new BucketAdvisor(bucket, regionAdvisor);
advisor.initialize();
return advisor;
}
private void resetParentAdvisor(int bucketId) {
PartitionedRegion colocatedRegion = ColocationHelper.getColocatedRegion(pRegion);
if (colocatedRegion != null) {
if (colocatedRegion.isFixedPartitionedRegion()) {
List<FixedPartitionAttributesImpl> fpas = colocatedRegion.getFixedPartitionAttributesImpl();
if (fpas != null) {
for (FixedPartitionAttributesImpl fpa : fpas) {
if (fpa.hasBucket(bucketId)) {
parentAdvisor =
colocatedRegion.getRegionAdvisor().getBucketAdvisor(fpa.getStartingBucketID());
break;
}
}
}
} else {
parentAdvisor = colocatedRegion.getRegionAdvisor().getBucketAdvisor(bucketId);
}
} else {
parentAdvisor = null;
}
}
private void assignStartingBucketAdvisorIfFixedPartitioned() {
if (startingBucketAdvisor != null) {
// already assigned
return;
}
if (pRegion.isFixedPartitionedRegion()) {
List<FixedPartitionAttributesImpl> fpas = pRegion.getFixedPartitionAttributesImpl();
if (fpas != null) {
int bucketId = getBucket().getId();
for (FixedPartitionAttributesImpl fpa : fpas) {
if (fpa.hasBucket(bucketId) && bucketId != fpa.getStartingBucketID()) {
startingBucketAdvisor = regionAdvisor.getBucketAdvisor(fpa.getStartingBucketID());
break;
}
}
}
}
}
/**
* Returns the lock that prevents the primary from moving while active writes are in progress.
* This should be locked before checking if the local bucket is primary.
*
* @return the lock for in-progress write operations
*/
public Lock getPrimaryMoveReadLock() {
return primaryMoveReadLock;
}
/**
* Returns the lock that prevents the parent's primary from moving while active writes are in
* progress. This should be locked before checking if the local bucket is primary.
*
* @return the lock for in-progress write operations
*/
Lock getParentPrimaryMoveReadLock() {
if (parentAdvisor != null) {
return parentAdvisor.getPrimaryMoveReadLock();
}
return null;
}
/**
* Try to lock the primary bucket to make sure no operation is on-going at current bucket.
*
*/
void tryLockIfPrimary() {
if (isPrimary()) {
try {
primaryMoveWriteLock.lock();
} finally {
primaryMoveWriteLock.unlock();
}
}
}
/**
* Makes this <code>BucketAdvisor</code> give up being a primary and become a secondary. Does
* nothing if not currently the primary.
*
* @return true if this advisor has been deposed as primary
*/
public boolean deposePrimary() {
if (isPrimary()) {
primaryMoveWriteLock.lock();
boolean needToSendProfileUpdate = false;
try {
removePrimary(getDistributionManager().getId());
synchronized (this) {
if (!isPrimary()) {
// releasePrimaryLock();
needToSendProfileUpdate = true;
return true;
} else {
return false; // failed for some reason
}
}
} finally {
primaryMoveWriteLock.unlock();
if (needToSendProfileUpdate) {
sendProfileUpdate();
}
}
} else {
sendProfileUpdate();
return true;
}
}
/**
* This calls deposePrimary on every colocated child that is directly colocated to this bucket's
* PR. Those each in turn do the same to their child buckets and so on before returning. Each
* depose will send a dlock release message to the grantor, wait for reply, and then also send a
* profile update.
* <p>
* Caller must synchronize on this BucketAdvisor.
*
*/
private void deposePrimaryForColocatedChildren() {
boolean deposedChildPrimaries = true;
List<PartitionedRegion> colocatedChildPRs = ColocationHelper.getColocatedChildRegions(pRegion);
if (colocatedChildPRs != null) {
for (PartitionedRegion pr : colocatedChildPRs) {
Bucket b = pr.getRegionAdvisor().getBucket(getBucket().getId());
if (b != null) {
BucketAdvisor ba = b.getBucketAdvisor();
deposedChildPrimaries = ba.deposePrimary() && deposedChildPrimaries;
if (b instanceof BucketRegionQueue) {
BucketRegionQueue brq = (BucketRegionQueue) b;
brq.decQueueSize(brq.size());
brq.incSecondaryQueueSize(brq.size());
}
}
}
}
}
private void deposeOtherPrimaryBucketForFixedPartition() {
boolean deposedOtherPrimaries = true;
int bucketId = getBucket().getId();
List<FixedPartitionAttributesImpl> fpas = pRegion.getFixedPartitionAttributesImpl();
if (fpas != null) {
for (FixedPartitionAttributesImpl fpa : fpas) {
if (fpa.getStartingBucketID() == bucketId) {
for (int i = (bucketId + 1); i <= fpa.getLastBucketID(); i++) {
Bucket b = regionAdvisor.getBucket(i);
if (b != null) {
BucketAdvisor ba = b.getBucketAdvisor();
deposedOtherPrimaries = ba.deposePrimary() && deposedOtherPrimaries;
}
}
}
}
}
}
void removeBucket() {
setHosting(false);
}
/**
* Return (and possibly choose) a thread-sticky member from whose data store this bucket's values
* should be read
*
* @return member to use for reads, null if none available
*/
public InternalDistributedMember getPreferredNode() {
if (isHosting()) {
getPartitionedRegionStats().incPreferredReadLocal();
return getDistributionManager().getId();
}
Profile[] locProfiles = profiles; // volatile read
if (locProfiles.length == 0) {
return null;
}
Profile selectedProfile = selectNotInitializingProfile(locProfiles);
if (selectedProfile == null) {
return null;
}
getPartitionedRegionStats().incPreferredReadRemote();
return selectedProfile.peerMemberId;
}
/**
* Get the random Profile that is not initializing BucketProfile.
*
*/
private Profile selectNotInitializingProfile(Profile[] inProfiles) {
int index = 0;
int offset = 0;
if (inProfiles.length > 1) {
// Pick random offset.
offset = myRand.nextInt(inProfiles.length);
}
for (int i = 0; i < inProfiles.length; i++) {
index = (offset + i) % inProfiles.length;
BucketProfile bp = (BucketProfile) inProfiles[index];
if (!bp.isInitializing) {
return inProfiles[index];
}
}
return null;
}
/**
* Returns the thread-safe queue of primary volunteering tasks for the parent Partitioned Region.
*
* @return the queue of primary volunteering tasks
*/
private Queue<Runnable> getVolunteeringQueue() {
return regionAdvisor.getVolunteeringQueue();
}
/**
* Returns the semaphore which controls the number of threads allowed to consume from the
* {@link #getVolunteeringQueue volunteering queue}.
*
* @return the semaphore which controls the number of volunteering threads
*/
private Semaphore getVolunteeringSemaphore() {
return regionAdvisor.getVolunteeringSemaphore();
}
/**
* Returns the PartitionedRegionStats.
*
* @return the PartitionedRegionStats
*/
private PartitionedRegionStats getPartitionedRegionStats() {
return regionAdvisor.getPartitionedRegionStats();
}
/**
* Concurrency: protected by synchronizing on *this*
*/
@Override
protected void profileCreated(Profile profile) {
regionAdvisor.incrementBucketCount(profile);
super.profileCreated(profile);
if (updateRedundancy() > 0) {
// wake up any threads in waitForRedundancy or waitForPrimary
notifyAll();
}
regionAdvisor.updateBucketStatus(getBucket().getId(), profile.peerMemberId, false);
if (logger.isDebugEnabled()) {
logger.debug("Profile added {} Profile : {}", getBucket().getFullPath(), profile);
}
synchronized (this) {
updateServerBucketProfile();
}
}
/**
* Concurrency: protected by synchronizing on *this*
*/
@Override
protected void profileUpdated(Profile profile) {
super.profileUpdated(profile);
if (updateRedundancy() > 0) {
// wake up any threads in waitForRedundancy or waitForPrimary
notifyAll();
}
regionAdvisor.updateBucketStatus(getBucket().getId(), profile.peerMemberId, false);
if (logger.isDebugEnabled()) {
logger.debug("Profile updated {} Profile : {}", getBucket().getFullPath(), profile);
}
synchronized (this) {
updateServerBucketProfile();
}
}
/**
* Concurrency: protected by synchronizing on *this*
*/
@Override
protected void profileRemoved(Profile profile) {
if (profile != null) {
regionAdvisor.updateBucketStatus(getBucket().getId(),
profile.getDistributedMember(), true);
regionAdvisor.decrementsBucketCount(profile);
}
updateRedundancy();
if (logger.isDebugEnabled()) {
logger.debug("Profile removed {} the member lost {} Profile : {}", getBucket().getFullPath(),
profile != null ? profile.getDistributedMember() : null, profile);
}
synchronized (this) {
updateServerBucketProfile();
}
}
@Override
public boolean shouldSyncForCrashedMember(InternalDistributedMember id) {
BucketProfile profile = (BucketProfile) getProfile(id);
return (profile != null) && (profile.isPrimary);
}
@Override
public DistributedRegion getRegionForDeltaGII() {
DistributedRegion result = super.getRegionForDeltaGII();
if (result == null && getAdvisee() instanceof ProxyBucketRegion) {
result = ((ProxyBucketRegion) getAdvisee()).getHostedBucketRegion();
}
return result;
}
/**
* Called by the RegionAdvisor.profileRemoved, this method tests to see if the missing member is
* the primary elector for this bucket.
*
* We can't call this method from BucketAdvisor.profileRemoved, because the primaryElector may not
* actually host the bucket.
*
*/
public void checkForLostPrimaryElector(Profile profile) {
// If the member that went away was in the middle of creating
// the bucket, finish the bucket creation.
ProfileId elector = primaryElector;
if (elector != null && elector.equals(profile.getDistributedMember())) {
if (logger.isDebugEnabled()) {
logger.debug(
"Bucket {} lost the member responsible for electing the primary. Finishing bucket creation",
getBucket().getFullPath());
}
primaryElector = getBucket().getDistributionManager().getId();
getBucket().getDistributionManager().getExecutors().getWaitingThreadPool().execute(
() -> getBucket().getPartitionedRegion().getRedundancyProvider()
.finishIncompleteBucketCreation(getBucket().getId()));
}
}
/**
* Only allows profiles that actually hosting this bucket. If the profile is primary, then
* primaryMember will be set to that member but only if we are not already the primary.
*
* @param profile the profile to add (must be a BucketProfile)
* @param forceProfile true will force profile to be added even if member is not in distributed
* view
*
* @see #adviseProfileUpdate()
*/
@Override
public boolean putProfile(Profile profile, boolean forceProfile) {
assert profile instanceof BucketProfile;
BucketProfile bp = (BucketProfile) profile;
// Only hosting buckets will be initializing, the isInitializing boolean is to
// allow for early entry into the advisor for GII purposes
if (!bp.isHosting && !bp.isInitializing) {
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE)) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE, "BucketAdvisor#putProfile early out");
}
return false; // Do not allow introduction of proxy profiles, they don't provide anything
// useful
// isHosting = false, isInitializing = false
}
if (logger.isTraceEnabled(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE)) {
logger.trace(LogMarker.DISTRIBUTION_ADVISOR_VERBOSE,
"BucketAdvisor#putProfile profile=<{}> force={}; profile = {}", profile, forceProfile,
bp);
}
// isHosting = false, isInitializing = true
// isHosting = true, isInitializing = false
// isHosting = true, isInitializing = true... (false state)
final boolean applied;
synchronized (this) {
// force new membership version in the advisor so that the
// state flush mechanism can capture any updates to the bucket
// MIN_VALUE is intended as a somewhat unique value for potential debug purposes
profile.initialMembershipVersion = Long.MIN_VALUE;
applied = super.putProfile(profile, forceProfile);
// skip following block if isPrimary to avoid race where we process late
// arriving OTHER_PRIMARY profile after we've already become primary
if (applied && !isPrimary()) {
if (bp.isPrimary) {
setPrimaryMember(bp.getDistributedMember());
} else {
notPrimary(bp.getDistributedMember());
}
} // if: !isPrimary
} // synchronized
return applied;
}
private static <E> Set<E> newSetFromMap(Map<E, Boolean> map) {
if (map.isEmpty()) {
return new SetFromMap<>(map);
}
throw new IllegalArgumentException();
}
@SuppressWarnings("NullableProblems")
private static class SetFromMap<E> extends AbstractSet<E> implements Serializable {
private static final long serialVersionUID = 2454657854757543876L;
// must named as it, to pass serialization compatibility test.
private Map<E, Boolean> m;
private transient Set<E> backingSet;
SetFromMap(final Map<E, Boolean> map) {
super();
m = map;
backingSet = map.keySet();
}
@Override
public boolean equals(Object object) {
return backingSet.equals(object);
}
@Override
public int hashCode() {
return backingSet.hashCode();
}
@Override
public boolean add(E object) {
return m.put(object, Boolean.TRUE) == null;
}
@Override
public void clear() {
m.clear();
}
@Override
public String toString() {
return backingSet.toString();
}
@Override
public boolean contains(Object object) {
return backingSet.contains(object);
}
@Override
public boolean containsAll(Collection<?> collection) {
return backingSet.containsAll(collection);
}
@Override
public boolean isEmpty() {
return m.isEmpty();
}
@Override
public boolean remove(Object object) {
return m.remove(object) != null;
}
@Override
public boolean retainAll(Collection<?> collection) {
return backingSet.retainAll(collection);
}
@Override
public Object[] toArray() {
return backingSet.toArray();
}
@Override
public <T> T[] toArray(T[] contents) {
return backingSet.toArray(contents);
}
@Override
public Iterator<E> iterator() {
return backingSet.iterator();
}
@Override
public int size() {
return m.size();
}
private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
stream.defaultReadObject();
backingSet = m == null ? Collections.emptySet() : m.keySet();
}
}
/**
* repopulates the RegionAdvisor's location information for this bucket
*/
private void updateServerBucketProfile() {
int bucketId = getBucket().getId();
Set<ServerBucketProfile> serverProfiles =
newSetFromMap(new HashMap<>());
for (Profile p : profiles) {
if (p instanceof ServerBucketProfile) {
serverProfiles.add((ServerBucketProfile) p);
}
}
regionAdvisor.setClientBucketProfiles(bucketId, serverProfiles);
}
/**
* Only for local profile.
*
*/
public synchronized void updateServerBucketProfile(BucketProfile p) {
localProfile = p;
}
public BucketProfile getLocalProfile() {
return localProfile;
}
@Override
public boolean removeId(ProfileId memberId, boolean crashed, boolean destroyed,
boolean fromMembershipListener) {
boolean hadBucketRegion = super.removeId(memberId, crashed, destroyed, fromMembershipListener);
if (hadBucketRegion) {
// do NOT call notPrimary under synchronization
try {
notPrimary((InternalDistributedMember) memberId);
} catch (CancelException e) {
// must be closing the cache - no need to try to become primary
}
}
return hadBucketRegion;
}
/**
* Removes the profile for the specified member. If that profile is marked as primary, this will
* call {@link #notPrimary(InternalDistributedMember)}.
*
* @param memberId the member to remove the profile for
* @param serialNum specific serial number to remove
* @return true if a matching profile for the member was found
*/
@Override
public boolean removeIdWithSerial(InternalDistributedMember memberId, int serialNum,
boolean regionDestroyed) {
boolean hadBucketRegion = super.removeIdWithSerial(memberId, serialNum, regionDestroyed);
if (hadBucketRegion) {
// do NOT call notPrimary under synchronization
notPrimary(memberId);
}
forceNewMembershipVersion();
return hadBucketRegion;
}
@Override
public Set<InternalDistributedMember> adviseProfileExchange() {
// delegate up to RegionAdvisor to include members that might have
// ProxyBucketRegion without a real BucketRegion
Assert.assertTrue(regionAdvisor.isInitialized());
return regionAdvisor.adviseBucketProfileExchange();
}
@Override
public Set<InternalDistributedMember> adviseProfileUpdate() {
// delegate up to RegionAdvisor to include members that might have
// ProxyBucketRegion without a real BucketRegion
return regionAdvisor.adviseGeneric();
}
/**
* Sets hosting to false and returns without closing. Calling closeAdvisor will actually close
* this advisor.
*/
@Override
public void close() {
// ok, so BucketRegion extends DistributedRegion which calls close on the
// advisor, BUT the ProxyBucketRegion truly owns the advisor and only it
// should be able to close the advisor...
//
// if a BucketRegion closes, it will call this method and we want to change
// our state to NOT_HOSTING
//
// see this.closeAdvisor()
setHosting(false);
}
/**
* Blocks until there is a known primary and return that member, but only if there are real bucket
* regions that exist. If there are no real bucket regions within the distribution config's
* member-timeout setting * 3 (time required to eject a member) + 15000, then this returns null.
*
* kbanks: reworked this method to avoid JIT issue #40639
*
* @return the member who is primary for this bucket
*/
public InternalDistributedMember getPrimary() {
InternalDistributedMember primary = getExistingPrimary();
if (primary == null) {
primary = waitForNewPrimary();
}
return primary;
}
/**
* This method was split out from getPrimary() due to bug #40639 and is only intended to be called
* from within that method.
*
* @see #getPrimary()
* @return the existing primary (if it is still in the view) otherwise null
*/
private InternalDistributedMember getExistingPrimary() {
return basicGetPrimaryMember();
}
/**
* This method was split out from getPrimary() due to bug #40639 and is only intended to be called
* from within that method.
*
* @see #getPrimary()
* @return the new primary
*/
private InternalDistributedMember waitForNewPrimary() {
DistributionManager dm = regionAdvisor.getDistributionManager();
DistributionConfig config = dm.getConfig();
// failure detection period
long timeout = config.getMemberTimeout() * 3L;
// plus time for a new member to become primary
timeout += Long.getLong(DistributionConfig.GEMFIRE_PREFIX + "BucketAdvisor.getPrimaryTimeout",
15000L);
return waitForPrimaryMember(timeout);
}
/**
* Marks member as not primary. Initiates volunteerForPrimary if this member is hosting a real
* bucket. This method does nothing if the member parameter is the current member.
*
* @param member the member who is not primary
*/
public void notPrimary(InternalDistributedMember member) {
// Fix for 43569. Only the deposePrimary call should
// make the local member drop the primary lock.
if (!member.equals(getDistributionManager().getId())) {
removePrimary(member);
}
}
/**
* Marks member as not primary. Initiates volunteerForPrimary if this member is hosting a real
* bucket.
*
* @param member the member who is not primary
*/
private void removePrimary(InternalDistributedMember member) {
boolean needToVolunteerForPrimary = false;
if (!isClosed()) { // hole: requestPrimaryState not hosting
initializationGate();
}
boolean lostPrimary = false;
try {
synchronized (this) {
boolean wasPrimary = isPrimary() && getDistributionManager().getId().equals(member);
final InternalDistributedMember currentPrimary = primaryMember.get();
if (currentPrimary != null && currentPrimary.equals(member)) {
if (logger.isDebugEnabled()) {
logger.debug("[BucketAdvisor.notPrimary] {} for {}", member, this);
}
primaryMember.set(null);
} else {
return;
}
if (isClosed()) {
// possibly closed if caller comes from outside this advisor
return; // return quietly
}
// member is primary... need to change state to NO_PRIMARY_xxx
if (isHosting()) {
requestPrimaryState(NO_PRIMARY_HOSTING);
if (pRegion.isFixedPartitionedRegion()) {
InternalDistributedMember primaryMember =
regionAdvisor.adviseFixedPrimaryPartitionDataStore(getBucket().getId());
needToVolunteerForPrimary = primaryMember == null || primaryMember.equals(member);
} else {
needToVolunteerForPrimary = true;
}
} else {
requestPrimaryState(NO_PRIMARY_NOT_HOSTING);
}
if (wasPrimary) {
lostPrimary = true;
}
// try to fix yet another cause of bug 36881
findAndSetPrimaryMember();
}
} finally {
if (lostPrimary) {
invokeAfterSecondaryInPartitionListeners();
Bucket br = regionAdvisor.getBucket(getBucket().getId());
if (br instanceof BucketRegion) {
((BucketRegion) br).beforeReleasingPrimaryLockDuringDemotion();
}
releasePrimaryLock();
// this was a deposePrimary call so we need to depose children as well
deposePrimaryForColocatedChildren();
if (pRegion.isFixedPartitionedRegion()) {
deposeOtherPrimaryBucketForFixedPartition();
}
}
}
if (needToVolunteerForPrimary) {
volunteerForPrimary();
}
}
/**
* Returns the ProxyBucketRegion which owns this advisor.
*
* @return the ProxyBucketRegion which owns this advisor
*/
public ProxyBucketRegion getProxyBucketRegion() {
return (ProxyBucketRegion) getAdvisee();
}
/**
* Actually close this advisor for real. Called by ProxyBucketRegion only. Calling this method
* actually closes this advisor whereas {@link #close()} only sets hosting to false.
*/
void closeAdvisor() {
boolean wasPrimary;
synchronized (this) {
if (isClosed()) {
return;
}
wasPrimary = isPrimary();
super.close();
requestPrimaryState(CLOSED);
redundancyTracker.closeBucket();
localProfile = null;
}
if (wasPrimary) {
releasePrimaryLock();
}
}
/**
* Returns true if this advisor has been closed.
*
* @return true if this advisor has been closed
*/
protected boolean isClosed() {
return primaryState == CLOSED;
}
/**
* Returns true if this member is currently marked as primary.
*
* @return true if this member is currently marked as primary
*/
public boolean isPrimary() {
return primaryState == IS_PRIMARY_HOSTING;
}
/**
* Returns true if this member is currently volunteering for primary.
*
* @return true if this member is currently volunteering for primary
*/
private boolean isVolunteering() {
return primaryState == VOLUNTEERING_HOSTING;
}
/**
* Returns true if this member is currently attempting to become primary.
*
* @return true if this member is currently attempting to become primary
*/
private boolean isBecomingPrimary() {
synchronized (this) {
return primaryState == BECOMING_HOSTING && volunteeringDelegate != null
&& volunteeringDelegate.isAggressive();
}
}
/**
* Returns true if this member is currently hosting real bucket.
*
* @return true if this member is currently hosting real bucket
*/
public boolean isHosting() {
final byte primaryState = this.primaryState;
return primaryState == NO_PRIMARY_HOSTING || primaryState == OTHER_PRIMARY_HOSTING
|| primaryState == VOLUNTEERING_HOSTING || primaryState == BECOMING_HOSTING
|| primaryState == IS_PRIMARY_HOSTING;
}
/**
* Attempt to acquire lock for primary until a primary exists. Caller hands off responsibility to
* an executor (waiting pool) and returns early.
*/
public void volunteerForPrimary() {
InternalDistributedMember elector = primaryElector;
if (elector != null && regionAdvisor.hasPartitionedRegion(elector)) {
// another server will determine the primary node
return;
}
primaryElector = null;
initializationGate();
synchronized (this) {
if (isVolunteering() || isClosed() || !isHosting()) {
// only one thread should be attempting to volunteer at one time
return;
}
if (volunteeringDelegate == null) {
setVolunteeringDelegate(new VolunteeringDelegate());
}
volunteeringDelegate.volunteerForPrimary();
}
}
protected void setVolunteeringDelegate(VolunteeringDelegate delegate) {
volunteeringDelegate = delegate;
}
/**
* Makes this <code>BucketAdvisor</code> become the primary if it is already a secondary.
*
* @param isRebalance true if directed to become primary by rebalancing
* @return true if this advisor succeeds in becoming the primary
*/
public boolean becomePrimary(boolean isRebalance) {
initializationGate();
long startTime = getPartitionedRegionStats().startPrimaryTransfer(isRebalance);
try {
long waitTime = 2000; // time each iteration will wait
while (!isPrimary()) {
getAdvisee().getCancelCriterion().checkCancelInProgress(null);
boolean attemptToBecomePrimary = false;
boolean attemptToDeposePrimary = false;
if (Thread.currentThread().isInterrupted()) {
if (logger.isDebugEnabled()) {
logger.debug("Breaking from becomePrimary loop due to thread interrupt flag being set");
}
break;
}
if (isClosed() || !isHosting()) {
if (logger.isDebugEnabled()) {
logger.debug("Breaking from becomePrimary loop because {} is closed or not hosting",
this);
}
break;
}
VolunteeringDelegate vDelegate = null;
synchronized (this) {
if (isVolunteering()) {
// standard volunteering attempt already in progress...
if (logger.isDebugEnabled()) {
logger.debug("Waiting for volunteering thread {}. Time left: {} ms", this, waitTime);
}
wait(waitTime); // spurious wakeup ok
continue;
} else if (isBecomingPrimary()) {
// reattempt to depose otherPrimary...
attemptToDeposePrimary = true;
} else {
// invoke becomePrimary AFTER sync is released in this thread...
vDelegate = volunteeringDelegate;
if (vDelegate == null) {
vDelegate = new VolunteeringDelegate();
volunteeringDelegate = vDelegate;
}
} // else
} // synchronized
if (vDelegate != null) {
// Use the snapshot 'vDelegate' instead of 'this.volunteeringDelegate' since we are not
// synced here.
attemptToBecomePrimary = vDelegate.reserveForBecomePrimary(); // no sync!
}
// release synchronization and then call becomePrimary
if (attemptToBecomePrimary) {
synchronized (this) {
if (volunteeringDelegate == null) {
volunteeringDelegate = new VolunteeringDelegate();
}
volunteeringDelegate.volunteerForPrimary();
attemptToDeposePrimary = true;
} // synchronized
Thread.sleep(10);
} // attemptToBecomePrimary
// RACE: slight race condition with thread that's actually requesting the lock
if (attemptToDeposePrimary) {
InternalDistributedMember otherPrimary = getPrimary();
if (otherPrimary != null && !getDistributionManager().getId().equals(otherPrimary)) {
if (logger.isDebugEnabled()) {
logger.debug("Attempting to depose primary on {} for {}", otherPrimary, this);
}
DeposePrimaryBucketResponse response =
DeposePrimaryBucketMessage.send(otherPrimary, pRegion, getBucket().getId());
if (response != null) {
response.waitForRepliesUninterruptibly();
if (logger.isDebugEnabled()) {
logger.debug("Deposed primary on {}", otherPrimary);
}
}
}
Thread.sleep(10);
} // attemptToDeposePrimary
} // while
} catch (InterruptedException e) {
// abort and return null
Thread.currentThread().interrupt();
} finally {
getPartitionedRegionStats().endPrimaryTransfer(startTime, isPrimary(), isRebalance);
}
return isPrimary();
}
/**
* Check the primary member shortcut. Does not query the advisor. Should only be used when the
* advisor should not be consulted directly.
*
* @return the member or null if no primary exists
*/
public InternalDistributedMember basicGetPrimaryMember() {
return primaryMember.get();
}
/**
* Invoked when the primary lock has been acquired by this VM.
*
* @return true if successfully changed state to IS_PRIMARY
*/
private boolean acquiredPrimaryLock() {
if (logger.isDebugEnabled()) {
logger.debug("Acquired primary lock for BucketID {} PR : {}", getBucket().getId(),
regionAdvisor.getPartitionedRegion().getFullPath());
}
boolean changedStateToIsPrimary = false;
// Hold the primary move lock until we send a
// profile update. This will prevent writes
// from occurring until all members know that
// this member is now the primary.
boolean shouldInvokeListeners = false;
primaryMoveWriteLock.lock();
try {
synchronized (this) {
if (isHosting() && (isVolunteering() || isBecomingPrimary())) {
Bucket br = regionAdvisor.getBucket(getBucket().getId());
if (br instanceof BucketRegion) {
((BucketRegion) br).beforeAcquiringPrimaryState();
}
if (requestPrimaryState(IS_PRIMARY_HOSTING)) {
if (logger.isDebugEnabled()) {
logger.debug("Acquired primary lock for setting primary now BucketID {} PR : {}",
getBucket().getId(), regionAdvisor.getPartitionedRegion().getFullPath());
}
setPrimaryMember(getDistributionManager().getId());
changedStateToIsPrimary = true;
if (hasPrimary() && isPrimary()) {
shouldInvokeListeners = true;
}
}
}
}
if (shouldInvokeListeners) {
invokePartitionListeners();
}
return changedStateToIsPrimary;
} finally {
try {
if (changedStateToIsPrimary) {
// send profile update AFTER releasing sync
sendProfileUpdate();
Bucket br = regionAdvisor.getBucket(getBucket().getId());
if (br instanceof BucketRegion) {
((BucketRegion) br).processPendingSecondaryExpires();
}
if (br instanceof BucketRegionQueue) { // Shouldn't it be AbstractBucketRegionQueue
BucketRegionQueue brq = (BucketRegionQueue) br;
brq.incQueueSize(brq.size());
brq.decSecondaryQueueSize(brq.size());
}
if (br instanceof BucketRegion) {
((BucketRegion) br).afterAcquiringPrimaryState();
}
} else {
// release primary lock AFTER releasing sync
releasePrimaryLock();
}
} finally {
primaryMoveWriteLock.unlock();
}
}
}
private void invokePartitionListeners() {
PartitionListener[] listeners = pRegion.getPartitionListeners();
if (listeners == null || listeners.length == 0) {
return;
}
for (PartitionListener listener : listeners) {
if (listener != null) {
listener.afterPrimary(getBucket().getId());
}
}
}
private void invokeAfterSecondaryInPartitionListeners() {
PartitionListener[] listeners = pRegion.getPartitionListeners();
if (listeners == null || listeners.length == 0) {
return;
}
for (PartitionListener listener : listeners) {
if (listener != null) {
listener.afterSecondary(getBucket().getId());
}
}
}
/**
* Lazily gets the lock for acquiring primary lock. Caller must handle null. If DLS, Cache, or
* DistributedSystem are shutting down then null will be returned. If DLS does not yet exist and
* createDLS is false then null will be returned.
*
* @param createDLS true will create DLS if it does not exist
* @return distributed lock indicating primary member or null
*/
private DistributedMemberLock getPrimaryLock(boolean createDLS) {
synchronized (this) {
if (primaryLock == null) {
DistributedLockService dls = DistributedLockService
.getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
if (dls == null) {
if (!createDLS || getProxyBucketRegion().getCache().isClosed()) {
return null; // cache closure has destroyed the DLS
}
try { // TODO: call GemFireCache#getPartitionedRegionLockService
dls = DLockService.create(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME,
getAdvisee().getSystem(), true /* distributed */, true /* destroyOnDisconnect */,
true /* automateFreeResources */);
} catch (IllegalArgumentException e) {
// indicates that the DLS is already created
dls = DistributedLockService
.getServiceNamed(PartitionedRegionHelper.PARTITION_LOCK_SERVICE_NAME);
if (dls == null) {
// another thread destroyed DLS after this thread called create
return null; // ok, caller will loop if necessary
}
}
// TODO: we need a good NotConnectedException to replace
// IllegalStateException and ShutdownException
// perhaps: DistributedSystemUnavailableException
catch (IllegalStateException | DistributedSystemDisconnectedException e) {
// create still throws IllegalStateException if isDisconnecting is true
return null;
} // this would certainly prevent us from creating a DLS... messy
}
primaryLock = new DistributedMemberLock(dls, getAdvisee().getName(),
DistributedMemberLock.NON_EXPIRING_LEASE,
DistributedMemberLock.LockReentryPolicy.PREVENT_SILENTLY);
}
return primaryLock;
}
}
private void acquirePrimaryRecursivelyForColocated() {
final List<PartitionedRegion> colocatedWithList =
ColocationHelper.getColocatedChildRegions(regionAdvisor.getPartitionedRegion());
if (colocatedWithList != null) {
for (PartitionedRegion childPR : colocatedWithList) {
Bucket b = childPR.getRegionAdvisor().getBucket(getBucket().getId());
BucketAdvisor childBA = b.getBucketAdvisor();
Assert.assertHoldsLock(childBA, false);
boolean acquireForChild = false;
if (logger.isDebugEnabled()) {
logger.debug(
"BucketAdvisor.acquirePrimaryRecursivelyForColocated: about to take lock for bucket: {} of PR: {} with isHosting={}",
getBucket().getId(), childPR.getFullPath(), childBA.isHosting());
}
childBA.primaryMoveWriteLock.lock();
try {
if (childBA.isHosting()) {
if (isPrimary()) {
if (!childBA.isPrimary()) {
childBA.setVolunteering();
boolean acquired = childBA.acquiredPrimaryLock();
acquireForChild = true;
if (acquired && pRegion.isFixedPartitionedRegion()) {
childBA.acquirePrimaryForRestOfTheBucket();
}
} else {
acquireForChild = true;
}
}
} // if isHosting
if (acquireForChild) {
childBA.acquirePrimaryRecursivelyForColocated();
}
} finally {
childBA.primaryMoveWriteLock.unlock();
}
}
}
}
private void acquirePrimaryForRestOfTheBucket() {
List<FixedPartitionAttributesImpl> fpas = pRegion.getFixedPartitionAttributesImpl();
if (fpas != null) {
int bucketId = getBucket().getId();
for (FixedPartitionAttributesImpl fpa : fpas) {
if (fpa.getStartingBucketID() == bucketId) {
for (int i = bucketId + 1; i <= fpa.getLastBucketID();) {
Bucket b = regionAdvisor.getBucket(i++);
if (b != null) {
BucketAdvisor ba = b.getBucketAdvisor();
ba.primaryMoveWriteLock.lock();
try {
if (ba.isHosting()) {
if (!ba.isPrimary()) {
ba.setVolunteering();
ba.acquiredPrimaryLock();
}
}
} finally {
ba.primaryMoveWriteLock.unlock();
}
}
}
}
}
}
}
/**
* Sets volunteering to true. Returns true if the state of volunteering was changed. Returns false
* if voluntering was already equal to true. Caller should do nothing if false is returned.
*/
private boolean setVolunteering() {
synchronized (this) {
return requestPrimaryState(VOLUNTEERING_HOSTING);
}
}
/**
* Sets becoming primary to true. Returns true if the state of becoming was changed. Returns false
* if becoming was already equal to true. Caller should do nothing if false is returned.
*/
private boolean setBecoming() {
synchronized (this) {
return requestPrimaryState(BECOMING_HOSTING);
}
}
/**
* Wait briefly for a primary member to be identified.
*
* @param timeout time in milliseconds to wait for a primary
* @return the primary bucket host
*/
private InternalDistributedMember waitForPrimaryMember(long timeout) {
synchronized (this) {
// let's park this thread and wait for a primary!
StopWatch timer = new StopWatch(true);
long warnTime = getDistributionManager().getConfig().getAckWaitThreshold() * 1000L;
boolean loggedWarning = false;
try {
for (;;) {
// bail out if the system starts closing
getAdvisee().getCancelCriterion().checkCancelInProgress(null);
final InternalCache cache = getBucket().getCache();
if (cache != null && cache.isCacheAtShutdownAll()) {
throw cache.getCacheClosedException("Cache is shutting down");
}
if (getBucketRedundancy() == -1) {
// there are no real buckets in other vms... no reason to wait
return null;
}
getProxyBucketRegion().getPartitionedRegion().checkReadiness();
if (isClosed()) {
break;
}
long elapsed = timer.elapsedTimeMillis();
long timeLeft = timeout - elapsed;
if (timeLeft <= 0) {
break;
}
if (getBucketRedundancy() == -1 || isClosed()) {
break; // early out... all bucket regions are gone or we closed
}
InternalDistributedMember primary = basicGetPrimaryMember();
if (primary != null) {
return primary;
}
if (logger.isDebugEnabled()) {
logger.debug("Waiting for bucket {}. Time left :{} ms", this, timeLeft);
}
// Log a warning if we have waited for the ack wait threshold time.
if (!loggedWarning) {
long timeUntilWarning = warnTime - elapsed;
if (timeUntilWarning <= 0) {
logger
.warn(
"{} secs have elapsed waiting for a primary for bucket {}. Current bucket owners {}",
new Object[] {warnTime / 1000L, this, adviseInitialized()});
// log a warning;
loggedWarning = true;
} else {
timeLeft = timeLeft > timeUntilWarning ? timeUntilWarning : timeLeft;
}
}
wait(timeLeft); // spurious wakeup ok
}
} catch (InterruptedException e) {
// abort and return null
Thread.currentThread().interrupt();
} finally {
if (loggedWarning) {
logger.info(
"Wait for primary completed");
}
}
return null;
}
}
/**
* How long to wait, in millisecs, for redundant buckets to exist
*/
private static final long BUCKET_REDUNDANCY_WAIT = 15000L; // 15 seconds
/**
* Wait the desired redundancy to be met.
*
* @param minRedundancy the amount of desired redundancy.
* @return true if desired redundancy is detected
*/
public boolean waitForRedundancy(int minRedundancy) {
synchronized (this) {
// let's park this thread and wait for redundancy!
StopWatch timer = new StopWatch(true);
try {
for (;;) {
if (getBucketRedundancy() >= minRedundancy) {
return true;
}
getProxyBucketRegion().getPartitionedRegion().checkReadiness();
if (isClosed()) {
return false;
}
long timeLeft = BUCKET_REDUNDANCY_WAIT - timer.elapsedTimeMillis();
if (timeLeft <= 0) {
return false;
}
if (logger.isDebugEnabled()) {
logger.debug("Waiting for bucket {}", this);
}
wait(timeLeft); // spurious wakeup ok
}
} catch (InterruptedException e) {
// abort and return null
Thread.currentThread().interrupt();
}
return false;
}
}
synchronized void clearPrimaryElector() {
primaryElector = null;
}
synchronized void setPrimaryElector(InternalDistributedMember newPrimaryElector) {
// Only set the new primary elector if we have not yet seen
// a primary for this bucket.
if (primaryElector != null) {
if (newPrimaryElector != null && !regionAdvisor.hasPartitionedRegion(newPrimaryElector)) {
// no longer a participant - don't use it
primaryElector = null;
} else {
primaryElector = newPrimaryElector;
}
}
}
public synchronized void initializePrimaryElector(InternalDistributedMember newPrimaryElector) {
// For child buckets, we want the parent bucket to take care'
// of finishing an incomplete bucket creation, so only set the elector for
// the leader region.
if (parentAdvisor == null) {
if (newPrimaryElector != null && !regionAdvisor.hasPartitionedRegion(newPrimaryElector)) {
// no longer a participant - don't use it
primaryElector = null;
} else {
primaryElector = newPrimaryElector;
}
}
}
/**
* Invoked when real bucket is created for hosting in this VM.
*
* @param value true to begin hosting; false to end hosting
*/
protected void setHosting(boolean value) {
// boolean needToNotPrimarySelf = false;
boolean needToVolunteerForPrimary = false;
boolean wasPrimary;
synchronized (this) {
wasPrimary = isPrimary();
if (isClosed()) {
return;
}
if (value) { // setting to HOSTING...
if (hasPrimary()) {
requestPrimaryState(OTHER_PRIMARY_HOSTING);
} else {
requestPrimaryState(NO_PRIMARY_HOSTING);
needToVolunteerForPrimary = true;
}
}
else { // setting to NOT_HOSTING...
if (hasPrimary()) { // has primary...
if (isPrimary()) {
requestPrimaryState(NO_PRIMARY_NOT_HOSTING);
primaryMember.set(null);
findAndSetPrimaryMember();
} else {
requestPrimaryState(OTHER_PRIMARY_NOT_HOSTING);
}
} else { // no primary...
// acquiredPrimaryLock will check isHosting and release if not hosting
requestPrimaryState(NO_PRIMARY_NOT_HOSTING);
}
}
volunteeringDelegate = null;
// Note - checkRedundancy has the side effect that it updates the stats.
// We need to invoke checkRedundancy here, regardless of whether we
// need this notify.
if (updateRedundancy() > 0 && isHosting()) {
// wake up any threads in waitForRedundancy or waitForPrimary
notifyAll();
}
}
if (wasPrimary) {
releasePrimaryLock();
}
if (logger.isTraceEnabled()) {
logger.trace("setHosting: {} needToVolunteerForPrimary={} primaryElector: {}", this,
needToVolunteerForPrimary, primaryElector);
}
/*
* if (needToNotPrimarySelf) { notPrimary(getAdvisee().getDistributionManager().getId()); }
*/
if (needToVolunteerForPrimary) {
volunteerForPrimary();
}
sendProfileUpdate();
}
/**
* Sends updated profile for this member to every member with the <code>PartitionedRegion</code>.
* <p>
* Never call this method while synchronized on this BucketAdvisor. This will result in
* distributed deadlocks.
*/
private void sendProfileUpdate() {
if (getDistributionManager().getSystem().isLoner()) {
// no one to send the profile update... return to prevent bug 39760
return;
}
// make sure caller is not synchronized or we'll deadlock
Assert.assertTrue(!Thread.holdsLock(this),
"Attempting to sendProfileUpdate while synchronized may result in deadlock");
// NOTE: if this assert fails, you COULD use the WaitingThreadPool in DM
final int partitionedRegionId = pRegion.getPRId();
final int bucketId = ((ProxyBucketRegion) getAdvisee()).getBucketId();
BucketProfile bp = (BucketProfile) createProfile();
updateServerBucketProfile(bp);
InternalDistributedMember primary = basicGetPrimaryMember();
HashSet<InternalDistributedMember> hostsAndProxyMembers = new HashSet<>();
if (primary != null && !primary.equals(getDistributionManager().getId())) {
hostsAndProxyMembers.add(primary); // Add the primary
}
hostsAndProxyMembers.addAll(adviseGeneric()); // Add all members hosting the bucket
hostsAndProxyMembers.addAll(adviseProfileUpdate()); // Add all proxy instances that could use
// the bucket
ReplyProcessor21 reply = BucketProfileUpdateMessage.send(hostsAndProxyMembers,
getDistributionManager(), partitionedRegionId, bucketId, bp, true);
if (reply != null) {
reply.waitForRepliesUninterruptibly();
}
}
/**
* Returns true if the a primary is known.
*/
private boolean hasPrimary() {
final byte primaryState = this.primaryState;
return primaryState == OTHER_PRIMARY_NOT_HOSTING || primaryState == OTHER_PRIMARY_HOSTING
|| primaryState == IS_PRIMARY_HOSTING;
}
@Override
protected Profile instantiateProfile(InternalDistributedMember memberId, int version) {
if (!pRegion.isShadowPR()) {
Set<BucketServerLocation66> serverLocations = getBucketServerLocations(version);
if (!serverLocations.isEmpty()) {
return new ServerBucketProfile(memberId, version, getBucket(), (HashSet) serverLocations);
}
}
return new BucketProfile(memberId, version, getBucket());
}
protected Set<BucketServerLocation66> getBucketServerLocations(int version) {
InternalCache cache = getProxyBucketRegion().getCache();
List<CacheServer> servers = cache.getCacheServers();
if (servers.isEmpty()) {
return Collections.emptySet();
}
HashSet<BucketServerLocation66> serverLocations = new HashSet<>();
for (CacheServer cacheServer : servers) {
CacheServerImpl server = (CacheServerImpl) cacheServer;
try {
String serverExternalAddress;
if (server.isRunning() && ((serverExternalAddress = server.getExternalAddress()) != null)) {
BucketServerLocation66 location = new BucketServerLocation66(getBucket().getId(),
server.getPort(), serverExternalAddress, getBucket().isPrimary(),
Integer.valueOf(version).byteValue(), server.getCombinedGroups());
serverLocations.add(location);
}
} catch (IllegalStateException e) {
if (!(e.getMessage() != null
&& e.getMessage().equals(CACHE_SERVER_BIND_ADDRESS_NOT_AVAILABLE_EXCEPTION_MESSAGE))) {
throw e;
}
}
}
return serverLocations;
}
/**
* Sets primaryMember and notifies all. Caller must be synced on this.
*
* @param id the member to use as primary for this bucket
*/
private void setPrimaryMember(InternalDistributedMember id) {
if (!getDistributionManager().getId().equals(id)) {
// volunteerForPrimary handles primary state change if its our id
if (isHosting()) {
requestPrimaryState(OTHER_PRIMARY_HOSTING);
} else {
requestPrimaryState(OTHER_PRIMARY_NOT_HOSTING);
}
}
primaryMember.set(id);
everHadPrimary = true;
if (id != null && id.equals(primaryElector)) {
primaryElector = null;
}
notifyAll(); // wake up any threads in waitForPrimaryMember
}
void setHadPrimary() {
everHadPrimary = true;
}
boolean getHadPrimary() {
return everHadPrimary;
}
public InternalDistributedMember getPrimaryElector() {
return primaryElector;
}
/**
* Get the current number of bucket hosts and update the redundancy statistics for the region
*
* @return number of current bucket hosts
*/
private int updateRedundancy() {
int numBucketHosts = getNumInitializedBuckets();
if (!isClosed()) {
redundancyTracker.updateStatistics(numBucketHosts);
}
return numBucketHosts;
}
/**
* Returns all {@link InternalDistributedMember}s currently flagged as primary.
* <p>
* Since profile messages may arrive out of order from different members, more than one member may
* temporarily be flagged as primary.
* <p>
* The user of this BucketAdvisor should simply assume that the first profile is primary until the
* dust settles, leaving only one primary profile.
*
* @return zero or greater array of primary members
*/
private InternalDistributedMember[] findPrimaryMembers() {
Set<InternalDistributedMember> primaryMembers = adviseFilter(profile -> {
assert profile instanceof BucketProfile;
BucketProfile srp = (BucketProfile) profile;
return srp.isPrimary;
});
if (primaryMembers.size() > 1 && logger.isDebugEnabled()) {
logger.debug("[findPrimaryProfiles] found the following primary members for {}: {}",
getAdvisee().getName(), primaryMembers);
}
return primaryMembers.toArray(new InternalDistributedMember[0]);
}
/**
* Searches through profiles to find first profile that is flagged as primary and sets
* {@link #primaryMember} to it. Caller must synchronize on this BucketAdvisor.
*
* @return true if a primary member was found and used
* @see #findAndSetPrimaryMember()
*/
private boolean findAndSetPrimaryMember() {
if (isPrimary()) {
setPrimaryMember(getDistributionManager().getDistributionManagerId());
return true;
}
InternalDistributedMember[] primaryMembers = findPrimaryMembers();
if (primaryMembers.length > 0) {
setPrimaryMember(primaryMembers[0]);
return true;
} else {
return false;
}
}
/**
* Returns the current redundancy of the this bucket, including the locally hosted bucket if it
* exists.
*
* @return current number of hosts of this bucket ; -1 if there are no hosts
*/
public int getBucketRedundancy() {
return redundancyTracker.getCurrentRedundancy();
}
Set<InternalDistributedMember> adviseInitialized() {
return adviseFilter(profile -> {
assert profile instanceof BucketProfile;
BucketProfile bucketProfile = (BucketProfile) profile;
return bucketProfile.isHosting;
});
}
Set<InternalDistributedMember> adviseRecoveredFromDisk() {
return regionAdvisor.adviseInitializedDataStore();
}
/**
* Get the number of members that are hosting the bucket, and have finished initialization.
*
* This method is currently only used to check the bucket redundancy just before creating the
* bucket. If it is used more frequently, it might be better to cache this count.
*/
private int getNumInitializedBuckets() {
Profile[] locProfiles = profiles; // grab current profiles
int count = 0;
for (Profile profile : locProfiles) {
BucketProfile bucketProfile = (BucketProfile) profile;
if (bucketProfile.isHosting) {
count++;
}
}
if (isHosting()) {
count++;
}
return count;
}
private Bucket getBucket() {
return (Bucket) getAdvisee();
}
/**
* Releases the primary lock for this bucket.
*/
private void releasePrimaryLock() {
// We don't have a lock if we have a parent advisor
if (parentAdvisor != null) {
return;
}
assignStartingBucketAdvisorIfFixedPartitioned();
if (startingBucketAdvisor != null) {
return;
}
try {
DistributedMemberLock thePrimaryLock = getPrimaryLock(false);
if (thePrimaryLock != null) {
thePrimaryLock.unlock();
}
} catch (LockNotHeldException e) {
Assert.assertTrue(!isHosting(), "Got LockNotHeldException for Bucket = " + this);
} catch (LockServiceDestroyedException e) {
Assert.assertTrue(isClosed(),
"BucketAdvisor was not closed before destroying PR lock service");
}
}
private String primaryStateToString() {
return primaryStateToString(primaryState);
}
/**
* Returns string representation of the primary state value.
*
* @param value the primary state to return string for
* @return string representation of primaryState
*/
private String primaryStateToString(byte value) {
switch (value) {
case NO_PRIMARY_NOT_HOSTING:
return "NO_PRIMARY_NOT_HOSTING";
case NO_PRIMARY_HOSTING:
return "NO_PRIMARY_HOSTING";
case OTHER_PRIMARY_NOT_HOSTING:
return "OTHER_PRIMARY_NOT_HOSTING";
case OTHER_PRIMARY_HOSTING:
return "OTHER_PRIMARY_HOSTING";
case VOLUNTEERING_HOSTING:
return "VOLUNTEERING_HOSTING";
case BECOMING_HOSTING:
return "BECOMING_HOSTING";
case IS_PRIMARY_HOSTING:
return "IS_PRIMARY_HOSTING";
case CLOSED:
return "CLOSED";
default:
return "<unhandled primaryState " + value + " >";
}
}
/**
* Requests change to the requested primary state. Controls all state changes pertaining to
* primary state. Caller must be synchronized on this.
*
* @param requestedState primaryState to change to
* @return true if the requestedState change was completed
* @throws IllegalStateException if an illegal state change was attempted
*/
private boolean requestPrimaryState(byte requestedState) {
final byte fromState = primaryState;
switch (fromState) {
case NO_PRIMARY_NOT_HOSTING:
switch (requestedState) {
case NO_PRIMARY_NOT_HOSTING:
// race condition ok, return false
return false;
case NO_PRIMARY_HOSTING:
primaryState = requestedState;
break;
case OTHER_PRIMARY_NOT_HOSTING:
primaryState = requestedState;
break;
case OTHER_PRIMARY_HOSTING:
primaryState = requestedState;
break;
case BECOMING_HOSTING:
// race condition during close is ok, return false
return false;
case VOLUNTEERING_HOSTING:
// race condition during close is ok, return false
return false;
case CLOSED:
primaryState = requestedState;
break;
default:
throw new IllegalStateException(String.format("Cannot change from %s to %s",
primaryStateToString(), primaryStateToString(requestedState)));
}
break;
case NO_PRIMARY_HOSTING:
switch (requestedState) {
case NO_PRIMARY_NOT_HOSTING:
primaryState = requestedState;
break;
// case OTHER_PRIMARY_NOT_HOSTING: -- enable for bucket migration
// this.primaryState = requestedState;
// break;
case NO_PRIMARY_HOSTING:
// race condition ok, return false
return false;
case VOLUNTEERING_HOSTING:
primaryState = requestedState; {
PartitionedRegionStats stats = getPartitionedRegionStats();
stats.putStartTime(this, stats.startVolunteering());
}
break;
case BECOMING_HOSTING:
primaryState = requestedState; {
PartitionedRegionStats stats = getPartitionedRegionStats();
stats.putStartTime(this, stats.startVolunteering());
}
break;
case OTHER_PRIMARY_HOSTING:
primaryState = requestedState;
break;
case CLOSED:
primaryState = requestedState;
break;
default:
throw new IllegalStateException(String.format("Cannot change from %s to %s",
primaryStateToString(),
primaryStateToString(requestedState)));
}
break;
case OTHER_PRIMARY_NOT_HOSTING:
switch (requestedState) {
case NO_PRIMARY_NOT_HOSTING:
primaryState = requestedState;
break;
case OTHER_PRIMARY_NOT_HOSTING:
// race condition ok, return false
return false;
case OTHER_PRIMARY_HOSTING:
primaryState = requestedState;
break;
case BECOMING_HOSTING:
// race condition during close is ok, return false
return false;
case VOLUNTEERING_HOSTING:
// race condition during close is ok, return false
return false;
case CLOSED:
primaryState = requestedState;
break;
default:
throw new IllegalStateException(String.format("Cannot change from %s to %s",
primaryStateToString(),
primaryStateToString(requestedState)));
}
break;
case OTHER_PRIMARY_HOSTING:
switch (requestedState) {
// case NO_PRIMARY_NOT_HOSTING: -- enable for bucket migration
// this.primaryState = requestedState;
// break;
case OTHER_PRIMARY_NOT_HOSTING:
// May occur when setHosting(false) is called
primaryState = requestedState;
break;
case OTHER_PRIMARY_HOSTING:
// race condition ok, return false
return false;
case NO_PRIMARY_HOSTING:
primaryState = requestedState;
break;
case CLOSED:
primaryState = requestedState;
break;
case VOLUNTEERING_HOSTING:
// race condition ok, return false to abort volunteering
return false;
case BECOMING_HOSTING:
primaryState = requestedState; {
PartitionedRegionStats stats = getPartitionedRegionStats();
stats.putStartTime(this, stats.startVolunteering());
}
break;
case IS_PRIMARY_HOSTING:
// race condition ok, probably race in HA where other becomes
// primary and immediately leaves while we have try-lock message
// enroute to grantor
primaryState = requestedState;
break;
default:
throw new IllegalStateException(String.format("Cannot change from %s to %s",
primaryStateToString(),
primaryStateToString(requestedState)));
}
break;
case VOLUNTEERING_HOSTING:
switch (requestedState) {
case NO_PRIMARY_NOT_HOSTING:
// May occur when setHosting(false) is called
primaryState = requestedState; {
PartitionedRegionStats stats = getPartitionedRegionStats();
stats.endVolunteeringClosed(stats.removeStartTime(this));
}
break;
case OTHER_PRIMARY_NOT_HOSTING:
// May occur when setHosting(false) is called
// Profile update for other primary may have slipped in
primaryState = requestedState; {
PartitionedRegionStats stats = getPartitionedRegionStats();
stats.endVolunteeringClosed(stats.removeStartTime(this));
}
break;
case NO_PRIMARY_HOSTING:
// race condition occurred, return false and stay in volunteering
return false;
case IS_PRIMARY_HOSTING:
primaryState = requestedState; {
PartitionedRegionStats stats = getPartitionedRegionStats();
stats.incPrimaryBucketCount(1);
stats.endVolunteeringBecamePrimary(stats.removeStartTime(this));
}
break;
case OTHER_PRIMARY_HOSTING:
primaryState = requestedState; {
PartitionedRegionStats stats = getPartitionedRegionStats();
stats.endVolunteeringOtherPrimary(stats.removeStartTime(this));
}
break;
case VOLUNTEERING_HOSTING:
// race condition ok, return false to abort volunteering
return false;
case BECOMING_HOSTING:
// race condition ok, return false to abort volunteering
return false;
case CLOSED:
primaryState = requestedState; {
PartitionedRegionStats stats = getPartitionedRegionStats();
stats.endVolunteeringClosed(stats.removeStartTime(this));
}
break;
default:
throw new IllegalStateException(String.format("Cannot change from %s to %s",
primaryStateToString(),
primaryStateToString(requestedState)));
}
break;
case BECOMING_HOSTING:
switch (requestedState) {
case NO_PRIMARY_NOT_HOSTING:
// May occur when setHosting(false) is called
primaryState = requestedState; {
PartitionedRegionStats stats = getPartitionedRegionStats();
stats.endVolunteeringClosed(stats.removeStartTime(this));
}
break;
case OTHER_PRIMARY_NOT_HOSTING:
// May occur when setHosting(false) is called
// Profile update for other primary may have slipped in
primaryState = requestedState; {
PartitionedRegionStats stats = getPartitionedRegionStats();
stats.endVolunteeringClosed(stats.removeStartTime(this));
}
break;
case NO_PRIMARY_HOSTING:
// race condition occurred, return false and stay in volunteering
return false;
case IS_PRIMARY_HOSTING:
primaryState = requestedState; {
PartitionedRegionStats stats = getPartitionedRegionStats();
stats.incPrimaryBucketCount(1);
stats.endVolunteeringBecamePrimary(stats.removeStartTime(this));
}
break;
case OTHER_PRIMARY_HOSTING:
return false;
case VOLUNTEERING_HOSTING:
// race condition ok, return false to abort volunteering
return false;
case BECOMING_HOSTING:
// race condition ok, return false to abort volunteering
return false;
case CLOSED:
primaryState = requestedState; {
PartitionedRegionStats stats = getPartitionedRegionStats();
stats.endVolunteeringClosed(stats.removeStartTime(this));
}
break;
default:
throw new IllegalStateException(String.format("Cannot change from %s to %s",
primaryStateToString(),
primaryStateToString(requestedState)));
}
break;
case IS_PRIMARY_HOSTING:
switch (requestedState) {
case NO_PRIMARY_HOSTING:
// rebalancing must have moved the primary
changeFromPrimaryTo(requestedState);
break;
// case OTHER_PRIMARY_HOSTING: -- enable for bucket migration
// // rebalancing must have moved the primary
// changeFromPrimaryTo(requestedState);
// break;
case OTHER_PRIMARY_NOT_HOSTING:
// rebalancing must have moved the primary and primary
changeFromPrimaryTo(requestedState);
break;
case NO_PRIMARY_NOT_HOSTING:
// May occur when setHosting(false) is called due to closing
changeFromPrimaryTo(requestedState);
break;
case VOLUNTEERING_HOSTING:
// race condition ok, return false to abort volunteering
return false;
case BECOMING_HOSTING:
// race condition ok, return false to abort volunteering
return false;
case CLOSED:
changeFromPrimaryTo(requestedState);
break;
default:
throw new IllegalStateException("Cannot change from " + primaryStateToString()
+ " to " + primaryStateToString(requestedState));
}
break;
case CLOSED:
switch (requestedState) {
case CLOSED:
Exception e = new Exception(
"Attempted to close BucketAdvisor that is already CLOSED");
logger.warn("Attempted to close BucketAdvisor that is already CLOSED",
e);
break;
case VOLUNTEERING_HOSTING:
// race condition ok, return false to abort volunteering
return false;
case BECOMING_HOSTING:
// race condition ok, return false to abort volunteering
return false;
case IS_PRIMARY_HOSTING:
// Commonly occurs when closing and volunteering thread is still running
return false;
case OTHER_PRIMARY_NOT_HOSTING:
// Commonly occurs when a putProfile occurs during closure
return false;
default:
throw new IllegalStateException(
String.format("Cannot change from %s to %s for bucket %s",
primaryStateToString(),
primaryStateToString(requestedState), getAdvisee().getName()));
}
}
return primaryState == requestedState;
}
private void changeFromPrimaryTo(byte requestedState) {
try {
primaryState = requestedState;
} finally {
getPartitionedRegionStats().incPrimaryBucketCount(-1);
}
}
@Override
public Set adviseDestroyRegion() {
// fix for bug 37604 - tell all owners of the pr that the bucket is being
// destroyed. This is needed when bucket cleanup is performed
return regionAdvisor.adviseAllPRNodes();
}
/**
* returns the set of all the members in the system which require both DistributedCacheOperation
* messages and notification-only partition messages
*
* @return a set of recipients requiring both cache-op and notification messages
* @since GemFire 5.7
*/
public Set<InternalDistributedMember> adviseRequiresTwoMessages() {
return adviseNotInitialized();
}
private Set<InternalDistributedMember> adviseNotInitialized() {
return adviseFilter(profile -> {
assert profile instanceof CacheProfile;
CacheProfile cp = (CacheProfile) profile;
return !cp.regionInitialized;
});
}
@Override
public Set adviseNetWrite() {
return regionAdvisor.adviseNetWrite();
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder("[BucketAdvisor ").append(getAdvisee().getFullPath())
.append(':').append(getAdvisee().getSerialNumber()).append(": ");
sb.append("state=").append(primaryStateToString());
sb.append("]");
return sb.toString();
}
// A listener for events on this bucket, and also for the entire PR
@Override
public void addMembershipAndProxyListener(MembershipListener listener) {
super.addMembershipAndProxyListener(listener);
regionAdvisor.addMembershipListener(listener);
}
@Override
public void removeMembershipAndProxyListener(MembershipListener listener) {
regionAdvisor.removeMembershipListener(listener);
super.removeMembershipAndProxyListener(listener);
}
/**
* Called from endBucket creation. We send out a profile to notify others that the persistence is
* initialized.
*/
void endBucketCreation() {
sendProfileUpdate();
}
/**
* Profile information for a remote bucket counterpart.
*/
public static class BucketProfile extends CacheProfile {
/** True if this profile's member is attempting to initialize the bucket */
public boolean isInitializing;
/** True if this profile's member is the primary for this bucket */
public boolean isPrimary;
/**
* True if the profile is coming from a real BucketRegion acceptible states hosting = false,
* init = true hosting = true, init = false hosting = false, init = false unacceptible states
* hosting = true, init = true
*/
public boolean isHosting;
/**
* True if the bucket has been removed from this host. (
*/
public boolean removed;
public BucketProfile() {}
public BucketProfile(InternalDistributedMember memberId, int version, Bucket bucket) {
super(memberId, version);
isPrimary = bucket.isPrimary();
isHosting = bucket.isHosting();
}
@Override
public StringBuilder getToStringHeader() {
return new StringBuilder("BucketAdvisor.BucketProfile");
}
@Override
public void fillInToString(StringBuilder sb) {
super.fillInToString(sb);
sb.append("; isPrimary=").append(isPrimary);
sb.append("; isHosting=").append(isHosting);
sb.append("; isInitializing=").append(isInitializing);
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
isPrimary = in.readBoolean();
isHosting = in.readBoolean();
isInitializing = in.readBoolean();
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
out.writeBoolean(isPrimary);
out.writeBoolean(isHosting);
out.writeBoolean(isInitializing);
}
@Override
public int getDSFID() {
return BUCKET_PROFILE;
}
}
/**
* Profile information for a remote bucket hosted by cache servers.
*/
public static class ServerBucketProfile extends BucketProfile {
public Set<BucketServerLocation66> bucketServerLocations;
private int bucketId;
public ServerBucketProfile() {}
public ServerBucketProfile(InternalDistributedMember memberId, int version, Bucket bucket,
HashSet<BucketServerLocation66> serverLocations) {
super(memberId, version, bucket);
bucketId = bucket.getId();
bucketServerLocations = serverLocations;
}
@Override
public StringBuilder getToStringHeader() {
return new StringBuilder("BucketAdvisor.ServerBucketProfile");
}
@Override
public void fillInToString(StringBuilder sb) {
super.fillInToString(sb);
for (BucketServerLocation66 location : bucketServerLocations) {
sb.append("; hostName=").append(location.getHostName());
sb.append("; port=").append(location.getPort());
}
}
@Override
public void fromData(DataInput in,
DeserializationContext context) throws IOException, ClassNotFoundException {
super.fromData(in, context);
bucketServerLocations = SerializationHelper.readBucketServerLocationSet(in);
bucketId = DataSerializer.readPrimitiveInt(in);
}
@Override
public void toData(DataOutput out,
SerializationContext context) throws IOException {
super.toData(out, context);
SerializationHelper.writeBucketServerLocationSet(bucketServerLocations, out);
DataSerializer.writePrimitiveInt(bucketId, out);
}
public Set<BucketServerLocation66> getBucketServerLocations() {
return bucketServerLocations;
}
@Override
public int getDSFID() {
return SERVER_BUCKET_PROFILE;
}
@Override
public int hashCode() {
BucketServerLocation66 sl = (BucketServerLocation66) bucketServerLocations.toArray()[0];
return 31 * bucketId + sl.getPort();
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (!(obj instanceof ServerBucketProfile))
return false;
final ServerBucketProfile other = (ServerBucketProfile) obj;
if (other.bucketId != bucketId) {
return false;
}
if (other.bucketServerLocations.size() != bucketServerLocations.size()) {
return false;
}
return other.bucketServerLocations.containsAll(bucketServerLocations);
}
}
/**
* Handles the actual volunteering to become primary bucket. Ensures that only one thread is ever
* volunteering at one time.
*
*/
class VolunteeringDelegate {
/**
* Reference to the Thread that is currently volunteering. Protected by
* synchronized(volunteeringLock).
*/
private Thread volunteeringThread;
private boolean aggressive = false;
/**
* Returns true if this delegate is aggressively trying to become the primary even if another
* member is already the primary.
*
* @return true if this aggressively trying to become the primary
*/
boolean isAggressive() {
synchronized (BucketAdvisor.this) {
return aggressive;
}
}
/**
* Initiates volunteering for primary. Repeated calls are harmless. Invoked by the
* BucketAdvisor. Caller must be synchronized on BucketAdvisor.
*/
void volunteerForPrimary() {
boolean handedOff = false;
while (!handedOff) {
getAdvisee().getCancelCriterion().checkCancelInProgress(null);
boolean interrupted = Thread.interrupted();
try {
execute(this::doVolunteerForPrimary);
handedOff = true;
} catch (InterruptedException e) {
interrupted = true;
getAdvisee().getCancelCriterion().checkCancelInProgress(e);
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
}
}
/**
* Reserves this delegate for the current thread to call becomePrimary. Necessary because caller
* of doVolunteerForPrimary must not be synchronized on BucketAdvisor.
*
* @return true if successfully reserved for becomePrimary
*/
boolean reserveForBecomePrimary() {
synchronized (BucketAdvisor.this) {
if (volunteeringThread != null) {
return false;
}
aggressive = true;
return true;
}
}
/**
* Invoked by the thread that performs the actual volunteering work.
*/
void doVolunteerForPrimary() {
if (!beginVolunteering()) {
return;
}
boolean dlsDestroyed = false;
try {
if (logger.isDebugEnabled()) {
logger.debug("Begin volunteerForPrimary for {}", BucketAdvisor.this);
}
DistributedMemberLock thePrimaryLock = null;
while (continueVolunteering()) {
// Fix for 41865 - We can't send out profiles while holding the
// sync on this advisor, because that will cause a deadlock.
// Holding the primaryMoveWriteLock here instead prevents any
// operations from being performed on this primary until the child regions
// are synced up. It also prevents a depose from happening until then.
BucketAdvisor parentBA = parentAdvisor;
primaryMoveWriteLock.lock();
try {
boolean acquiredLock;
getAdvisee().getCancelCriterion().checkCancelInProgress(null);
// Check our parent advisor and set our state
// accordingly
if (parentBA != null) {
// Fix for 44350 - we don't want to get a primary move lock on
// the advisor, because that might deadlock with a user thread.
// However, since all depose/elect operations on the parent bucket
// cascade to the child bucket and get the child bucket move lock,
// if should be safe to check this without the lock here.
if (parentBA.isPrimary() && !isPrimary()) {
acquiredLock = acquiredPrimaryLock();
} else {
return;
}
} else {
// we're not colocated, need to get the dlock
assignStartingBucketAdvisorIfFixedPartitioned();
if (startingBucketAdvisor != null) {
Assert.assertHoldsLock(this, false);
synchronized (startingBucketAdvisor) {
if (startingBucketAdvisor.isPrimary() && !isPrimary()) {
acquiredLock = acquiredPrimaryLock();
} else {
return;
}
}
} else {
if (thePrimaryLock == null) {
thePrimaryLock = getPrimaryLock(true);
if (thePrimaryLock == null) {
// InternalDistributedSystem.isDisconnecting probably
// prevented us from
// creating the DLS... hope there's a thread closing this
// advisor but
// it's probably not safe to assert that it already happened
return;
}
}
Assert.assertTrue(!thePrimaryLock.holdsLock());
if (isAggressive()) {
acquiredLock = thePrimaryLock.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
} else {
acquiredLock = thePrimaryLock.tryLock();
}
if (acquiredLock) {
acquiredLock = acquiredPrimaryLock();
}
} // parentAdvisor == null
}
if (acquiredLock) {
// if the lock has been acquired then try to do the same for colocated PR's too
// Here if somehow a bucket can't acquire a lock
// we assume that it is in the process of becoming primary through
// BucketAdvisor.volunteerForPrimary()
// Either way we have to guarantee that the bucket becomes primary for sure.(How to
// guarantee?)
acquirePrimaryRecursivelyForColocated();
acquirePrimaryForRestOfTheBucket();
return;
}
} finally {
primaryMoveWriteLock.unlock();
}
// else: acquiredPrimaryLock released thePrimaryLock
if (!continueVolunteering()) {
// this avoids calling the wait below...
return;
}
waitIfNoPrimaryMemberFound();
} // while
} catch (LockServiceDestroyedException e) {
dlsDestroyed = true;
handleException(e, true);
} catch (RegionDestroyedException | CancelException e) {
handleException(e, false);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
handleException(e, false);
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Exit volunteerForPrimary for {}; dlsDestroyed={}", BucketAdvisor.this,
dlsDestroyed);
}
endVolunteering();
// if (isPrimary()) {
// Bucket bucket = getBucket();
// if (bucket instanceof ProxyBucketRegion) {
// bucket = ((ProxyBucketRegion)bucket).getHostedBucketRegion();
// }
// }
}
}
/**
* Called from catch blocks in {@link #doVolunteerForPrimary()}. Handles the exception properly
* based on advisor settings and shutdown condition.
*
* @param e the RuntimeException that was caught while volunteering
* @param loggit true if message should be logged if shutdown condition is not met
*/
private void handleException(Exception e, boolean loggit) {
boolean safe = isClosed() || getAdvisee().getCancelCriterion().isCancelInProgress();
if (!safe) {
if (ENFORCE_SAFE_CLOSE) {
Assert.assertTrue(safe,
"BucketAdvisor was not closed properly.");
} else if (loggit) {
logger.warn("BucketAdvisor was not closed properly.", e);
}
}
}
private boolean beginVolunteering() {
synchronized (BucketAdvisor.this) {
if (Thread.currentThread().equals(volunteeringThread)) {
return true; // this thread is already volunteering or reserved
}
if (volunteeringThread != null) {
// another thread is already volunteering
return false;
}
volunteeringThread = Thread.currentThread();
boolean changedState = false;
try {
if (isAggressive()) {
changedState = setBecoming();
} else {
changedState = setVolunteering();
}
return changedState;
} finally {
if (!changedState) {
aggressive = false;
volunteeringThread = null;
}
}
}
}
private boolean continueVolunteering() {
synchronized (BucketAdvisor.this) {
// false if caller is not the volunteeringThread
if (!Thread.currentThread().equals(volunteeringThread)) {
return false;
}
if (!isVolunteering() && !isBecomingPrimary()) {
return false;
}
// false if primaryMember is not null
if (!isAggressive() && basicGetPrimaryMember() != null) {
return false;
}
// false if this member is already primary
if (isPrimary()) {
return false;
}
// false if closed
if (isClosed()) {
return false;
}
// false if no longer hosting
return isHosting();
// must be true... need to continue volunteering
}
}
private void endVolunteering() {
if (Thread.currentThread().equals(volunteeringThread)) {
volunteeringThread = null;
aggressive = false;
}
}
private void waitIfNoPrimaryMemberFound() {
synchronized (BucketAdvisor.this) {
if (basicGetPrimaryMember() == null) {
waitForPrimaryMember(100);
if (basicGetPrimaryMember() == null) {
findAndSetPrimaryMember();
}
}
}
}
/**
* Executes the primary volunteering task after queuing it in the
* {@link BucketAdvisor#getVolunteeringQueue()}. A number of threads equal to
* {@link RegionAdvisor#VOLUNTEERING_THREAD_COUNT} are permitted to consume from the queue by
* acquiring permits from {@link BucketAdvisor#getVolunteeringSemaphore()}.
*
* @param volunteeringTask the task to queue and then execute in waiting thread pool
*
*/
private void execute(Runnable volunteeringTask) throws InterruptedException {
// @todo: instead of having a semaphore and queue on RegionAdvisor
// we should have an executor which limits its max threads to
// VOLUNTEERING_THREAD_COUNT.
if (Thread.interrupted())
throw new InterruptedException();
Queue<Runnable> volunteeringQueue = getVolunteeringQueue();
synchronized (volunteeringQueue) {
// add the volunteering task
volunteeringQueue.add(volunteeringTask);
if (getVolunteeringSemaphore().tryAcquire()) {
// ensure there is a thread consuming the queue
boolean handedOff = false;
try {
getDistributionManager().getExecutors().getWaitingThreadPool().execute(consumeQueue());
handedOff = true;
} finally {
if (!handedOff) {
getVolunteeringSemaphore().release();
}
}
}
}
}
/**
* Returns the runnable used to consume the volunteering queue. The executing thread(s) will
* consume from the queue until it is empty.
*
* @return runnable for consuming the volunteering queue
*/
private Runnable consumeQueue() {
return () -> {
getPartitionedRegionStats().incVolunteeringThreads(1);
boolean releaseSemaphore = true;
try {
Queue<Runnable> volunteeringQueue = getVolunteeringQueue();
Runnable queuedWork;
while (true) {
// SystemFailure.checkFailure();
getAdvisee().getCancelCriterion().checkCancelInProgress(null);
synchronized (volunteeringQueue) {
// synchronized volunteeringQueue for coordination between threads adding
// work to the queue and checking for a consuming thread and the existing
// consuming thread to determine if it can exit since the queue is empty.
queuedWork = volunteeringQueue.poll();
if (queuedWork == null) {
// the queue is empty... no more work... so return
// @todo why release the semaphore here are sync'ed?
// we could just let the finally block do it.
getVolunteeringSemaphore().release();
releaseSemaphore = false;
return;
}
// still more work in the queue so let's run it
}
try {
queuedWork.run();
} catch (CancelException e) {
return;
} catch (RuntimeException e) {
// log and continue consuming queue
logger.error(e.getMessage(), e);
}
}
} finally {
getPartitionedRegionStats().incVolunteeringThreads(-1);
if (releaseSemaphore) {
// Clean up, just in case
getVolunteeringSemaphore().release();
}
}
};
}
}
void setShadowBucketDestroyed(boolean destroyed) {
shadowBucketDestroyed = destroyed;
}
public boolean getShadowBucketDestroyed() {
return shadowBucketDestroyed;
}
}