blob: dc8351a5432b9ff25f0c0ed745bfd46217fa1740 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
 * agreements. See the NOTICE file distributed with this work for additional information regarding
 * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance with the License. You may obtain a
 * copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */
package org.apache.geode.internal.cache;
import static java.lang.String.format;
import static java.lang.System.lineSeparator;
import static java.util.Collections.emptyList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.geode.internal.cache.ColocationHelper.checkMembersColocation;
import static org.apache.geode.internal.cache.PartitionedRegionHelper.printCollection;
import static org.apache.geode.util.internal.GeodeGlossary.GEMFIRE_PREFIX;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiFunction;
import org.apache.logging.log4j.Logger;
import org.apache.geode.CancelCriterion;
import org.apache.geode.CancelException;
import org.apache.geode.SystemFailure;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MakeNotStatic;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.PartitionedRegionStorageException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.persistence.PartitionOfflineException;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.LonerDistributionManager;
import org.apache.geode.distributed.internal.MembershipListener;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.NanoTimer;
import org.apache.geode.internal.OneTaskOnlyExecutor;
import org.apache.geode.internal.cache.PartitionedRegion.RetryTimeKeeper;
import org.apache.geode.internal.cache.PartitionedRegionDataStore.CreateBucketResult;
import org.apache.geode.internal.cache.control.InternalResourceManager;
import org.apache.geode.internal.cache.partitioned.Bucket;
import org.apache.geode.internal.cache.partitioned.BucketBackupMessage;
import org.apache.geode.internal.cache.partitioned.CreateBucketMessage;
import org.apache.geode.internal.cache.partitioned.CreateMissingBucketsTask;
import org.apache.geode.internal.cache.partitioned.DataStoreBuckets;
import org.apache.geode.internal.cache.partitioned.EndBucketCreationMessage;
import org.apache.geode.internal.cache.partitioned.FetchPartitionDetailsMessage;
import org.apache.geode.internal.cache.partitioned.FetchPartitionDetailsMessage.FetchPartitionDetailsResponse;
import org.apache.geode.internal.cache.partitioned.InternalPRInfo;
import org.apache.geode.internal.cache.partitioned.InternalPartitionDetails;
import org.apache.geode.internal.cache.partitioned.LoadProbe;
import org.apache.geode.internal.cache.partitioned.ManageBackupBucketMessage;
import org.apache.geode.internal.cache.partitioned.ManageBucketMessage;
import org.apache.geode.internal.cache.partitioned.ManageBucketMessage.NodeResponse;
import org.apache.geode.internal.cache.partitioned.OfflineMemberDetails;
import org.apache.geode.internal.cache.partitioned.OfflineMemberDetailsImpl;
import org.apache.geode.internal.cache.partitioned.PRLoad;
import org.apache.geode.internal.cache.partitioned.PartitionMemberInfoImpl;
import org.apache.geode.internal.cache.partitioned.PartitionRegionInfoImpl;
import org.apache.geode.internal.cache.partitioned.PartitionedRegionRebalanceOp;
import org.apache.geode.internal.cache.partitioned.PartitionedRegionRebalanceOpFactory;
import org.apache.geode.internal.cache.partitioned.PersistentBucketRecoverer;
import org.apache.geode.internal.cache.partitioned.RecoveryRunnable;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
import org.apache.geode.internal.cache.partitioned.RegionAdvisor.PartitionProfile;
import org.apache.geode.internal.cache.partitioned.rebalance.CompositeDirector;
import org.apache.geode.internal.cache.partitioned.rebalance.FPRDirector;
import org.apache.geode.internal.cache.partitioned.rebalance.RebalanceDirector;
import org.apache.geode.internal.cache.persistence.MembershipFlushRequest;
import org.apache.geode.internal.cache.persistence.PersistentMemberID;
import org.apache.geode.internal.monitoring.ThreadsMonitoring;
import org.apache.geode.logging.internal.executors.LoggingThread;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
 * This class provides redundancy management for a partitioned region. It provides the following
 * to the PartitionedRegion:
 * <ol>
 * <li>Redundancy management at the time of bucket creation.
 * <li>Redundancy management when a new node arrives.
 * <li>Redundancy management when a node leaves the partitioned region distributed system
 * gracefully, i.e. via Cache.close().
 * <li>Redundancy management on random node failure.
 * </ol>
 */
public class PRHARedundancyProvider {
private static final Logger logger = LogService.getLogger();
public static final String TIMEOUT_MSG =
"If your system has sufficient space, perhaps it is under membership or region creation stress?";
* Signature string indicating that not enough stores are available.
public static final String INSUFFICIENT_STORES_MSG =
"Advise you to start enough data store nodes";
// NOTE(review): as shown, the declaration below assigns a String expression to a boolean field,
// which does not compile. The initializer of DISABLE_CREATE_BUCKET_RANDOMNESS and the start of a
// separate declaration (for the "partitionedRegionDatastoreDiscoveryTimeout" property name) appear
// to have been lost in this copy -- restore from upstream.
private static final boolean DISABLE_CREATE_BUCKET_RANDOMNESS =
GEMFIRE_PREFIX + "partitionedRegionDatastoreDiscoveryTimeout";
* Signature string indicating that there are enough stores available.
private static final String SUFFICIENT_STORES_MSG = "Found a member to host a bucket.";
* string indicating the attempt to allocate a bucket
private static final String ALLOCATE_ENOUGH_MEMBERS_TO_HOST_BUCKET =
"allocate enough members to host a new bucket";
private static final long INSUFFICIENT_LOGGING_THROTTLE_TIME = TimeUnit.SECONDS.toNanos(
Integer.getInteger(GEMFIRE_PREFIX + "InsufficientLoggingThrottleTime", 2));
private static final ThreadLocal<Boolean> forceLocalPrimaries = new ThreadLocal<>();
private static final AtomicLong insufficientLogTimeStamp = new AtomicLong(0);
private final AtomicBoolean firstInsufficientStoresLogged = new AtomicBoolean(false);
private final PartitionedRegion partitionedRegion;
private final InternalResourceManager resourceManager;
private final PartitionedRegionRebalanceOpFactory rebalanceOpFactory;
private final CompletableFuture<Void> providerStartupTask;
* An executor to submit tasks for redundancy recovery too. It makes sure that there will only be
* one redundancy recovery task in the queue at a time.
private final OneTaskOnlyExecutor recoveryExecutor;
private final Object shutdownLock = new Object();
private final BiFunction<PRHARedundancyProvider, Integer, PersistentBucketRecoverer> persistentBucketRecovererFunction;
private volatile ScheduledFuture<?> recoveryFuture;
* Used to consolidate logging for bucket regions waiting on other members to come online.
private volatile PersistentBucketRecoverer persistentBucketRecoverer;
private boolean shutdown;
/**
 * Creates a redundancy provider for the given region using the default collaborators: a
 * {@link PersistentBucketRecoverer} factory, the standard {@link PartitionedRegionRebalanceOp}
 * factory and a fresh, incomplete startup future.
 *
 * @param partitionedRegion The PartitionedRegion for which the HA redundancy is required to be
 *        managed.
 * @param resourceManager the resource manager whose executor runs redundancy recovery tasks
 */
public PRHARedundancyProvider(PartitionedRegion partitionedRegion,
    InternalResourceManager resourceManager) {
  // Delegate to the full constructor, passing the default rebalance-op factory explicitly, the
  // same way the package-private three-argument constructor does. The previous call omitted it and
  // targeted a (region, manager, recovererFunction, startupTask) constructor that is not declared
  // alongside the other constructors.
  this(partitionedRegion, resourceManager, PersistentBucketRecoverer::new,
      PartitionedRegionRebalanceOp::new, new CompletableFuture<>());
}
/**
 * Package-private constructor that lets callers substitute the persistent bucket recoverer
 * factory while keeping the default rebalance-op factory and a new startup future.
 *
 * @param partitionedRegion the region whose redundancy is managed
 * @param resourceManager the owning resource manager
 * @param persistentBucketRecovererFunction factory for {@link PersistentBucketRecoverer}s
 */
PRHARedundancyProvider(PartitionedRegion partitionedRegion,
    InternalResourceManager resourceManager,
    BiFunction<PRHARedundancyProvider, Integer, PersistentBucketRecoverer> persistentBucketRecovererFunction) {
  this(partitionedRegion,
      resourceManager,
      persistentBucketRecovererFunction,
      PartitionedRegionRebalanceOp::new,
      new CompletableFuture<Void>());
}
/**
 * Creates a redundancy provider with every collaborator supplied explicitly.
 *
 * @param partitionedRegion the partitioned region whose redundancy this provider manages
 * @param resourceManager source of the executor wrapped by the recovery executor
 * @param persistentBucketRecovererFunction factory used to build a
 *        {@link PersistentBucketRecoverer} from this provider and an Integer argument
 * @param rebalanceOpFactory factory for rebalance operations
 * @param providerStartupTask future stored on this provider; how it is completed is not shown
 *        in this section
 */
PRHARedundancyProvider(PartitionedRegion partitionedRegion,
InternalResourceManager resourceManager,
BiFunction<PRHARedundancyProvider, Integer, PersistentBucketRecoverer> persistentBucketRecovererFunction,
PartitionedRegionRebalanceOpFactory rebalanceOpFactory,
CompletableFuture<Void> providerStartupTask) {
this.partitionedRegion = partitionedRegion;
this.resourceManager = resourceManager;
this.rebalanceOpFactory = rebalanceOpFactory;
this.providerStartupTask = providerStartupTask;
// Wrap the resource manager's executor so that at most one redundancy recovery task is queued
// at a time; the callback presumably runs when a submission is conflated and notifies the
// resource observer.
// NOTE(review): this argument list is not closed in this copy -- a final argument and the
// closing parenthesis/semicolon appear to be missing before the next statement. Confirm
// against upstream.
recoveryExecutor = new OneTaskOnlyExecutor(resourceManager.getExecutor(),
() -> InternalResourceManager.getResourceObserver().recoveryConflated(partitionedRegion),
this.persistentBucketRecovererFunction = persistentBucketRecovererFunction;
/**
 * Builds a description of the bucket allocation status of a region.
 *
 * @param partitionedRegion the given region
 * @param allStores the list of available stores. If null, unknown.
 * @param alreadyUsed stores allocated; only used if allStores != null
 * @param forLog true if the generated string is for a log message
 * @return the description string
 */
private static String regionStatus(PartitionedRegion partitionedRegion,
Collection<InternalDistributedMember> allStores,
Collection<InternalDistributedMember> alreadyUsed, boolean forLog) {
// Log output stays on one line; exception text uses real line breaks and indentation.
String newLine = forLog ? " " : lineSeparator();
String spaces = forLog ? "" : " ";
StringBuilder sb = new StringBuilder();
// NOTE(review): in this copy only the labels below are appended. The values they describe
// (region name, redundancy level, store counts, member lists) and every use of newLine and
// spaces appear to be missing. Confirm against upstream before relying on the output.
sb.append("Partitioned Region name = ");
// Store details are only reported when the set of stores is known (allStores != null).
if (allStores != null) {
sb.append("Redundancy level set to ");
sb.append(". Number of available data stores: ");
sb.append(". Number successfully allocated = ");
sb.append(". Data stores: ");
sb.append(". Data stores successfully allocated: ");
sb.append(". Equivalent members: ");
return sb.toString();
* Indicate a timeout due to excessive retries among available peers
* @param allStores all feasible stores. If null, we don't know.
* @param alreadyUsed those that have already accepted, only used if allStores != null
* @param opString description of the operation which timed out
public static void timedOut(PartitionedRegion partitionedRegion,
Set<InternalDistributedMember> allStores,
Collection<InternalDistributedMember> alreadyUsed, String opString, long timeOut) {
throw new PartitionedRegionStorageException(
format("Timed out attempting to %s in the partitioned region.%sWaited for: %s ms.",
opString, regionStatus(partitionedRegion, allStores, alreadyUsed, true), timeOut)
public PartitionedRegion getPartitionedRegion() {
return partitionedRegion;
/**
 * Returns the data stores that may host a bucket.
 *
 * <p>For a fixed partition (non-null {@code partitionName}) this delegates to
 * {@link #getFixedPartitionStores(String)}. Otherwise it returns {@code allStores}; how the local
 * data store is handled is not fully visible in this copy.
 *
 * @param partitionName fixed-partition name, or null for a non-fixed region
 * @return the candidate data stores
 */
private Set<InternalDistributedMember> getAllStores(String partitionName) {
if (partitionName != null) {
return getFixedPartitionStores(partitionName);
// NOTE(review): the initializer of allStores and the body of the localDataStore check below
// appear to be missing in this copy (presumably the local member is added when this member
// hosts a data store). Confirm against upstream.
final Set<InternalDistributedMember> allStores =
PartitionedRegionDataStore localDataStore = partitionedRegion.getDataStore();
if (localDataStore != null) {
return allStores;
/**
 * For fixed-partitioned regions (FPR): returns the set of data stores on which the given
 * partition is defined.
 *
 * @param partitionName name of the partition for which data stores need to be found
 * @return the members on which the partition is defined
 */
private Set<InternalDistributedMember> getFixedPartitionStores(String partitionName) {
// NOTE(review): the initializers of members and allFixedPartitionAttributes, and the body of
// the matching-partition branch, appear to be missing in this copy. Confirm against upstream.
Set<InternalDistributedMember> members =
List<FixedPartitionAttributesImpl> allFixedPartitionAttributes =
// Look for the fixed partition with the requested name.
if (allFixedPartitionAttributes != null) {
for (FixedPartitionAttributesImpl fixedPartitionAttributes : allFixedPartitionAttributes) {
if (fixedPartitionAttributes.getPartitionName().equals(partitionName)) {
return members;
* Indicate that we are unable to allocate sufficient stores and the timeout period has passed
* @param allStores stores we know about
* @param alreadyUsed ones already committed
* @param onlyLog true if only a warning log messages should be generated.
private void insufficientStores(Set<InternalDistributedMember> allStores,
Collection<InternalDistributedMember> alreadyUsed, boolean onlyLog) {
String regionStat = regionStatus(partitionedRegion, allStores, alreadyUsed, onlyLog);
String newLine = onlyLog ? " " : lineSeparator();
String notEnoughValidNodes = alreadyUsed.isEmpty()
? "Unable to find any members to host a bucket in the partitioned region. %s.%s"
: "Configured redundancy level could not be satisfied. %s to satisfy redundancy for the region.%s";
if (onlyLog) {
logger.warn(format(notEnoughValidNodes, INSUFFICIENT_STORES_MSG,
newLine + regionStat + newLine));
} else {
throw new PartitionedRegionStorageException(
format(notEnoughValidNodes, INSUFFICIENT_STORES_MSG, newLine + regionStat + newLine));
/**
 * Create a single copy of this bucket on one node. The bucket must already be locked.
 *
 * @param bucketId The bucket we are working on
 * @param newBucketSize size to create it
 * @param excludedMembers members that should not be selected as candidates
 * @param alreadyUsed members who already seem to have the bucket
 * @param failedMembers members that previously failed or declined to host the bucket
 * @param timeOut absolute time (System.currentTimeMillis() scale) at which to fail
 * @param allStores the set of data stores to choose from
 * @return the new member, null if it fails.
 * @throws PartitionedRegionStorageException if there are not enough data stores
 */
private InternalDistributedMember createBucketInstance(int bucketId, int newBucketSize,
Collection<InternalDistributedMember> excludedMembers,
Collection<InternalDistributedMember> alreadyUsed,
ArrayListWithClearState<InternalDistributedMember> failedMembers, long timeOut,
Set<InternalDistributedMember> allStores) {
// Picks one candidate store for bucketId and asks it to host a copy; returns null on any failure.
boolean isDebugEnabled = logger.isDebugEnabled();
// Recalculate list of candidates
// NOTE(review): in this copy nothing is removed from candidateMembers after it is copied from
// allStores; the removal of already-used/excluded/failed members appears to be missing.
Set<InternalDistributedMember> candidateMembers = new HashSet<>(allStores);
if (isDebugEnabled) {
logger.debug("AllStores={} AlreadyUsed={} excluded={} failed={}", allStores, alreadyUsed,
excludedMembers, failedMembers);
if (candidateMembers.isEmpty()) {
// Run out of candidates. Refetch?
if (System.currentTimeMillis() > timeOut) {
if (isDebugEnabled) {
logger.debug("createBucketInstance: ran out of candidates and timed out");
// fail, let caller signal error
return null;
// Recalculate
// NOTE(review): presumably failedMembers is cleared before this recalculation upstream.
candidateMembers = new HashSet<>(allStores);
if (isDebugEnabled) {
logger.debug("createBucketInstance: candidateMembers = {}", candidateMembers);
// If there are no candidates, early out.
if (candidateMembers.isEmpty()) {
// no options
if (isDebugEnabled) {
logger.debug("createBucketInstance: no valid candidates");
// failure
return null;
// In case of FixedPartitionedRegion, candidateMembers is the set of members on which
// required fixed partition is defined.
InternalDistributedMember candidate;
if (partitionedRegion.isFixedPartitionedRegion()) {
candidate = candidateMembers.iterator().next();
} else {
// NOTE(review): the initializer of colocatedWith is missing in this copy.
String colocatedWith =
if (colocatedWith != null) {
candidate = getColocatedDataStore(candidateMembers, alreadyUsed, bucketId, colocatedWith);
} else {
Collection<InternalDistributedMember> orderedCandidates = new ArrayList<>(candidateMembers);
candidate = getPreferredDataStore(orderedCandidates, alreadyUsed);
if (candidate == null) {
return null;
// Reject candidates that do not host every region colocated with this one (shadow PRs are
// exempt from the check).
// NOTE(review): the logger.debug call and its arguments around the message below are
// partially missing in this copy.
if (!partitionedRegion.isShadowPR() && !checkMembersColocation(partitionedRegion, candidate)) {
if (isDebugEnabled) {
"createBucketInstances - Member does not have all of the regions colocated with partitionedRegion {}",
return null;
// A remote candidate must have advertised a partition profile.
// NOTE(review): the initializer of profile is missing in this copy.
if (!candidate.equals(partitionedRegion.getMyId())) {
PartitionProfile profile =
if (profile == null) {
if (isDebugEnabled) {
logger.debug("createBucketInstance: {}: no partition profile for {}",
partitionedRegion.getFullPath(), candidate);
return null;
// Coordinate with any remote close occurring, causing it to wait until
// this create bucket attempt has been made.
ManageBucketRsp response =
createBucketOnMember(bucketId, candidate, newBucketSize, failedMembers.wasCleared());
// Add targetNode to bucketNodes if successful, else to failedNodeList
if (response.isAcceptance()) {
// success!
return candidate;
// NOTE(review): the trailing debug arguments and the bodies of the CLOSED / other-decline
// branches (presumably recording the candidate as excluded or failed) are missing in this copy.
if (isDebugEnabled) {
logger.debug("createBucketInstance: {}: candidate {} declined to manage bucketId={}: {}",
partitionedRegion.getFullPath(), candidate,
if (response.equals(ManageBucketRsp.CLOSED)) {
} else {
return null;
/**
 * Asks a data store to create the given bucket and returns the member that ends up hosting it.
 *
 * <p>Each iteration picks a target from the available stores (for a fixed-partitioned region the
 * partition's primary is chosen when it is available), sends it a {@link CreateBucketMessage}
 * and returns the responding member if one is reported. Otherwise the loop repeats until
 * {@code partitionedRegion.getNodeForBucketWrite(bucketId, retryTimeKeeper)} yields a member.
 * If no target can be chosen, a {@link PartitionedRegionStorageException} is thrown.
 *
 * @param bucketId the bucket to create
 * @param size size passed along with the create-bucket request
 * @param retryTimeKeeper passed to {@code getNodeForBucketWrite} when re-checking for a host
 * @return the member hosting the bucket
 */
InternalDistributedMember createBucketOnDataStore(int bucketId, int size,
RetryTimeKeeper retryTimeKeeper) {
boolean isDebugEnabled = logger.isDebugEnabled();
InternalDistributedMember primaryForFixedPartition = null;
// NOTE(review): the initializer of primaryForFixedPartition is missing in this copy.
if (partitionedRegion.isFixedPartitionedRegion()) {
primaryForFixedPartition =
InternalDistributedMember memberHostingBucket;
// NOTE(review): attempted is not used anywhere in this copy; its use appears to be missing.
Collection<InternalDistributedMember> attempted = new HashSet<>();
do {
// NOTE(review): the initializer of available is missing in this copy.
Set<InternalDistributedMember> available =
InternalDistributedMember targetMember = null;
// NOTE(review): as shown, this loop keeps overwriting targetMember; a break (or a filter on
// attempted members) presumably exists upstream.
for (InternalDistributedMember member : available) {
if (available.contains(primaryForFixedPartition)) {
targetMember = primaryForFixedPartition;
} else {
targetMember = member;
if (targetMember == null) {
if (shouldLogInsufficientStores()) {
insufficientStores(available, Collections.emptySet(), true);
// this will always throw an exception
insufficientStores(available, Collections.emptySet(), false);
try {
if (isDebugEnabled) {
logger.debug("Attempting to get data store {} to create the bucket {} for us",
CreateBucketMessage.NodeResponse response =
CreateBucketMessage.send(targetMember, partitionedRegion, bucketId, size);
memberHostingBucket = response.waitForResponse();
if (memberHostingBucket != null) {
return memberHostingBucket;
} catch (ForceReattemptException e) {
// do nothing, we will already check again for a primary.
} while ((memberHostingBucket =
partitionedRegion.getNodeForBucketWrite(bucketId, retryTimeKeeper)) == null);
return memberHostingBucket;
/**
 * Creates a bucket atomically by creating all the copies needed to satisfy redundancy. If not all
 * copies can be created, a PartitionedRegionStorageException is thrown to the user and a
 * BucketBackupMessage is sent to the nodes to make copies of a bucket that was only partially
 * created. Other VMs are informed of bucket creation through updates to their
 * {@link BucketAdvisor.BucketProfile}s.
 * <p>
 * This method synchronizes on this provider to enforce a single-threaded ordering, allowing for a
 * more accurate picture of bucket distribution in the face of concurrency.
 * <p>
 * This method is now slightly misnamed: another member could be in the process of creating this
 * same bucket at the same time.
 *
 * @param bucketId Id of the bucket to be created.
 * @param newBucketSize size of the first entry.
 * @param finishIncompleteCreation true when completing a previously unfinished creation; in that
 *        case an existing primary is not looked up first and unexpected runtime failures are not
 *        rethrown
 * @param partitionName fixed-partition name, or null for a non-fixed region
 * @return the primary member for the newly created bucket
 * @throws PartitionedRegionStorageException if the required number of bucket copies cannot be
 *         created to satisfy redundancy.
 * @throws PartitionedRegionException if the distributed lock cannot be acquired to create the
 *         bucket.
 * @throws PartitionOfflineException if persistent data recovery is not complete for this
 *         partitioned region.
 */
public InternalDistributedMember createBucketAtomically(int bucketId, int newBucketSize,
boolean finishIncompleteCreation, String partitionName)
throws PartitionedRegionStorageException, PartitionedRegionException,
PartitionOfflineException {
// Creates enough copies of bucketId to satisfy redundancy and returns the chosen primary.
boolean isDebugEnabled = logger.isDebugEnabled();
// If there are insufficient stores throw *before* we try acquiring the
// (very expensive) bucket lock or the (somewhat expensive) monitor on this
// NOTE(review): the early check this comment describes (presumably
// earlySufficientStoresCheck(partitionName)) is not present in this copy.
synchronized (this) {
if (partitionedRegion.getCache().isCacheAtShutdownAll()) {
throw partitionedRegion.getCache().getCacheClosedException("Cache is shutting down");
// NOTE(review): the remaining arguments of the debug call below are missing in this copy.
if (isDebugEnabled) {
logger.debug("Starting atomic creation of bucketId={}",
// Absolute deadline (System.currentTimeMillis() scale) for the whole creation attempt.
long timeOut = System.currentTimeMillis() + computeTimeout();
BucketMembershipObserver observer = null;
// Cleared on every path where a primary is already known or the cache/region is going away;
// otherwise the finally block tries to let the bucket owners elect a primary.
boolean needToElectPrimary = true;
InternalDistributedMember bucketPrimary = null;
try {
Bucket toCreate = partitionedRegion.getRegionAdvisor().getBucket(bucketId);
// A normal creation returns early if some member already acts as primary.
if (!finishIncompleteCreation) {
bucketPrimary = partitionedRegion.getBucketPrimary(bucketId);
if (bucketPrimary != null) {
if (isDebugEnabled) {
"during atomic creation, discovered that the primary already exists {} returning early",
needToElectPrimary = false;
return bucketPrimary;
// Observe bucket membership so departures of accepting members can be detected below.
observer = new BucketMembershipObserver(toCreate).beginMonitoring();
ArrayListWithClearState<InternalDistributedMember> failedMembers =
new ArrayListWithClearState<>();
Set<InternalDistributedMember> excludedMembers = new HashSet<>();
Collection<InternalDistributedMember> acceptedMembers = new ArrayList<>();
// Keep trying until the bucket is created with enough copies (or candidates run out), the
// deadline passes, or the cache shuts down.
for (boolean loggedInsufficientStores = false;;) {
if (partitionedRegion.getCache().isCacheAtShutdownAll()) {
if (isDebugEnabled) {
logger.debug("Aborted createBucketAtomically due to ShutdownAll");
throw partitionedRegion.getCache().getCacheClosedException("Cache is shutting down");
long timeLeft = timeOut - System.currentTimeMillis();
if (timeLeft < 0) {
// It took too long.
// NOTE(review): the remaining timedOut arguments are missing in this copy.
timedOut(partitionedRegion, getAllStores(partitionName), acceptedMembers,
if (isDebugEnabled) {
logger.debug("createBucketAtomically: have {} ms left to finish this", timeLeft);
// Always go back to the advisor, see if any fresh data stores are present.
Set<InternalDistributedMember> allStores = getAllStores(partitionName);
loggedInsufficientStores = checkSufficientStores(allStores, loggedInsufficientStores);
InternalDistributedMember candidate = createBucketInstance(bucketId, newBucketSize,
excludedMembers, acceptedMembers, failedMembers, timeOut, allStores);
if (candidate != null) {
if (partitionedRegion.getDistributionManager().enforceUniqueZone()) {
// enforceUniqueZone property has no effect for a loner
if (!(partitionedRegion
.getDistributionManager() instanceof LonerDistributionManager)) {
// NOTE(review): exm is not used in this copy; presumably it is added to
// excludedMembers upstream.
Set<InternalDistributedMember> exm = getBuddyMembersInZone(candidate, allStores);
} else {
// log a warning if Loner
"enforce-unique-host and redundancy-zone properties have no effect for a LonerDistributedSystem.");
// Get an updated list of bucket owners, which should include
// buckets created concurrently with this createBucketAtomically call
acceptedMembers = partitionedRegion.getRegionAdvisor().getBucketOwners(bucketId);
if (isDebugEnabled) {
logger.debug("Accepted members: {}", acceptedMembers);
// set the primary as the candidate in the first iteration if the candidate has accepted
if (bucketPrimary == null && acceptedMembers.contains(candidate)) {
bucketPrimary = candidate;
// prune out the stores that have left
verifyBucketNodes(excludedMembers, partitionName);
// Note - we used to wait for the created bucket to become primary here
// if this is a colocated region. We no longer need to do that, because
// the EndBucketMessage is sent out after bucket creation completes to
// select the primary.
// Have we exhausted all candidates?
int potentialCandidateCount = allStores.size()
- (excludedMembers.size() + acceptedMembers.size() + failedMembers.size());
// Determining exhausted members competes with bucket balancing; it's
// important to re-visit all failed members since "failed" set may
// contain datastores which at the moment are imbalanced, but yet could
// be candidates. If the failed members list is empty, its expected
// that the next iteration clears the (already empty) list.
boolean exhaustedPotentialCandidates =
failedMembers.wasCleared() && potentialCandidateCount <= 0;
// Satisfied once there are more owners than redundant copies (one primary + the copies).
boolean redundancySatisfied =
acceptedMembers.size() > partitionedRegion.getRedundantCopies();
boolean bucketNotCreated = acceptedMembers.isEmpty();
if (isDebugEnabled) {
"potentialCandidateCount={}, exhaustedPotentialCandidates={}, redundancySatisfied={}, bucketNotCreated={}",
potentialCandidateCount, exhaustedPotentialCandidates, redundancySatisfied,
if (bucketNotCreated) {
// if we haven't managed to create the bucket on any nodes, retry.
if (exhaustedPotentialCandidates && !redundancySatisfied) {
insufficientStores(allStores, acceptedMembers, true);
// Allow the thread to potentially finish bucket creation even if redundancy was not met.
if (redundancySatisfied || exhaustedPotentialCandidates) {
// Tell one of the members to become primary.
// The rest of the members will be allowed to volunteer for primary.
endBucketCreation(bucketId, acceptedMembers, bucketPrimary, partitionName);
// Number of owners other than this member that the observer must wait for.
int expectedRemoteHosts = acceptedMembers.size()
- (acceptedMembers.contains(partitionedRegion.getMyId()) ? 1 : 0);
// Thread.interrupted() clears the flag; it is remembered here so that it can be restored
// after waiting (the restoring statement is not visible in this copy).
boolean interrupted = Thread.interrupted();
try {
BucketMembershipObserverResults results = observer
.waitForOwnersGetPrimary(expectedRemoteHosts, acceptedMembers, partitionName);
if (results.problematicDeparture) {
// BZZZT! Member left. Start over.
bucketPrimary = results.primary;
} catch (InterruptedException e) {
interrupted = true;
} finally {
if (interrupted) {
needToElectPrimary = false;
return bucketPrimary;
} catch (DiskAccessException dae) {
CancelCriterion cancelCriterion = partitionedRegion.getCancelCriterion();
if (cancelCriterion.isCancelInProgress()) {
needToElectPrimary = false;
throw dae;
} catch (CancelException | RegionDestroyedException e) {
// We don't need to elect a primary if the cache was closed. The other members will
// take care of it. This ensures we don't compromise redundancy.
needToElectPrimary = false;
throw e;
} catch (PartitionOfflineException e) {
throw e;
} catch (RuntimeException e) {
if (isDebugEnabled) {
logger.debug("Unable to create new bucket {}: {}", bucketId, e.getMessage(), e);
// When finishing an incomplete creation, the failure is not propagated to the caller.
if (!finishIncompleteCreation) {
throw e;
} finally {
if (observer != null) {
// Try to make sure everyone that created the bucket can volunteer for primary
// NOTE(review): the start of the call that takes the arguments below (presumably
// endBucketCreation(bucketId, ...)) is missing in this copy.
if (needToElectPrimary) {
try {
bucketPrimary, partitionName);
} catch (Exception e) {
// if region is going down, then no warning level logs
if (e instanceof CancelException
|| partitionedRegion.getCancelCriterion().isCancelInProgress()) {
logger.debug("Exception trying choose a primary after bucket creation failure", e);
} else {
logger.warn("Exception trying choose a primary after bucket creation failure", e);
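/**
 * Figure out which member should be primary for a bucket among the members that have created the
 * bucket, and tell that member to become the primary.
 *
 * @param bucketId the bucket whose creation is being completed
 * @param acceptedMembers The members that now host the bucket
 * @param targetPrimary the preferred primary, or null to let this method choose one
 * @param partitionName fixed-partition name, or null for a non-fixed region
 */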
private void endBucketCreation(int bucketId,
Collection<InternalDistributedMember> acceptedMembers,
InternalDistributedMember targetPrimary, String partitionName) {
// Chooses the primary, notifies the remote hosts with an EndBucketCreationMessage, and ends
// creation locally when this member hosts a copy as well.
// NOTE(review): the body of this empty-collection check (presumably an early return) is
// missing in this copy.
if (acceptedMembers.isEmpty()) {
// Work on a copy so that removing the local member below does not mutate the caller's
// collection.
acceptedMembers = new HashSet<>(acceptedMembers);
// This is for FPR: for a given bucket id, make sure that only the data store on which the
// primary partition is defined for this bucket becomes the primary. If the primary partition
// is not available then a secondary partition will become primary.
if (partitionName != null) {
if (isLocalPrimary(partitionName)) {
targetPrimary = partitionedRegion.getMyId();
} else {
// NOTE(review): the initializer of this assignment is missing in this copy.
targetPrimary =
// Fall back to any member on which the fixed partition is defined.
if (targetPrimary == null) {
Set<InternalDistributedMember> fpDataStores = getFixedPartitionStores(partitionName);
targetPrimary = fpDataStores.iterator().next();
if (targetPrimary == null) {
// we need to select the same primary as chosen earlier (e.g.
// the parent's in case of colocation) so it is now passed
targetPrimary =
getPreferredDataStore(acceptedMembers, Collections.emptySet());
// Remote hosts are told by message; this member's own copy is handled directly below.
boolean isHosting = acceptedMembers.remove(partitionedRegion.getDistributionManager().getId());
EndBucketCreationMessage.send(acceptedMembers, targetPrimary, partitionedRegion, bucketId);
if (isHosting) {
endBucketCreationLocally(bucketId, targetPrimary);
/**
 * Returns true if a fixed partition named {@code partitionName} in the looked-up fixed partition
 * attributes (presumably this member's own) is marked as primary.
 *
 * @param partitionName the fixed partition to look for
 * @return true if a matching primary fixed partition is found, false otherwise
 */
private boolean isLocalPrimary(String partitionName) {
// NOTE(review): the initializer of allFixedPartitionAttributes is missing in this copy.
List<FixedPartitionAttributesImpl> allFixedPartitionAttributes =
if (allFixedPartitionAttributes != null) {
for (FixedPartitionAttributesImpl fixedPartitionAttributes : allFixedPartitionAttributes) {
if (fixedPartitionAttributes.getPartitionName().equals(partitionName)
&& fixedPartitionAttributes.isPrimary()) {
return true;
return false;
/**
 * Completes bucket creation on this member once {@code newPrimary} has been chosen for
 * {@code bucketId}. The same step is then applied to every colocated child region that hosts the
 * bucket locally.
 *
 * @param bucketId the bucket whose creation is being completed
 * @param newPrimary the member chosen to become primary
 */
public void endBucketCreationLocally(int bucketId, InternalDistributedMember newPrimary) {
// Don't elect ourselves as primary or tell others to persist our ID
// if this member has been destroyed.
// NOTE(review): the early return for this branch appears to be missing in this copy.
if (partitionedRegion.getCancelCriterion().isCancelInProgress()
|| partitionedRegion.isDestroyed()) {
if (logger.isDebugEnabled()) {
logger.debug("endBucketCreationLocally: for region {} bucketId={} new primary: {}",
partitionedRegion.getFullPath(), bucketId, newPrimary);
BucketAdvisor bucketAdvisor = partitionedRegion.getRegionAdvisor().getBucketAdvisor(bucketId);
ProxyBucketRegion proxyBucketRegion = bucketAdvisor.getProxyBucketRegion();
BucketPersistenceAdvisor persistentAdvisor = proxyBucketRegion.getPersistenceAdvisor();
// prevent multiple threads from ending bucket creation at the same time.
synchronized (proxyBucketRegion) {
// NOTE(review): persistentId is not used in this copy; the persistence step that consumes it
// appears to be missing.
if (persistentAdvisor != null) {
BucketRegion realBucket = proxyBucketRegion.getCreatedBucketRegion();
if (realBucket != null) {
PersistentMemberID persistentId = realBucket.getPersistentID();
// We've received an endBucketCreationMessage, but the primary
// may not have. So now we wait for the chosen member to become primary.
// NOTE(review): the bodies of the branches below (volunteering for primary, clearing the
// primary elector) are missing in this copy.
if (partitionedRegion.getGemFireCache().getMyId().equals(newPrimary)) {
// If we're the chosen primary, volunteer for primary now
if (bucketAdvisor.isHosting()) {
} else {
// It's possible the chosen primary has already left. In
// that case, volunteer for primary now.
if (!bucketAdvisor.adviseInitialized().contains(newPrimary)) {
// If the bucket has had a primary, that means the chosen bucket was primary for a while.
// Go ahead and clear the primary elector field.
if (bucketAdvisor.getHadPrimary()) {
// send out a profile update to indicate the persistence is initialized, if needed.
if (persistentAdvisor != null) {
// Repeat for colocated child regions that host this bucket locally.
List<PartitionedRegion> colocatedWithList = ColocationHelper.getColocatedChildRegions(
for (PartitionedRegion child : colocatedWithList) {
if (child.getRegionAdvisor().isBucketLocal(bucketId)) {
child.getRedundancyProvider().endBucketCreationLocally(bucketId, newPrimary);
/**
 * Get buddy data stores in the same redundancy zone (host) as the accepted member.
 *
 * @return set of members in the same zone, not including the accepted member
 * @since GemFire 5.9
 */
private Set<InternalDistributedMember> getBuddyMembersInZone(
InternalDistributedMember acceptedMember, Collection<InternalDistributedMember> allStores) {
// NOTE(review): in this copy allStores is never used and acceptedMember is not removed from the
// result, although the method contract says the result excludes it. Upstream presumably filters
// the set returned by getMembersInSameZone (e.g. against allStores); confirm before relying on
// the documented contract.
DistributionManager dm = partitionedRegion.getDistributionManager();
Set<InternalDistributedMember> buddies = dm.getMembersInSameZone(acceptedMember);
return buddies;
* Early check for resources. This code may be executed for every put operation if there are no
* datastores present, limit excessive logging.
* @since GemFire 5.8
private void earlySufficientStoresCheck(String partitionName) {
assert Assert.assertHoldsLock(this, false);
Set<InternalDistributedMember> currentStores = getAllStores(partitionName);
if (currentStores.isEmpty()) {
if (shouldLogInsufficientStores()) {
insufficientStores(currentStores, emptyList(), true);
insufficientStores(currentStores, emptyList(), false);
/**
 * Limit the frequency of logging the {@link #INSUFFICIENT_STORES_MSG} message: the first message
 * for a PR is always logged, after which it is logged at most once every
 * {@link #INSUFFICIENT_LOGGING_THROTTLE_TIME} (nanoseconds; configured in seconds).
 *
 * @return true if it's time to log
 * @since GemFire 5.8
 */
private boolean shouldLogInsufficientStores() {
// Nanoseconds elapsed since the recorded timestamp.
long now = NanoTimer.getTime();
long delta = now - insufficientLogTimeStamp.get();
// The first call on this provider wins the compareAndSet.
// NOTE(review): the condition below is truncated in this copy. delta is computed but never used,
// and the throttle clause (presumably delta > INSUFFICIENT_LOGGING_THROTTLE_TIME) and the update of
// insufficientLogTimeStamp are missing.
if (firstInsufficientStoresLogged.compareAndSet(false, true)
return true;
return false;
/**
 * Compute the timeout, in milliseconds, for waiting for a bucket. A positive configured override
 * takes precedence; otherwise {@link PartitionedRegion#getRetryTimeout()} is used.
 *
 * @return the milliseconds to wait for a bucket creation operation
 */
// Returns the bucket-creation wait in milliseconds: `millis` when it is positive, otherwise the
// region's retry timeout.
// NOTE(review): the declaration/source of `millis` is not visible in this view - confirm where it
// is read from.
private long computeTimeout() {
// only positive values allowed
if (millis > 0) {
return millis;
return partitionedRegion.getRetryTimeout();
* Check to determine that there are enough datastore VMs to start the bucket creation processes.
* Log a warning or throw an exception indicating when there are not enough datastore VMs.
* @param allStores All known data store instances (including local)
* @param loggedInsufficientStores indicates whether a warning has been logged
* @return true when a warning has been logged, false if a warning should be logged.
// Reports an empty data-store set once per shortage. The returned flag has the same meaning as
// loggedInsufficientStores; presumably the caller feeds it back in on its next call.
private boolean checkSufficientStores(Set<InternalDistributedMember> allStores,
boolean loggedInsufficientStores) {
// Report (only once) if insufficient data store have been detected.
if (!loggedInsufficientStores) {
if (allStores.isEmpty()) {
// First detection of an empty store set: report it and tell the caller it has been logged.
insufficientStores(allStores, emptyList(), true);
return true;
} else {
if (!allStores.isEmpty()) {
// NOTE(review): the next line is a comment that appears to have absorbed a logging call
// (SUFFICIENT_STORES_MSG plus the region path). As written nothing is logged when stores
// reappear - confirm whether a logger.info(...) was intended here.
// Excellent, sufficient resources were found!"{} Region name, {}", SUFFICIENT_STORES_MSG, partitionedRegion.getFullPath());
// Returning false re-arms the warning for the next shortage.
return false;
// Already logged warning, there are no datastores
insufficientStores(allStores, emptyList(), false);
return loggedInsufficientStores;
* Clean up locally created bucket and tell other VMs to attempt recovering redundancy
* @param bucketId the bucket identifier
// Sends a BucketBackupMessage for bucketId to dataStores.
// NOTE(review): the initializer of dataStores is not visible in this view - confirm which members
// it selects (presumably the other data stores of this region).
private void cleanUpBucket(int bucketId) {
Set<InternalDistributedMember> dataStores =
BucketBackupMessage.send(dataStores, partitionedRegion, bucketId);
// Re-drives creation of bucketId through createBucketAtomically(bucketId, 0, true, partitionName).
// For a fixed partitioned region the partition name comes from the bucket's
// FixedPartitionAttributes; otherwise null is passed.
// NOTE(review): fpa is dereferenced without a null check - confirm that
// getFixedPartitionAttributesForBucket cannot return null for an incomplete bucket.
public void finishIncompleteBucketCreation(int bucketId) {
String partitionName = null;
if (partitionedRegion.isFixedPartitionedRegion()) {
FixedPartitionAttributesImpl fpa =
PartitionedRegionHelper.getFixedPartitionAttributesForBucket(partitionedRegion, bucketId);
partitionName = fpa.getPartitionName();
createBucketAtomically(bucketId, 0, true, partitionName);
* Creates bucket with ID bucketId on targetNode. This method will also create the bucket for all
* of the child colocated PRs.
* @param isRebalance true if bucket creation is directed by rebalancing
* @return true if the bucket was successfully created
// Remote target: sends a ManageBackupBucketMessage and waits for the member's acceptance.
// Local target: asks the local PartitionedRegionDataStore to grab the bucket; only a CREATED
// result counts as success.
// Returns false when the remote profile is missing, the request is refused, or a non-fatal
// exception occurs. VirtualMachineError is rethrown.
public boolean createBackupBucketOnMember(int bucketId, InternalDistributedMember targetMember,
boolean isRebalance, boolean replaceOfflineData, InternalDistributedMember fromMember,
boolean forceCreation) {
if (logger.isDebugEnabled()) {
logger.debug("createBackupBucketOnMember for bucketId={} member: {}",
partitionedRegion.bucketStringForLogs(bucketId), targetMember);
if (!targetMember.equals(partitionedRegion.getMyId())) {
// Remote member: it must still advertise a partition profile for this region.
PartitionProfile profile =
if (profile == null) {
return false;
try {
ManageBackupBucketMessage.NodeResponse response =
ManageBackupBucketMessage.send(targetMember, partitionedRegion, bucketId, isRebalance,
replaceOfflineData, fromMember, forceCreation);
if (response.waitForAcceptance()) {
if (logger.isDebugEnabled()) {
"createBackupBucketOnMember: Bucket creation succeed for bucketId={} on member = {}",
partitionedRegion.bucketStringForLogs(bucketId), targetMember);
return true;
if (logger.isDebugEnabled()) {
"createBackupBucketOnMember: Bucket creation failed for bucketId={} on member = {}",
partitionedRegion.bucketStringForLogs(bucketId), targetMember);
return false;
} catch (VirtualMachineError err) {
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable e) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
// NOTE(review): the JVM-usability check referred to above is not visible in this view.
if (e instanceof ForceReattemptException) {
// no log needed
} else if (e instanceof CancelException
|| e.getCause() != null && e.getCause() instanceof CancelException) {
// no need to log exceptions caused by cache closure
} else {
logger.warn("Exception creating partition on {}", targetMember, e);
return false;
// Local member: a missing data store or any grabBucket result other than CREATED is a refusal.
PartitionedRegionDataStore dataStore = partitionedRegion.getDataStore();
boolean bucketManaged =
dataStore != null && dataStore.grabBucket(bucketId, fromMember, forceCreation,
replaceOfflineData, isRebalance, null, false).equals(CreateBucketResult.CREATED);
if (!bucketManaged) {
if (logger.isDebugEnabled()) {
"createBackupBucketOnMember: Local data store refused to accommodate the data for bucketId={} dataStore={}",
partitionedRegion.bucketStringForLogs(bucketId), dataStore);
return bucketManaged;
// Returns the value held by forceLocalPrimaries, treating an unset (null) value as false.
// NOTE(review): forceLocalPrimaries is declared outside this view; presumably a per-thread
// override such as a ThreadLocal<Boolean> - confirm.
private boolean getForceLocalPrimaries() {
boolean result = false;
Boolean forceLocalPrimariesValue = forceLocalPrimaries.get();
if (forceLocalPrimariesValue != null) {
result = forceLocalPrimariesValue;
return result;
* Creates bucket with ID bucketId on targetNode.
* @param forceCreation inform the targetMember it must attempt to host the bucket, appropriately
* ignoring its maximums
* @return a response object
// Asks targetMember to host bucketId: remotely via ManageBucketMessage, or through the local data
// store when targetMember is this member.
// Result mapping, as visible below: YES on acceptance; NO_INITIALIZING when the remote rejected
// because it is still initializing; CLOSED when a CancelException (directly or as the cause) was
// thrown; NO otherwise. PartitionOfflineException and VirtualMachineError propagate to the caller.
private ManageBucketRsp createBucketOnMember(int bucketId, InternalDistributedMember targetMember,
int newBucketSize, boolean forceCreation) {
if (logger.isDebugEnabled()) {
logger.debug("createBucketOnMember for bucketId={} member: {}{}",
partitionedRegion.bucketStringForLogs(bucketId), targetMember,
forceCreation ? " forced" : "");
if (!targetMember.equals(partitionedRegion.getMyId())) {
// Remote member: it must still advertise a partition profile for this region.
PartitionProfile profile =
if (profile == null) {
return ManageBucketRsp.NO;
try {
NodeResponse response = ManageBucketMessage.send(targetMember, partitionedRegion, bucketId,
newBucketSize, forceCreation);
if (response.waitForAcceptance()) {
if (logger.isDebugEnabled()) {
"createBucketOnMember: Bucket creation succeed for bucketId={} on member = {}",
partitionedRegion.bucketStringForLogs(bucketId), targetMember);
return ManageBucketRsp.YES;
if (logger.isDebugEnabled()) {
"createBucketOnMember: Bucket creation failed for bucketId={} on member = {}",
partitionedRegion.bucketStringForLogs(bucketId), targetMember);
return response.rejectedDueToInitialization() ? ManageBucketRsp.NO_INITIALIZING
: ManageBucketRsp.NO;
} catch (PartitionOfflineException e) {
// Propagated to the caller rather than being mapped to a ManageBucketRsp.
throw e;
} catch (VirtualMachineError err) {
// If this ever returns, rethrow the error. We're poisoned
// now, so don't let this thread continue.
throw err;
} catch (Throwable e) {
// Whenever you catch Error or Throwable, you must also
// catch VirtualMachineError (see above). However, there is
// _still_ a possibility that you are dealing with a cascading
// error condition, so you also need to check to see if the JVM
// is still usable:
// NOTE(review): the JVM-usability check referred to above is not visible in this view.
if (e instanceof CancelException
|| e.getCause() != null && e.getCause() instanceof CancelException) {
// no need to log exceptions caused by cache closure
return ManageBucketRsp.CLOSED;
if (e instanceof ForceReattemptException) {
// no log needed
} else {
logger.warn("Exception creating partition on {}", targetMember, e);
return ManageBucketRsp.NO;
// Local member: delegate to the local data store, if this member has one.
PartitionedRegionDataStore dataStore = partitionedRegion.getDataStore();
boolean bucketManaged = dataStore != null && dataStore.handleManageBucketRequest(bucketId,
newBucketSize, partitionedRegion.getMyId(), forceCreation);
if (!bucketManaged) {
if (logger.isDebugEnabled()) {
"createBucketOnMember: Local data store not able to accommodate the data for bucketId={}",
return ManageBucketRsp.valueOf(bucketManaged);
* Select the member which is hosting the same bucketId for the PR it is colocated with. In
* case of primary it returns the same node whereas in case of secondary it will return the least
* loaded datastore which is hosting the bucketid.
* @return InternalDistributedMember colocated data store
* @since GemFire 5.8Beta
// Chooses a host for bucketId that matches the placement of the same bucket in the region this PR
// is colocated with.
// - Throws IllegalStateException when the PR root's PartitionRegionConfig reports that colocation
//   is not complete.
// - For the first copy (alreadyUsed empty) it returns the colocated region's primary for the
//   bucket, or null if that member is not among candidates.
// - Otherwise it picks among the colocated bucket's owners via getPreferredDataStore; null if
//   there are no owners.
// NOTE(review): in the second case the owners are not intersected with candidates in the visible
// code - confirm this is intended.
private InternalDistributedMember getColocatedDataStore(
Collection<InternalDistributedMember> candidates,
Collection<InternalDistributedMember> alreadyUsed, int bucketId, String prName) {
Assert.assertTrue(prName != null);
PartitionedRegion colocatedRegion = ColocationHelper.getColocatedRegion(partitionedRegion);
Region<?, ?> prRoot = PartitionedRegionHelper.getPRRoot(partitionedRegion.getCache());
PartitionRegionConfig config =
(PartitionRegionConfig) prRoot.get(partitionedRegion.getRegionIdentifier());
if (!config.isColocationComplete()) {
throw new IllegalStateException(
"Cannot create buckets, as colocated regions are not configured to be at the same nodes.");
RegionAdvisor advisor = colocatedRegion.getRegionAdvisor();
if (alreadyUsed.isEmpty()) {
// Choosing the primary copy: follow the colocated region's primary.
InternalDistributedMember primary = advisor.getPrimaryMemberForBucket(bucketId);
if (!candidates.contains(primary)) {
return null;
return primary;
// Choosing a secondary copy: pick among members that already own the colocated bucket.
Set<InternalDistributedMember> bucketOwnersSet = advisor.getBucketOwners(bucketId);
Collection<InternalDistributedMember> members = new ArrayList<>(bucketOwnersSet);
if (members.isEmpty()) {
return null;
return getPreferredDataStore(members, alreadyUsed);
* Select the member with the fewest buckets; among those tied for the fewest, randomly select one.
* Under concurrent access, the data that this method uses, may be somewhat volatile, note that
* createBucketAtomically synchronizes to enhance the consistency of the data used in this method.
* @param candidates collection of InternalDistributedMember, potential datastores
* @param alreadyUsed data stores already in use
* @return a member with the fewest buckets or null if no datastores
// Ranks candidate data stores and returns one of the best.
// Ordering: members whose host does not yet hold the bucket come first. Then lower load comes
// first, where load is primaries per MB of localMaxMemory when choosing a primary (alreadyUsed
// empty) and buckets per MB otherwise. Ties go to the member with the larger localMaxMemory.
// Returns null when no data store could be built from the candidates.
// NOTE(review): several lines of this method are garbled or missing in this view (the
// existingHosts loop body, the sort call, the comparator call at the "as good as" check and the
// condition choosing index 0) - confirm against the full source.
private InternalDistributedMember getPreferredDataStore(
Collection<InternalDistributedMember> candidates,
Collection<InternalDistributedMember> alreadyUsed) {
// has a primary already been chosen?
boolean forPrimary = alreadyUsed.isEmpty();
if (forPrimary && getForceLocalPrimaries()) {
// NOTE(review): the local member is returned without checking that it is in candidates.
PartitionedRegionDataStore localDataStore = partitionedRegion.getDataStore();
if (localDataStore != null) {
return partitionedRegion.getMyId();
if (candidates.size() == 1) {
return candidates.iterator().next();
Assert.assertTrue(candidates.size() > 1);
// Convert peers to DataStoreBuckets
List<DataStoreBuckets> stores = partitionedRegion.getRegionAdvisor()
.adviseFilteredDataStores(new HashSet<>(candidates));
DistributionManager dm = partitionedRegion.getDistributionManager();
// Add local member as a candidate, if appropriate
InternalDistributedMember localMember = dm.getId();
PartitionedRegionDataStore localDataStore = partitionedRegion.getDataStore();
if (localDataStore != null && candidates.contains(localMember)) {
int bucketCount = localDataStore.getBucketsManaged();
int priCount = localDataStore.getNumberOfPrimaryBucketsManaged();
int localMaxMemory = partitionedRegion.getLocalMaxMemory();
stores.add(new DataStoreBuckets(localMember, bucketCount, priCount, localMaxMemory));
if (stores.isEmpty()) {
return null;
// ---------------------------------------------
// Calculate all hosts who already have this bucket
Collection<InternalDistributedMember> existingHosts = new HashSet<>();
for (InternalDistributedMember mem : alreadyUsed) {
Comparator<DataStoreBuckets> comparator = (d1, d2) -> {
boolean host1Used = existingHosts.contains(d1.memberId());
boolean host2Used = existingHosts.contains(d2.memberId());
if (!host1Used && host2Used) {
// host1 preferred
return -1;
if (host1Used && !host2Used) {
// host2 preferred
return 1;
// Look for least loaded
float load1;
float load2;
if (forPrimary) {
load1 = d1.numPrimaries() / (float) d1.localMaxMemoryMB();
load2 = d2.numPrimaries() / (float) d2.localMaxMemoryMB();
} else {
load1 = d1.numBuckets() / (float) d1.localMaxMemoryMB();
load2 = d2.numBuckets() / (float) d2.localMaxMemoryMB();
// NOTE(review): the next line is truncated in this view; presumably Float.compare(load1, load2).
int result =, load2);
if (result == 0) {
// if they have the same load, choose the member with the higher localMaxMemory
result = d2.localMaxMemoryMB() - d1.localMaxMemoryMB();
return result;
// ---------------------------------------------
// First step is to sort datastores first by those whose hosts don't
// hold this bucket, and then secondarily by loading.
if (logger.isDebugEnabled()) {
logger.debug(fancyFormatBucketAllocation("Sorted ", stores, existingHosts));
// ---------------------------------------------
// Always add the first datastore and note just how good it is.
DataStoreBuckets bestDataStore = stores.get(0);
List<DataStoreBuckets> bestStores = new ArrayList<>();
boolean allStoresInUse = alreadyUsed.contains(bestDataStore.memberId());
// ---------------------------------------------
// Collect all of the other hosts in this sorted list that are as good as the very first one.
for (int i = 1; i < stores.size(); i++) {
DataStoreBuckets aDataStore = stores.get(i);
if (!allStoresInUse && alreadyUsed.contains(aDataStore.memberId())) {
// Only choose between the ones not in use.
// NOTE(review): the next line is truncated in this view; presumably
// comparator.compare(bestDataStore, aDataStore) != 0, i.e. stop at the first worse store.
if (, aDataStore) != 0) {
if (logger.isDebugEnabled()) {
logger.debug(fancyFormatBucketAllocation("Best Stores ", bestStores, existingHosts));
// ---------------------------------------------
int chosen;
chosen = 0;
} else {
// Pick one (at random)
chosen = PartitionedRegion.RANDOM.nextInt(bestStores.size());
return bestStores.get(chosen).memberId();
* Adds a membership listener to watch for member departures, and schedules a task to recover
* redundancy of existing buckets
// Registers a PRMembershipListener on this region's advisor so member departures are observed.
// NOTE(review): the scheduling of recovery for existing buckets is not visible in this view.
void startRedundancyRecovery() {
partitionedRegion.getRegionAdvisor().addMembershipListener(new PRMembershipListener());
* Log bucket allocation in the log files in this format:
* <pre>
* member1: +5/20
* member2: -10/5
* </pre>
* <p>
* After the member name, the +/- indicates whether or not this bucket is already hosted on the
* given member. This is followed by the number of hosted primaries followed by the number of
* hosted non-primary buckets.
* @param prefix first part of message to print
* @param dataStores list of stores
* @param existingStores to mark those already in use
// Builds a one-entry-per-store summary for debug logging: the member id, then (per the visible
// lines) the count of non-primary buckets (numBuckets - numPrimaries).
// NOTE(review): the prefix argument only gates the header here and is not itself appended; the
// "+"/"-" markers and the primary count described in the Javadoc are not visible in this view.
private String fancyFormatBucketAllocation(String prefix, Iterable<DataStoreBuckets> dataStores,
Collection<InternalDistributedMember> existingStores) {
StringBuilder logStr = new StringBuilder();
if (prefix != null) {
logStr.append("Bucket Allocation for prId=").append(partitionedRegion.getPRId()).append(":");
for (Object dataStore : dataStores) {
DataStoreBuckets buckets = (DataStoreBuckets) dataStore;
logStr.append(buckets.memberId()).append(": ");
if (existingStores.contains(buckets.memberId())) {
} else {
logStr.append(buckets.numBuckets() - buckets.numPrimaries());
return logStr.toString();
* Verifies the members and removes the members that are either not present in the
* DistributedSystem or are no longer part of the PartitionedRegion (close/localDestroy has been
* performed).
* @param members collection of members to scan and modify
// Removes from `members` (in place) every member that getAllStores(partitionName) no longer
// reports, then asserts (with -ea) that each removed node is really gone.
// A null or empty collection presumably returns immediately.
// NOTE(review): the loop's "node = iterator.next()" and the iterator.remove() call are garbled or
// missing in this view - confirm against the full source.
private void verifyBucketNodes(Collection<InternalDistributedMember> members,
String partitionName) {
if (members == null || members.isEmpty()) {
// Revisit region advisor, get current bucket stores.
Set<InternalDistributedMember> availableMembers = getAllStores(partitionName);
for (Iterator<InternalDistributedMember> iterator = members.iterator(); iterator.hasNext();) {
InternalDistributedMember node =;
if (!availableMembers.contains(node)) {
if (logger.isDebugEnabled()) {
logger.debug("verifyBucketNodes: removing member {}", node);
Assert.assertTrue(!members.contains(node), "return value does not contain " + node);
* Schedule a task to perform redundancy recovery for a new node or for the node departed.
// Schedules a rebalance-based redundancy recovery task on recoveryExecutor.
// failedMemberId == null means startup: use getStartupRecoveryDelay() and take movePrimaries from
// a Boolean lookup (presumably a system property). Otherwise use getRecoveryDelay() with
// movePrimaries = false.
// A negative delay disables recovery; non-data-store members presumably return early (the return
// statements are not visible in this view).
// Scheduling happens under shutdownLock and is skipped once `shutdown` is set.
// RejectedExecutionException from the executor is ignored.
private void scheduleRedundancyRecovery(Object failedMemberId) {
boolean isStartup = failedMemberId == null;
long delay;
boolean movePrimaries;
if (isStartup) {
delay = partitionedRegion.getPartitionAttributes().getStartupRecoveryDelay();
movePrimaries = !Boolean
} else {
delay = partitionedRegion.getPartitionAttributes().getRecoveryDelay();
movePrimaries = false;
final boolean requiresRedundancyRecovery = delay >= 0;
if (!requiresRedundancyRecovery) {
if (!partitionedRegion.isDataStore()) {
Runnable task = new RecoveryRunnable(this) {
public void run2() {
try {
boolean isFixedPartitionedRegion = partitionedRegion.isFixedPartitionedRegion();
// always replace offline data for fixed partitioned regions -
// this guarantees we create the buckets we are supposed to create on this node.
boolean replaceOfflineData = isFixedPartitionedRegion || !isStartup;
RebalanceDirector director;
if (isFixedPartitionedRegion) {
director = new FPRDirector(true, movePrimaries);
} else {
director = new CompositeDirector(true, true, false, movePrimaries);
PartitionedRegionRebalanceOp rebalance = rebalanceOpFactory.create(
partitionedRegion, false, director, replaceOfflineData, false);
long start = partitionedRegion.getPrStats().startRecovery();
// NOTE(review): the bodies that execute the rebalance and end the recovery stats are not
// visible in this view.
if (isFixedPartitionedRegion) {
} else {
recoveryFuture = null;
} catch (CancelException e) {
logger.debug("Cache closed while recovery in progress");
} catch (RegionDestroyedException e) {
logger.debug("Region destroyed while recovery in progress");
} catch (Exception e) {
logger.error("Unexpected exception during bucket recovery", e);
synchronized (shutdownLock) {
if (!shutdown) {
try {
if (logger.isDebugEnabled()) {
if (isStartup) {
logger.debug("{} scheduling redundancy recovery in {} ms", partitionedRegion, delay);
} else {
// NOTE(review): unlike the startup message, this literal starts with the word
// "partitionedRegion" rather than a "{}" placeholder for the region - confirm intent.
"partitionedRegion scheduling redundancy recovery after departure/crash/error in {} in {} ms",
failedMemberId, delay);
recoveryFuture = recoveryExecutor.schedule(task, delay, MILLISECONDS);
} catch (RejectedExecutionException e) {
// ok, the executor is shutting down.
// Returns true if any bucket's redundancy differs from the configured redundant copies.
// "Differs" means below target (ignoring buckets reporting -1, presumably buckets not yet
// created) or above target.
public boolean isRedundancyImpaired() {
int numBuckets = partitionedRegion.getPartitionAttributes().getTotalNumBuckets();
int targetRedundancy = partitionedRegion.getPartitionAttributes().getRedundantCopies();
for (int i = 0; i < numBuckets; i++) {
int redundancy = partitionedRegion.getRegionAdvisor().getBucketRedundancy(i);
if (redundancy < targetRedundancy && redundancy != -1 || redundancy > targetRedundancy) {
return true;
return false;
// Starts recovery of persistent buckets for this region's colocation chain.
// Returns true when nothing in the chain is persistent, or after recovery has been kicked off or
// run. Returns false (skipping recovery) when colocation is not yet complete for the leader.
// Throws IllegalStateException if the proxy bucket array is empty.
// Buckets this member previously hosted are recovered on their own LoggingThread; the remaining
// buckets are handled on the calling thread, local ones first.
// NOTE(review): several statements (array initializer, loop bodies, thread start, latch
// count-down) are not visible in this view.
boolean recoverPersistentBuckets() {
* To handle a case where ParallelGatewaySender is persistent but userPR is not First recover
* the GatewaySender buckets for ParallelGatewaySender irrespective of whether colocation is
* complete or not.
PartitionedRegion leaderRegion = ColocationHelper.getLeaderRegion(partitionedRegion);
// Check if the leader region or some child shadow PR region is persistent
// and return the first persistent region found
PartitionedRegion persistentLeader = getPersistentLeader();
// If there is no persistent region in the colocation chain, no need to recover.
if (persistentLeader == null) {
return true;
if (!checkMembersColocation(leaderRegion, leaderRegion.getDistributionManager().getId())) {
if (logger.isDebugEnabled()) {
logger.debug("Skipping persistent recovery of {} because colocation is not complete for {}",
partitionedRegion, leaderRegion);
return false;
ProxyBucketRegion[] proxyBucketArray =
if (proxyBucketArray.length == 0) {
throw new IllegalStateException("Unexpected empty proxy bucket array");
for (ProxyBucketRegion proxyBucket : proxyBucketArray) {
Set<InternalDistributedMember> peers = partitionedRegion.getRegionAdvisor().adviseGeneric();
// We need to make sure here that we don't run into this race condition:
// 1) We get a membership view from member A
// 2) Member B removes itself, and distributes to us and A. We don't remove B
// 3) We apply the membership view from A, which includes B.
// That will add B back into the set.
// This state flush will make sure that any membership changes
// That are in progress on the peers are finished.
MembershipFlushRequest.send(peers, partitionedRegion.getDistributionManager(),
List<ProxyBucketRegion> bucketsNotHostedLocally = new ArrayList<>(proxyBucketArray.length);
List<ProxyBucketRegion> bucketsHostedLocally = new ArrayList<>(proxyBucketArray.length);
* Spawn a separate thread for bucket that we previously hosted to recover that bucket.
* That thread will get to the point at which it has determined that at least one member
* (possibly the local member) has fully initialized the bucket, at which it will count down the
* someMemberRecoveredLatch latch on the bucket.
* Once at least one copy of each bucket has been created in the distributed system, the
* initPRInternals method will exit. Some of the threads spawned here will still be doing GII's
* in the background. This allows the system to become usable as fast as possible.
* If we used a bounded thread pool here, we end up waiting for some buckets to finish there GII
* before returning from initPRInternals. In the future maybe we could let the create bucket
* return and pass the GII task to a separate thread pool.
for (ProxyBucketRegion proxyBucket : proxyBucketArray) {
if (proxyBucket.getPersistenceAdvisor().wasHosting()) {
// Previously hosted here: recover on a dedicated thread.
RecoveryRunnable recoveryRunnable = new RecoveryRunnable(this) {
public void run() {
// make sure that we always count down this latch, even if the region was destroyed.
try {;
} finally {
if (getPersistentBucketRecoverer() != null) {
public void run2() {
Thread recoveryThread =
new LoggingThread("Recovery thread for bucket " + proxyBucket.getName(),
false, recoveryRunnable);
} else {
try {
// try to recover the local buckets before the proxy buckets. This will allow us to detect
// any ConflictingDataException before the proxy buckets update their membership view.
for (ProxyBucketRegion proxyBucket : bucketsHostedLocally) {
if (!partitionedRegion.isInternalRegion() && !bucketsNotHostedLocally.isEmpty()) {
for (ProxyBucketRegion proxyBucket : bucketsNotHostedLocally) {
} finally {
if (getPersistentBucketRecoverer() != null) {
return true;
// Creates the recoverer through persistentBucketRecovererFunction (given this provider and the
// proxy bucket count) and stores it in the persistentBucketRecoverer field.
void createPersistentBucketRecoverer(int proxyBuckets) {
persistentBucketRecoverer = persistentBucketRecovererFunction.apply(this, proxyBuckets);
// Returns the recoverer created by createPersistentBucketRecoverer, or null if none was created.
PersistentBucketRecoverer getPersistentBucketRecoverer() {
return persistentBucketRecoverer;
* Check to see if any colocated region of the current region is persistent. It's not enough to
* check just the leader region, because a child region might be a persistent parallel WAN queue,
* which is allowed.
* @return the most senior region in the colocation chain (closest to the leader) that is
* persistent.
// Starts the depth-first persistence search at the colocation leader of this region.
private PartitionedRegion getPersistentLeader() {
PartitionedRegion leader = ColocationHelper.getLeaderRegion(partitionedRegion);
return findPersistentRegionRecursively(leader);
// Depth-first search: returns the given region if its data policy has persistence, otherwise the
// first persistent region found among its colocated children (in the order returned by
// ColocationHelper.getColocatedChildRegions), or null if none is persistent.
// Note: the parameter shadows the partitionedRegion field inside this method.
private PartitionedRegion findPersistentRegionRecursively(PartitionedRegion partitionedRegion) {
if (partitionedRegion.getDataPolicy().withPersistence()) {
return partitionedRegion;
for (PartitionedRegion child : ColocationHelper.getColocatedChildRegions(partitionedRegion)) {
PartitionedRegion leader = findPersistentRegionRecursively(child);
if (leader != null) {
return leader;
return null;
// For a colocated (child) region only, builds a CreateMissingBucketsTask for this provider.
// NOTE(review): how the task is submitted through the InternalResourceManager is not visible in
// this view.
void scheduleCreateMissingBuckets() {
if (partitionedRegion.getColocatedWith() != null) {
Runnable task = new CreateMissingBucketsTask(this);
final InternalResourceManager resourceManager =
// Marks this provider as shut down. The flag is set under shutdownLock, the same lock
// scheduleRedundancyRecovery holds when it checks `shutdown` before scheduling.
// NOTE(review): the handling of a pending recoveryFuture (presumably cancel) is not visible here.
public void shutdown() {
synchronized (shutdownLock) {
shutdown = true;
ScheduledFuture<?> recoveryFuture = this.recoveryFuture;
if (recoveryFuture != null) {
* Creates and fills in a PartitionRegionInfo for the partitioned region.
* @param internal true if internal-only details should be included
* @param loadProbe the LoadProbe to use
* @return PartitionRegionInfo for the partitioned region
// Aggregates bucket/redundancy counters for the region. Member details come from the local data
// store (if any) plus a FetchPartitionDetailsMessage sent to the advised data stores.
// Offline-member details are computed locally when this member is a data store; otherwise they
// are requested from the remote data stores.
// Returns null if partitionedRegion is null.
// NOTE(review): the line merging the remote member details into memberDetails is not visible here.
public InternalPRInfo buildPartitionedRegionInfo(boolean internal, LoadProbe loadProbe) {
PartitionedRegion pr = partitionedRegion;
if (pr == null) {
return null;
int configuredBucketCount = pr.getTotalNumberOfBuckets();
int createdBucketCount = pr.getRegionAdvisor().getCreatedBucketsCount();
int lowRedundancyBucketCount = pr.getRedundancyTracker().getLowRedundancyBuckets();
int configuredRedundantCopies = pr.getRedundantCopies();
int actualRedundantCopies = pr.getRedundancyTracker().getActualRedundancy();
PartitionedRegionDataStore dataStore = pr.getDataStore();
Set<InternalDistributedMember> datastores = pr.getRegionAdvisor().adviseDataStore();
Set<InternalPartitionDetails> memberDetails = new TreeSet<>();
OfflineMemberDetails offlineMembers = null;
boolean fetchOfflineMembers = false;
if (dataStore != null) {
memberDetails.add(buildPartitionMemberDetails(internal, loadProbe));
offlineMembers = fetchOfflineMembers();
} else {
// Not a data store: ask the remote data stores for offline-member details as well.
fetchOfflineMembers = true;
// Get remote results
if (!datastores.isEmpty()) {
FetchPartitionDetailsResponse response = FetchPartitionDetailsMessage.send(datastores, pr,
internal, fetchOfflineMembers, loadProbe);
if (fetchOfflineMembers) {
offlineMembers = response.getOfflineMembers();
String colocatedWithPath = pr.getColocatedWith();
return new PartitionRegionInfoImpl(pr.getFullPath(), configuredBucketCount,
createdBucketCount, lowRedundancyBucketCount, configuredRedundantCopies,
actualRedundantCopies, memberDetails, colocatedWithPath, offlineMembers);
* Retrieve the set of members which are currently offline for all buckets.
// Builds one Set<PersistentMemberID> per proxy bucket, indexed by bucket position.
// For persistent regions each set holds the bucket's offline persisted members (a null result is
// replaced by an empty set); non-persistent regions get empty sets.
// NOTE(review): `new Set[...]` is an unchecked generic array creation, and the withPersistence()
// check is loop-invariant; the initializer of persistedMembers is not visible in this view.
public OfflineMemberDetailsImpl fetchOfflineMembers() {
ProxyBucketRegion[] proxyBuckets = partitionedRegion.getRegionAdvisor().getProxyBucketArray();
Set<PersistentMemberID>[] offlineMembers = new Set[proxyBuckets.length];
for (int i = 0; i < proxyBuckets.length; i++) {
ProxyBucketRegion proxyBucket = proxyBuckets[i];
if (partitionedRegion.getDataPolicy().withPersistence()) {
Set<PersistentMemberID> persistedMembers =
if (persistedMembers == null) {
persistedMembers = Collections.emptySet();
offlineMembers[i] = persistedMembers;
} else {
offlineMembers[i] = Collections.emptySet();
return new OfflineMemberDetailsImpl(offlineMembers);
* Creates and fills in a PartitionMemberDetails for the local member.
* @param internal true if internal-only details should be included
* @param loadProbe the LoadProbe to use
* @return PartitionMemberDetails for the local member
// Returns null when this member has no data store. Otherwise it sums getBucketSize(bid) over every
// bucket id present in the local size map and records each size by bucket id.
// Local max memory is converted from MB to bytes (* 1024L * 1024L). When `internal` is true the
// details also carry the PRLoad from loadProbe (and, presumably, bucketSizes - that argument is
// not visible in this view).
// Note: the Integer values of bucketSizeMap are not used; sizes come from getBucketSize(bid).
public InternalPartitionDetails buildPartitionMemberDetails(boolean internal,
LoadProbe loadProbe) {
final PartitionedRegion pr = partitionedRegion;
PartitionedRegionDataStore dataStore = pr.getDataStore();
if (dataStore == null) {
return null;
long size = 0;
InternalDistributedMember localMember = pr.getMyId();
int configuredBucketCount = pr.getTotalNumberOfBuckets();
long[] bucketSizes = new long[configuredBucketCount];
// key: bid, value: size
Map<Integer, Integer> bucketSizeMap = dataStore.getSizeLocally();
for (Map.Entry<Integer, Integer> me : bucketSizeMap.entrySet()) {
int bid = me.getKey();
long bucketSize = dataStore.getBucketSize(bid);
bucketSizes[bid] = bucketSize;
size += bucketSize;
InternalPartitionDetails localDetails;
if (internal) {
PRLoad prLoad = loadProbe.getLoad(pr);
localDetails =
new PartitionMemberInfoImpl(localMember, pr.getLocalMaxMemory() * 1024L * 1024L, size,
dataStore.getBucketsManaged(), dataStore.getNumberOfPrimaryBucketsManaged(), prLoad,
} else {
localDetails =
new PartitionMemberInfoImpl(localMember, pr.getLocalMaxMemory() * 1024L * 1024L, size,
dataStore.getBucketsManaged(), dataStore.getNumberOfPrimaryBucketsManaged());
return localDetails;
* Wait for all persistent buckets to be recovered from disk, or for the region to be closed,
* whichever happens first.
// When a persistent bucket recoverer exists, waits on it and then iterates the colocated regions.
// NOTE(review): the wait call, the initializer of colocatedRegions and the loop body are not
// visible in this view - confirm against the full source.
private void waitForPersistentBucketRecoveryOrClose() {
if (getPersistentBucketRecoverer() != null) {
List<PartitionedRegion> colocatedRegions =
for (PartitionedRegion child : colocatedRegions) {
* Wait for all persistent buckets to be recovered from disk, regardless of whether the region is
* currently being closed.
// Acts only when a persistent bucket recoverer exists.
// NOTE(review): the wait call itself is not visible in this view.
void waitForPersistentBucketRecovery() {
if (getPersistentBucketRecoverer() != null) {
// Returns true only when three conditions hold:
// - member colocation is complete for this region;
// - this region's recoverer (if any) has completed;
// - every region in colocatedRegions whose provider has a recoverer reports completion.
// NOTE(review): the initializer of colocatedRegions is not visible in this view.
public boolean isPersistentRecoveryComplete() {
if (!checkMembersColocation(partitionedRegion, partitionedRegion.getMyId())) {
return false;
if (getPersistentBucketRecoverer() != null
&& !getPersistentBucketRecoverer().hasRecoveryCompleted()) {
return false;
Map<String, PartitionedRegion> colocatedRegions =
for (PartitionedRegion region : colocatedRegions.values()) {
PRHARedundancyProvider redundancyProvider = region.getRedundancyProvider();
if (redundancyProvider.getPersistentBucketRecoverer() != null &&
!redundancyProvider.getPersistentBucketRecoverer().hasRecoveryCompleted()) {
return false;
return true;
// Returns the DistributionManager's ThreadsMonitoring, or null when there is no DistributionManager.
private ThreadsMonitoring getThreadMonitorObj() {
DistributionManager distributionManager = partitionedRegion.getDistributionManager();
if (distributionManager != null) {
return distributionManager.getThreadMonitoring();
return null;
* Monitors distributed membership for a given bucket
// Membership listener tied to one bucket. It counts arrivals (AtomicInteger) and remembers
// whether any departure happened (AtomicBoolean). waitForOwnersGetPrimary waits, synchronizing on
// this observer, until the expected number of owners has arrived.
// NOTE(review): several statements (the profile count completion, stopMonitoring body, the bodies
// of the synchronized blocks, and the wait/notify calls) are not visible in this view.
private class BucketMembershipObserver implements MembershipListener {
private final Bucket bucketToMonitor;
private final AtomicInteger arrivals = new AtomicInteger(0);
private final AtomicBoolean departures = new AtomicBoolean(false);
private BucketMembershipObserver(Bucket bucketToMonitor) {
this.bucketToMonitor = bucketToMonitor;
private BucketMembershipObserver beginMonitoring() {
int profilesPresent = bucketToMonitor.getBucketAdvisor()
return this;
private void stopMonitoring() {
public void memberJoined(DistributionManager distributionManager,
InternalDistributedMember id) {
if (logger.isDebugEnabled()) {
logger.debug("Observer for bucket {} member joined {}", bucketToMonitor, id);
synchronized (this) {
public void memberDeparted(DistributionManager distributionManager,
InternalDistributedMember id, boolean crashed) {
if (logger.isDebugEnabled()) {
logger.debug("Observer for bucket {} member departed {}", bucketToMonitor, id);
synchronized (this) {
* Wait for expected number of owners to be recognized. When the expected number have been seen,
* then fetch the primary and report it. If while waiting for the owners to be recognized there
* is a departure which compromises redundancy
* @param expectedCount the number of bucket owners to wait for
* @param expectedOwners the list of owners used when a departure is detected
* @return if no problematic departures are detected, the primary
// NOTE(review): the sentence above is incomplete. Per the code below, a departure that leaves
// expectedOwners empty, or a missing primary, yields a result with problematicDeparture=true and
// a null primary.
private BucketMembershipObserverResults waitForOwnersGetPrimary(int expectedCount,
Collection<InternalDistributedMember> expectedOwners, String partitionName)
throws InterruptedException {
boolean problematicDeparture = false;
synchronized (this) {
while (true) {
// If any departures, need to rethink much...
boolean oldDepartures = departures.get();
if (oldDepartures) {
// Drop owners that are no longer data stores (modifies expectedOwners in place).
verifyBucketNodes(expectedOwners, partitionName);
if (expectedOwners.isEmpty()) {
problematicDeparture = true; // need to pick new victims
// need to pick new victims
if (problematicDeparture) {
if (logger.isDebugEnabled()) {
logger.debug("Bucket observer found departed members - retrying");
// Look for success...
int oldArrivals = arrivals.get();
if (oldArrivals >= expectedCount) {
// success!
if (logger.isDebugEnabled()) {
logger.debug("Waiting for bucket {} to finish being created",
// Each wait is bounded by 5 seconds; if neither counter changed, a timeout is reported.
int creationWaitMillis = 5 * 1000;
if (oldArrivals == arrivals.get() && oldDepartures == departures.get()) {
"Time out waiting {} ms for creation of bucket for partitioned region {}. Members requested to create the bucket are: {}",
creationWaitMillis, partitionedRegion.getFullPath(), expectedOwners);
if (problematicDeparture) {
return new BucketMembershipObserverResults(true, null);
InternalDistributedMember primary = bucketToMonitor.getBucketAdvisor().getPrimary();
if (primary == null) {
* Handle a race where nobody has the bucket. We can't return a null member here because we
* haven't created the bucket, need to let the higher level code loop.
return new BucketMembershipObserverResults(true, null);
return new BucketMembershipObserverResults(false, primary);
* This class extends MembershipListener to perform cleanup when a node leaves DistributedSystem.
// Reacts to a member departure when three conditions hold: the cache is not closing, the region
// is not destroyed, and the departed member is not this member.
// For non-fixed partitioned regions it prepares a post-cleanup task (presumably redundancy
// recovery), then requests metadata cleanup for the failed member. CancelException is ignored.
// NOTE(review): the debug call's arguments, the lambda body and the cleanup call's receiver are
// not visible in this view.
private class PRMembershipListener implements MembershipListener {
public void memberDeparted(DistributionManager distributionManager,
InternalDistributedMember id, boolean crashed) {
try {
DistributedMember member = partitionedRegion.getSystem().getDistributedMember();
if (logger.isDebugEnabled()) {
"MembershipListener invoked on DistributedMember = {} for failed memberId = {}",
if (!partitionedRegion.isCacheClosing() && !partitionedRegion.isDestroyed()
&& !member.equals(id)) {
Runnable postRecoveryTask = null;
// Only schedule redundancy recovery if this not a fixed PR.
if (!partitionedRegion.isFixedPartitionedRegion()) {
postRecoveryTask = () -> {
// After the metadata has been cleaned, recover redundancy.
// Schedule clean up the metadata for the failed member.
partitionedRegion.getRegionIdentifier(), id, postRecoveryTask);
} catch (CancelException ignore) {
// ignore
// Fixed set of response constants for createBucketOnMember, created through a private
// constructor so identity comparison (this == YES) is meaningful.
// NOTE(review): the constructor line below is garbled in this view (presumably this.name = name).
private static class ManageBucketRsp {
private static final ManageBucketRsp NO = new ManageBucketRsp("NO");
private static final ManageBucketRsp YES = new ManageBucketRsp("YES");
private static final ManageBucketRsp NO_INITIALIZING = new ManageBucketRsp("NO_INITIALIZING");
private static final ManageBucketRsp CLOSED = new ManageBucketRsp("CLOSED");
private final String name;
private ManageBucketRsp(String name) { = name;
private boolean isAcceptance() {
return this == YES;
public String toString() {
return "ManageBucketRsp(" + name + ")";
/** return YES if the argument is true, NO if not */
private static ManageBucketRsp valueOf(boolean managed) {
return managed ? YES : NO;
// Result of BucketMembershipObserver.waitForOwnersGetPrimary: whether a problematic departure
// (or missing primary) requires a retry, and the primary member when one was found.
private static class BucketMembershipObserverResults {
private final boolean problematicDeparture;
final InternalDistributedMember primary;
BucketMembershipObserverResults(boolean re, InternalDistributedMember p) {
problematicDeparture = re;
primary = p;
public String toString() {
return "pDepart:" + problematicDeparture + " primary:" + primary;
private static class ArrayListWithClearState<T> extends ArrayList<T> {
private static final long serialVersionUID = 1L;
private volatile boolean wasCleared;
private boolean wasCleared() {
return wasCleared;
public void clear() {
wasCleared = true;