| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache; |
| |
| import static java.lang.String.format; |
| import static java.lang.System.lineSeparator; |
| import static java.util.Collections.emptyList; |
| import static java.util.concurrent.TimeUnit.MILLISECONDS; |
| import static org.apache.geode.internal.cache.ColocationHelper.checkMembersColocation; |
| import static org.apache.geode.internal.cache.PartitionedRegionHelper.printCollection; |
| import static org.apache.geode.util.internal.GeodeGlossary.GEMFIRE_PREFIX; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.function.BiFunction; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.CancelCriterion; |
| import org.apache.geode.CancelException; |
| import org.apache.geode.SystemFailure; |
| import org.apache.geode.annotations.Immutable; |
| import org.apache.geode.annotations.VisibleForTesting; |
| import org.apache.geode.annotations.internal.MakeNotStatic; |
| import org.apache.geode.cache.DiskAccessException; |
| import org.apache.geode.cache.PartitionedRegionStorageException; |
| import org.apache.geode.cache.Region; |
| import org.apache.geode.cache.RegionDestroyedException; |
| import org.apache.geode.cache.persistence.PartitionOfflineException; |
| import org.apache.geode.distributed.DistributedMember; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.LonerDistributionManager; |
| import org.apache.geode.distributed.internal.MembershipListener; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.Assert; |
| import org.apache.geode.internal.NanoTimer; |
| import org.apache.geode.internal.OneTaskOnlyExecutor; |
| import org.apache.geode.internal.cache.PartitionedRegion.RetryTimeKeeper; |
| import org.apache.geode.internal.cache.PartitionedRegionDataStore.CreateBucketResult; |
| import org.apache.geode.internal.cache.control.InternalResourceManager; |
| import org.apache.geode.internal.cache.partitioned.Bucket; |
| import org.apache.geode.internal.cache.partitioned.BucketBackupMessage; |
| import org.apache.geode.internal.cache.partitioned.CreateBucketMessage; |
| import org.apache.geode.internal.cache.partitioned.CreateMissingBucketsTask; |
| import org.apache.geode.internal.cache.partitioned.DataStoreBuckets; |
| import org.apache.geode.internal.cache.partitioned.EndBucketCreationMessage; |
| import org.apache.geode.internal.cache.partitioned.FetchPartitionDetailsMessage; |
| import org.apache.geode.internal.cache.partitioned.FetchPartitionDetailsMessage.FetchPartitionDetailsResponse; |
| import org.apache.geode.internal.cache.partitioned.InternalPRInfo; |
| import org.apache.geode.internal.cache.partitioned.InternalPartitionDetails; |
| import org.apache.geode.internal.cache.partitioned.LoadProbe; |
| import org.apache.geode.internal.cache.partitioned.ManageBackupBucketMessage; |
| import org.apache.geode.internal.cache.partitioned.ManageBucketMessage; |
| import org.apache.geode.internal.cache.partitioned.ManageBucketMessage.NodeResponse; |
| import org.apache.geode.internal.cache.partitioned.OfflineMemberDetails; |
| import org.apache.geode.internal.cache.partitioned.OfflineMemberDetailsImpl; |
| import org.apache.geode.internal.cache.partitioned.PRLoad; |
| import org.apache.geode.internal.cache.partitioned.PartitionMemberInfoImpl; |
| import org.apache.geode.internal.cache.partitioned.PartitionRegionInfoImpl; |
| import org.apache.geode.internal.cache.partitioned.PartitionedRegionRebalanceOp; |
| import org.apache.geode.internal.cache.partitioned.PartitionedRegionRebalanceOpFactory; |
| import org.apache.geode.internal.cache.partitioned.PersistentBucketRecoverer; |
| import org.apache.geode.internal.cache.partitioned.RecoveryRunnable; |
| import org.apache.geode.internal.cache.partitioned.RegionAdvisor; |
| import org.apache.geode.internal.cache.partitioned.RegionAdvisor.PartitionProfile; |
| import org.apache.geode.internal.cache.partitioned.rebalance.CompositeDirector; |
| import org.apache.geode.internal.cache.partitioned.rebalance.FPRDirector; |
| import org.apache.geode.internal.cache.partitioned.rebalance.RebalanceDirector; |
| import org.apache.geode.internal.cache.persistence.MembershipFlushRequest; |
| import org.apache.geode.internal.cache.persistence.PersistentMemberID; |
| import org.apache.geode.internal.monitoring.ThreadsMonitoring; |
| import org.apache.geode.logging.internal.executors.LoggingThread; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| /** |
| * This class provides redundancy management for a partitioned region. It provides the |
| * following to the PartitionedRegion: |
| * |
| * <ol> |
| * <li>Redundancy management at the time of bucket creation. |
| * <li>Redundancy management when a new node arrives. |
| * <li>Redundancy management when a node leaves the partitioned region's distributed system |
| * gracefully, i.e. Cache.close(). |
| * <li>Redundancy management on unexpected node failure. |
| * </ol> |
| */ |
| public class PRHARedundancyProvider { |
| private static final Logger logger = LogService.getLogger(); |
| |
| public static final String TIMEOUT_MSG = |
| "If your system has sufficient space, perhaps it is under membership or region creation stress?"; |
| |
| /** |
| * Signature string indicating that not enough stores are available. |
| */ |
| public static final String INSUFFICIENT_STORES_MSG = |
| "Advise you to start enough data store nodes"; |
| |
| private static final boolean DISABLE_CREATE_BUCKET_RANDOMNESS = |
| Boolean.getBoolean(GEMFIRE_PREFIX + "DISABLE_CREATE_BUCKET_RANDOMNESS"); |
| |
| private static final String DATASTORE_DISCOVERY_TIMEOUT_PROPERTY_NAME = |
| GEMFIRE_PREFIX + "partitionedRegionDatastoreDiscoveryTimeout"; |
| |
| /** |
| * Signature string indicating that there are enough stores available. |
| */ |
| private static final String SUFFICIENT_STORES_MSG = "Found a member to host a bucket."; |
| |
| /** |
| * String describing the attempt to allocate a bucket. |
| */ |
| private static final String ALLOCATE_ENOUGH_MEMBERS_TO_HOST_BUCKET = |
| "allocate enough members to host a new bucket"; |
| |
| private static final long INSUFFICIENT_LOGGING_THROTTLE_TIME = TimeUnit.SECONDS.toNanos( |
| Integer.getInteger(GEMFIRE_PREFIX + "InsufficientLoggingThrottleTime", 2)); |
| |
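| /** |
| * When set to {@code true} for the current thread, {@link #getPreferredDataStore} prefers the |
| * local member as the primary for newly created buckets (provided this member is a data store). |
| */ |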
| private static final ThreadLocal<Boolean> forceLocalPrimaries = new ThreadLocal<>(); |
| |
| @MakeNotStatic |
| private static final Long DATASTORE_DISCOVERY_TIMEOUT_MILLISECONDS = |
| Long.getLong(DATASTORE_DISCOVERY_TIMEOUT_PROPERTY_NAME); |
| |
| @MakeNotStatic |
| private static final AtomicLong insufficientLogTimeStamp = new AtomicLong(0); |
| |
| private final AtomicBoolean firstInsufficientStoresLogged = new AtomicBoolean(false); |
| |
| private final PartitionedRegion partitionedRegion; |
| private final InternalResourceManager resourceManager; |
| private final PartitionedRegionRebalanceOpFactory rebalanceOpFactory; |
| private final CompletableFuture<Void> providerStartupTask; |
| |
| /** |
| * An executor to submit tasks for redundancy recovery to. It makes sure that there will only be |
| * one redundancy recovery task in the queue at a time. |
| */ |
| private final OneTaskOnlyExecutor recoveryExecutor; |
| |
| private final Object shutdownLock = new Object(); |
| |
| private final BiFunction<PRHARedundancyProvider, Integer, PersistentBucketRecoverer> persistentBucketRecovererFunction; |
| |
| private volatile ScheduledFuture<?> recoveryFuture; |
| |
| /** |
| * Used to consolidate logging for bucket regions waiting on other members to come online. |
| */ |
| private volatile PersistentBucketRecoverer persistentBucketRecoverer; |
| |
| private boolean shutdown; |
| |
| /** |
| * Constructor for PRHARedundancyProvider. |
| * |
| * @param partitionedRegion The PartitionedRegion for which the HA redundancy is required to be |
| * managed. |
| */ |
| public PRHARedundancyProvider(PartitionedRegion partitionedRegion, |
| InternalResourceManager resourceManager) { |
| this(partitionedRegion, resourceManager, PersistentBucketRecoverer::new, |
| PartitionedRegionRebalanceOp::new, |
| new CompletableFuture<>()); |
| } |
| |
| @VisibleForTesting |
| PRHARedundancyProvider(PartitionedRegion partitionedRegion, |
| InternalResourceManager resourceManager, |
| BiFunction<PRHARedundancyProvider, Integer, PersistentBucketRecoverer> persistentBucketRecovererFunction) { |
| this(partitionedRegion, resourceManager, persistentBucketRecovererFunction, |
| PartitionedRegionRebalanceOp::new, new CompletableFuture<>()); |
| } |
| |
| @VisibleForTesting |
| PRHARedundancyProvider(PartitionedRegion partitionedRegion, |
| InternalResourceManager resourceManager, |
| BiFunction<PRHARedundancyProvider, Integer, PersistentBucketRecoverer> persistentBucketRecovererFunction, |
| PartitionedRegionRebalanceOpFactory rebalanceOpFactory, |
| CompletableFuture<Void> providerStartupTask) { |
| this.partitionedRegion = partitionedRegion; |
| this.resourceManager = resourceManager; |
| this.rebalanceOpFactory = rebalanceOpFactory; |
| this.providerStartupTask = providerStartupTask; |
| recoveryExecutor = new OneTaskOnlyExecutor(resourceManager.getExecutor(), |
| () -> InternalResourceManager.getResourceObserver().recoveryConflated(partitionedRegion), |
| getThreadMonitorObj()); |
| this.persistentBucketRecovererFunction = persistentBucketRecovererFunction; |
| } |
| |
| /** |
| * Display bucket allocation status |
| * |
| * @param partitionedRegion the given region |
| * @param allStores the list of available stores. If null, unknown. |
| * @param alreadyUsed stores allocated; only used if allStores != null |
| * @param forLog true if the generated string is for a log message |
| * @return the description string |
| */ |
| private static String regionStatus(PartitionedRegion partitionedRegion, |
| Collection<InternalDistributedMember> allStores, |
| Collection<InternalDistributedMember> alreadyUsed, boolean forLog) { |
| String newLine = forLog ? " " : lineSeparator(); |
| String spaces = forLog ? "" : " "; |
| |
| StringBuilder sb = new StringBuilder(); |
| sb.append("Partitioned Region name = "); |
| sb.append(partitionedRegion.getFullPath()); |
| |
| if (allStores != null) { |
| sb.append(newLine).append(spaces); |
| sb.append("Redundancy level set to "); |
| sb.append(partitionedRegion.getRedundantCopies()); |
| sb.append(newLine); |
| sb.append(". Number of available data stores: "); |
| sb.append(allStores.size()); |
| sb.append(newLine).append(spaces); |
| sb.append(". Number successfully allocated = "); |
| sb.append(alreadyUsed.size()); |
| sb.append(newLine); |
| sb.append(". Data stores: "); |
| sb.append(printCollection(allStores)); |
| sb.append(newLine); |
| sb.append(". Data stores successfully allocated: "); |
| sb.append(printCollection(alreadyUsed)); |
| sb.append(newLine); |
| sb.append(". Equivalent members: "); |
| sb.append(printCollection(partitionedRegion.getDistributionManager().getMembersInThisZone())); |
| } |
| |
| return sb.toString(); |
| } |
| |
| /** |
| * Indicate a timeout due to excessive retries among available peers |
| * |
| * @param allStores all feasible stores. If null, we don't know. |
| * @param alreadyUsed those that have already accepted, only used if allStores != null |
| * @param opString description of the operation which timed out |
| */ |
| public static void timedOut(PartitionedRegion partitionedRegion, |
| Set<InternalDistributedMember> allStores, |
| Collection<InternalDistributedMember> alreadyUsed, String opString, long timeOut) { |
| throw new PartitionedRegionStorageException( |
| format("Timed out attempting to %s in the partitioned region.%sWaited for: %s ms.", |
| opString, regionStatus(partitionedRegion, allStores, alreadyUsed, true), timeOut) |
| + TIMEOUT_MSG); |
| } |
| |
| public PartitionedRegion getPartitionedRegion() { |
| return partitionedRegion; |
| } |
| |
| private Set<InternalDistributedMember> getAllStores(String partitionName) { |
| if (partitionName != null) { |
| return getFixedPartitionStores(partitionName); |
| } |
| final Set<InternalDistributedMember> allStores = |
| partitionedRegion.getRegionAdvisor().adviseDataStore(true); |
| PartitionedRegionDataStore localDataStore = partitionedRegion.getDataStore(); |
| if (localDataStore != null) { |
| allStores.add(partitionedRegion.getDistributionManager().getId()); |
| } |
| return allStores; |
| } |
| |
| /** |
| * This is for a fixed-partitioned region (FPR): for the given partition, return the set of |
| * datastores on which that partition is defined. |
| * |
| * @param partitionName name of the partition whose datastores need to be found |
| */ |
| private Set<InternalDistributedMember> getFixedPartitionStores(String partitionName) { |
| Set<InternalDistributedMember> members = |
| partitionedRegion.getRegionAdvisor().adviseFixedPartitionDataStores(partitionName); |
| |
| List<FixedPartitionAttributesImpl> allFixedPartitionAttributes = |
| partitionedRegion.getFixedPartitionAttributesImpl(); |
| |
| if (allFixedPartitionAttributes != null) { |
| for (FixedPartitionAttributesImpl fixedPartitionAttributes : allFixedPartitionAttributes) { |
| if (fixedPartitionAttributes.getPartitionName().equals(partitionName)) { |
| members.add(partitionedRegion.getMyId()); |
| } |
| } |
| } |
| return members; |
| } |
| |
| /** |
| * Indicate that we are unable to allocate sufficient stores and the timeout period has passed |
| * |
| * @param allStores stores we know about |
| * @param alreadyUsed ones already committed |
| * @param onlyLog true if only a warning log message should be generated. |
| */ |
| private void insufficientStores(Set<InternalDistributedMember> allStores, |
| Collection<InternalDistributedMember> alreadyUsed, boolean onlyLog) { |
| String regionStat = regionStatus(partitionedRegion, allStores, alreadyUsed, onlyLog); |
| String newLine = onlyLog ? " " : lineSeparator(); |
| String notEnoughValidNodes = alreadyUsed.isEmpty() |
| ? "Unable to find any members to host a bucket in the partitioned region. %s.%s" |
| : "Configured redundancy level could not be satisfied. %s to satisfy redundancy for the region.%s"; |
| if (onlyLog) { |
| logger.warn(format(notEnoughValidNodes, INSUFFICIENT_STORES_MSG, |
| newLine + regionStat + newLine)); |
| } else { |
| throw new PartitionedRegionStorageException( |
| format(notEnoughValidNodes, INSUFFICIENT_STORES_MSG, newLine + regionStat + newLine)); |
| } |
| } |
| |
| /** |
| * Create a single copy of this bucket on one node. The bucket must already be locked. |
| * |
| * @param bucketId The bucket we are working on |
| * @param newBucketSize size with which to create the new bucket |
| * @param excludedMembers members that should not be considered as hosts |
| * @param alreadyUsed members who already seem to have the bucket |
| * @param failedMembers members that have already declined or failed to host the bucket |
| * @param timeOut absolute time, in milliseconds, at which to fail |
| * @param allStores the set of data stores to choose from |
| * @return the new member, or null if it fails |
| * @throws PartitionedRegionStorageException if there are not enough data stores |
| */ |
| private InternalDistributedMember createBucketInstance(int bucketId, int newBucketSize, |
| Collection<InternalDistributedMember> excludedMembers, |
| Collection<InternalDistributedMember> alreadyUsed, |
| ArrayListWithClearState<InternalDistributedMember> failedMembers, long timeOut, |
| Set<InternalDistributedMember> allStores) { |
| |
| boolean isDebugEnabled = logger.isDebugEnabled(); |
| |
| // Recalculate list of candidates |
| Set<InternalDistributedMember> candidateMembers = new HashSet<>(allStores); |
| candidateMembers.removeAll(alreadyUsed); |
| candidateMembers.removeAll(excludedMembers); |
| candidateMembers.removeAll(failedMembers); |
| |
| if (isDebugEnabled) { |
| logger.debug("AllStores={} AlreadyUsed={} excluded={} failed={}", allStores, alreadyUsed, |
| excludedMembers, failedMembers); |
| } |
| |
| if (candidateMembers.isEmpty()) { |
| partitionedRegion.checkReadiness(); |
| |
| // Run out of candidates. Refetch? |
| if (System.currentTimeMillis() > timeOut) { |
| if (isDebugEnabled) { |
| logger.debug("createBucketInstance: ran out of candidates and timed out"); |
| } |
| // fail, let caller signal error |
| return null; |
| } |
| |
| // Recalculate |
| candidateMembers = new HashSet<>(allStores); |
| candidateMembers.removeAll(alreadyUsed); |
| candidateMembers.removeAll(excludedMembers); |
| failedMembers.clear(); |
| } |
| |
| if (isDebugEnabled) { |
| logger.debug("createBucketInstance: candidateMembers = {}", candidateMembers); |
| } |
| |
| // If there are no candidates, early out. |
| if (candidateMembers.isEmpty()) { |
| // no options |
| if (isDebugEnabled) { |
| logger.debug("createBucketInstance: no valid candidates"); |
| } |
| // failure |
| return null; |
| } |
| |
| // For a FixedPartitionedRegion, candidateMembers is the set of members on which the |
| // required fixed partition is defined. |
| InternalDistributedMember candidate; |
| if (partitionedRegion.isFixedPartitionedRegion()) { |
| candidate = candidateMembers.iterator().next(); |
| } else { |
| String colocatedWith = |
| partitionedRegion.getAttributes().getPartitionAttributes().getColocatedWith(); |
| if (colocatedWith != null) { |
| candidate = getColocatedDataStore(candidateMembers, alreadyUsed, bucketId, colocatedWith); |
| } else { |
| Collection<InternalDistributedMember> orderedCandidates = new ArrayList<>(candidateMembers); |
| candidate = getPreferredDataStore(orderedCandidates, alreadyUsed); |
| } |
| } |
| |
| if (candidate == null) { |
| failedMembers.addAll(candidateMembers); |
| return null; |
| } |
| |
| if (!partitionedRegion.isShadowPR() && !checkMembersColocation(partitionedRegion, candidate)) { |
| if (isDebugEnabled) { |
| logger.debug( |
| "createBucketInstances - Member does not have all of the regions colocated with partitionedRegion {}", |
| candidate); |
| } |
| failedMembers.add(candidate); |
| return null; |
| } |
| |
| if (!candidate.equals(partitionedRegion.getMyId())) { |
| PartitionProfile profile = |
| partitionedRegion.getRegionAdvisor().getPartitionProfile(candidate); |
| if (profile == null) { |
| if (isDebugEnabled) { |
| logger.debug("createBucketInstance: {}: no partition profile for {}", |
| partitionedRegion.getFullPath(), candidate); |
| } |
| failedMembers.add(candidate); |
| return null; |
| } |
| } |
| |
| // Coordinate with any remote close occurring, causing it to wait until |
| // this create bucket attempt has been made. |
| ManageBucketRsp response = |
| createBucketOnMember(bucketId, candidate, newBucketSize, failedMembers.wasCleared()); |
| |
| // Record the candidate as a bucket host on success; otherwise track it as failed or excluded. |
| if (response.isAcceptance()) { |
| // success! |
| return candidate; |
| } |
| |
| if (isDebugEnabled) { |
| logger.debug("createBucketInstance: {}: candidate {} declined to manage bucketId={}: {}", |
| partitionedRegion.getFullPath(), candidate, |
| partitionedRegion.bucketStringForLogs(bucketId), |
| response); |
| } |
| if (response.equals(ManageBucketRsp.CLOSED)) { |
| excludedMembers.add(candidate); |
| } else { |
| failedMembers.add(candidate); |
| } |
| |
| return null; |
| } |
| |
| InternalDistributedMember createBucketOnDataStore(int bucketId, int size, |
| RetryTimeKeeper retryTimeKeeper) { |
| boolean isDebugEnabled = logger.isDebugEnabled(); |
| |
| InternalDistributedMember primaryForFixedPartition = null; |
| if (partitionedRegion.isFixedPartitionedRegion()) { |
| primaryForFixedPartition = |
| partitionedRegion.getRegionAdvisor().adviseFixedPrimaryPartitionDataStore(bucketId); |
| } |
| |
| InternalDistributedMember memberHostingBucket; |
| Collection<InternalDistributedMember> attempted = new HashSet<>(); |
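| // Keep asking initialized data stores to create the bucket until one agrees to host it, or |
| // until some member is found to already host it via getNodeForBucketWrite(). |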
| do { |
| partitionedRegion.checkReadiness(); |
| Set<InternalDistributedMember> available = |
| partitionedRegion.getRegionAdvisor().adviseInitializedDataStore(); |
| available.removeAll(attempted); |
| InternalDistributedMember targetMember = null; |
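| // Prefer the fixed-partition primary if it is still available; otherwise take the first |
| // store that has not been attempted yet. |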
| for (InternalDistributedMember member : available) { |
| if (available.contains(primaryForFixedPartition)) { |
| targetMember = primaryForFixedPartition; |
| } else { |
| targetMember = member; |
| } |
| break; |
| } |
| if (targetMember == null) { |
| if (shouldLogInsufficientStores()) { |
| insufficientStores(available, Collections.emptySet(), true); |
| } |
| // this will always throw an exception |
| insufficientStores(available, Collections.emptySet(), false); |
| } |
| try { |
| if (isDebugEnabled) { |
| logger.debug("Attempting to get data store {} to create the bucket {} for us", |
| targetMember, |
| partitionedRegion.bucketStringForLogs(bucketId)); |
| } |
| CreateBucketMessage.NodeResponse response = |
| CreateBucketMessage.send(targetMember, partitionedRegion, bucketId, size); |
| memberHostingBucket = response.waitForResponse(); |
| if (memberHostingBucket != null) { |
| return memberHostingBucket; |
| } |
| } catch (ForceReattemptException e) { |
| // do nothing; the loop will check again for a member hosting the bucket. |
| } |
| attempted.add(targetMember); |
| } while ((memberHostingBucket = |
| partitionedRegion.getNodeForBucketWrite(bucketId, retryTimeKeeper)) == null); |
| return memberHostingBucket; |
| } |
| |
| /** |
| * Creates the bucket atomically by creating all the copies needed to satisfy redundancy. If all |
| * copies cannot be created, a PartitionedRegionStorageException is thrown to the user and a |
| * BucketBackupMessage is sent to the nodes to make copies of a bucket that was only partially |
| * created. Other VMs are informed of bucket creation via updates to their |
| * {@link BucketAdvisor.BucketProfile}s. |
| * |
| * <p> |
| * This method is synchronized to enforce a single threaded ordering, allowing for a more accurate |
| * picture of bucket distribution in the face of concurrency. |
| * |
| * <p> |
| * This method is now slightly misnamed. Another member could be in the process of creating this |
| * same bucket at the same time. |
| * |
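| * <p> |
| * For illustration only (a hypothetical caller, not a prescribed usage): a non-fixed-partitioned |
| * region might create bucket 7 with an initial entry size of 0 via |
| * {@code getRedundancyProvider().createBucketAtomically(7, 0, false, null)}, where the trailing |
| * {@code null} means no fixed-partition name applies. |
| * |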
| * @param bucketId Id of the bucket to be created. |
| * @param newBucketSize size of the first entry. |
| * @return the primary member for the newly created bucket |
| * @throws PartitionedRegionStorageException if the required number of buckets cannot be created |
| * to satisfy redundancy. |
| * @throws PartitionedRegionException if the d-lock cannot be acquired to create the bucket. |
| * @throws PartitionOfflineException if persistent data recovery is not complete for a partitioned |
| * region referred to in the query. |
| */ |
| public InternalDistributedMember createBucketAtomically(int bucketId, int newBucketSize, |
| boolean finishIncompleteCreation, String partitionName) |
| throws PartitionedRegionStorageException, PartitionedRegionException, |
| PartitionOfflineException { |
| boolean isDebugEnabled = logger.isDebugEnabled(); |
| |
| partitionedRegion.checkPROffline(); |
| |
| // If there are insufficient stores, throw *before* we try acquiring the |
| // (very expensive) bucket lock or the (somewhat expensive) monitor on this provider. |
| earlySufficientStoresCheck(partitionName); |
| |
| synchronized (this) { |
| if (partitionedRegion.getCache().isCacheAtShutdownAll()) { |
| throw partitionedRegion.getCache().getCacheClosedException("Cache is shutting down"); |
| } |
| |
| if (isDebugEnabled) { |
| logger.debug("Starting atomic creation of bucketId={}", |
| partitionedRegion.bucketStringForLogs(bucketId)); |
| } |
| |
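| // Absolute deadline, in milliseconds, for this creation attempt; see computeTimeout(). |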
| long timeOut = System.currentTimeMillis() + computeTimeout(); |
| BucketMembershipObserver observer = null; |
| boolean needToElectPrimary = true; |
| InternalDistributedMember bucketPrimary = null; |
| |
| try { |
| partitionedRegion.checkReadiness(); |
| |
| Bucket toCreate = partitionedRegion.getRegionAdvisor().getBucket(bucketId); |
| |
| if (!finishIncompleteCreation) { |
| bucketPrimary = partitionedRegion.getBucketPrimary(bucketId); |
| if (bucketPrimary != null) { |
| if (isDebugEnabled) { |
| logger.debug( |
| "during atomic creation, discovered that the primary already exists {} returning early", |
| bucketPrimary); |
| } |
| needToElectPrimary = false; |
| return bucketPrimary; |
| } |
| } |
| |
| observer = new BucketMembershipObserver(toCreate).beginMonitoring(); |
| ArrayListWithClearState<InternalDistributedMember> failedMembers = |
| new ArrayListWithClearState<>(); |
| Set<InternalDistributedMember> excludedMembers = new HashSet<>(); |
| Collection<InternalDistributedMember> acceptedMembers = new ArrayList<>(); |
| |
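| // Retry until redundancy is satisfied, candidates are exhausted, or the deadline passes. |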
| for (boolean loggedInsufficientStores = false;;) { |
| partitionedRegion.checkReadiness(); |
| if (partitionedRegion.getCache().isCacheAtShutdownAll()) { |
| if (isDebugEnabled) { |
| logger.debug("Aborted createBucketAtomically due to ShutdownAll"); |
| } |
| throw partitionedRegion.getCache().getCacheClosedException("Cache is shutting down"); |
| } |
| |
| long timeLeft = timeOut - System.currentTimeMillis(); |
| if (timeLeft < 0) { |
| // It took too long. |
| timedOut(partitionedRegion, getAllStores(partitionName), acceptedMembers, |
| ALLOCATE_ENOUGH_MEMBERS_TO_HOST_BUCKET, computeTimeout()); |
| } |
| |
| if (isDebugEnabled) { |
| logger.debug("createBucketAtomically: have {} ms left to finish this", timeLeft); |
| } |
| |
| // Always go back to the advisor, see if any fresh data stores are present. |
| Set<InternalDistributedMember> allStores = getAllStores(partitionName); |
| |
| loggedInsufficientStores = checkSufficientStores(allStores, loggedInsufficientStores); |
| |
| InternalDistributedMember candidate = createBucketInstance(bucketId, newBucketSize, |
| excludedMembers, acceptedMembers, failedMembers, timeOut, allStores); |
| if (candidate != null) { |
| if (partitionedRegion.getDistributionManager().enforceUniqueZone()) { |
| // enforceUniqueZone property has no effect for a loner |
| if (!(partitionedRegion |
| .getDistributionManager() instanceof LonerDistributionManager)) { |
| Set<InternalDistributedMember> exm = getBuddyMembersInZone(candidate, allStores); |
| exm.remove(candidate); |
| exm.removeAll(acceptedMembers); |
| excludedMembers.addAll(exm); |
| } else { |
| // log a warning if Loner |
| logger.warn( |
| "enforce-unique-host and redundancy-zone properties have no effect for a LonerDistributedSystem."); |
| } |
| } |
| } |
| |
| // Get an updated list of bucket owners, which should include |
| // buckets created concurrently with this createBucketAtomically call |
| acceptedMembers = partitionedRegion.getRegionAdvisor().getBucketOwners(bucketId); |
| |
| if (isDebugEnabled) { |
| logger.debug("Accepted members: {}", acceptedMembers); |
| } |
| |
| // set the primary as the candidate in the first iteration if the candidate has accepted |
| if (bucketPrimary == null && acceptedMembers.contains(candidate)) { |
| bucketPrimary = candidate; |
| } |
| |
| // prune out the stores that have left |
| verifyBucketNodes(excludedMembers, partitionName); |
| |
| // Note - we used to wait for the created bucket to become primary here |
| // if this is a colocated region. We no longer need to do that, because |
| // the EndBucketMessage is sent out after bucket creation completes to |
| // select the primary. |
| |
| // Have we exhausted all candidates? |
| int potentialCandidateCount = allStores.size() |
| - (excludedMembers.size() + acceptedMembers.size() + failedMembers.size()); |
| |
| // Determining exhausted members competes with bucket balancing; it's |
| // important to re-visit all failed members since the "failed" set may |
| // contain datastores which are imbalanced at the moment, but could still |
| // be candidates. If the failed members list is empty, it's expected |
| // that the next iteration clears the (already empty) list. |
| boolean exhaustedPotentialCandidates = |
| failedMembers.wasCleared() && potentialCandidateCount <= 0; |
| boolean redundancySatisfied = |
| acceptedMembers.size() > partitionedRegion.getRedundantCopies(); |
| boolean bucketNotCreated = acceptedMembers.isEmpty(); |
| |
| if (isDebugEnabled) { |
| logger.debug( |
| "potentialCandidateCount={}, exhaustedPotentialCandidates={}, redundancySatisfied={}, bucketNotCreated={}", |
| potentialCandidateCount, exhaustedPotentialCandidates, redundancySatisfied, |
| bucketNotCreated); |
| } |
| |
| if (bucketNotCreated) { |
| // if we haven't managed to create the bucket on any nodes, retry. |
| continue; |
| } |
| |
| if (exhaustedPotentialCandidates && !redundancySatisfied) { |
| insufficientStores(allStores, acceptedMembers, true); |
| } |
| |
| // Allow the thread to potentially finish bucket creation even if redundancy was not met. |
| if (redundancySatisfied || exhaustedPotentialCandidates) { |
| // Tell one of the members to become primary. |
| // The rest of the members will be allowed to volunteer for primary. |
| endBucketCreation(bucketId, acceptedMembers, bucketPrimary, partitionName); |
| |
| int expectedRemoteHosts = acceptedMembers.size() |
| - (acceptedMembers.contains(partitionedRegion.getMyId()) ? 1 : 0); |
| |
| boolean interrupted = Thread.interrupted(); |
| try { |
| BucketMembershipObserverResults results = observer |
| .waitForOwnersGetPrimary(expectedRemoteHosts, acceptedMembers, partitionName); |
| if (results.problematicDeparture) { |
| // BZZZT! Member left. Start over. |
| continue; |
| } |
| bucketPrimary = results.primary; |
| } catch (InterruptedException e) { |
| interrupted = true; |
| partitionedRegion.getCancelCriterion().checkCancelInProgress(e); |
| } finally { |
| if (interrupted) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| needToElectPrimary = false; |
| |
| return bucketPrimary; |
| } |
| } |
| } catch (DiskAccessException dae) { |
| CancelCriterion cancelCriterion = partitionedRegion.getCancelCriterion(); |
| if (cancelCriterion.isCancelInProgress()) { |
| needToElectPrimary = false; |
| cancelCriterion.checkCancelInProgress(dae); |
| } |
| |
| throw dae; |
| } catch (CancelException | RegionDestroyedException e) { |
| // We don't need to elect a primary if the cache was closed. The other members will |
| // take care of it. This ensures we don't compromise redundancy. |
| needToElectPrimary = false; |
| throw e; |
| } catch (PartitionOfflineException e) { |
| throw e; |
| } catch (RuntimeException e) { |
| if (isDebugEnabled) { |
| logger.debug("Unable to create new bucket {}: {}", bucketId, e.getMessage(), e); |
| } |
| |
| if (!finishIncompleteCreation) { |
| cleanUpBucket(bucketId); |
| } |
| throw e; |
| } finally { |
| if (observer != null) { |
| observer.stopMonitoring(); |
| } |
| |
| // Try to make sure everyone that created the bucket can volunteer for primary |
| if (needToElectPrimary) { |
| try { |
| endBucketCreation(bucketId, |
| partitionedRegion.getRegionAdvisor().getBucketOwners(bucketId), |
| bucketPrimary, partitionName); |
| } catch (Exception e) { |
| // if region is going down, then no warning level logs |
| if (e instanceof CancelException |
| || partitionedRegion.getCancelCriterion().isCancelInProgress()) { |
| logger.debug("Exception trying choose a primary after bucket creation failure", e); |
| } else { |
| logger.warn("Exception trying choose a primary after bucket creation failure", e); |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| /** |
| * Figure out which member should be primary for a bucket among the members that have created the |
| * bucket, and tell that member to become the primary. |
| * |
| * @param acceptedMembers The members that now host the bucket |
| */ |
| private void endBucketCreation(int bucketId, |
| Collection<InternalDistributedMember> acceptedMembers, |
| InternalDistributedMember targetPrimary, String partitionName) { |
| if (acceptedMembers.isEmpty()) { |
| return; |
| } |
| acceptedMembers = new HashSet<>(acceptedMembers); |
| |
| // This is for an FPR: for the given bucket id, make sure that only the datastore on which |
| // the primary partition is defined for this bucket becomes the primary. If the primary |
| // partition is not available, then a secondary partition will become primary. |
| if (partitionName != null) { |
| if (isLocalPrimary(partitionName)) { |
| targetPrimary = partitionedRegion.getMyId(); |
| } else { |
| targetPrimary = |
| partitionedRegion.getRegionAdvisor().adviseFixedPrimaryPartitionDataStore(bucketId); |
| if (targetPrimary == null) { |
| Set<InternalDistributedMember> fpDataStores = getFixedPartitionStores(partitionName); |
| targetPrimary = fpDataStores.iterator().next(); |
| } |
| } |
| } |
| |
| if (targetPrimary == null) { |
| // The primary is normally passed in (e.g. the parent's choice in the colocated case); |
| // since it was not, pick a preferred data store from the accepted members. |
| targetPrimary = |
| getPreferredDataStore(acceptedMembers, Collections.emptySet()); |
| } |
| |
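| // Remove the local member so the message is sent only to remote hosts; if this member hosts |
| // the bucket, finish creation locally below. |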
| boolean isHosting = acceptedMembers.remove(partitionedRegion.getDistributionManager().getId()); |
| |
| EndBucketCreationMessage.send(acceptedMembers, targetPrimary, partitionedRegion, bucketId); |
| |
| if (isHosting) { |
| endBucketCreationLocally(bucketId, targetPrimary); |
| } |
| } |
| |
| private boolean isLocalPrimary(String partitionName) { |
| List<FixedPartitionAttributesImpl> allFixedPartitionAttributes = |
| partitionedRegion.getFixedPartitionAttributesImpl(); |
| if (allFixedPartitionAttributes != null) { |
| for (FixedPartitionAttributesImpl fixedPartitionAttributes : allFixedPartitionAttributes) { |
| if (fixedPartitionAttributes.getPartitionName().equals(partitionName) |
| && fixedPartitionAttributes.isPrimary()) { |
| return true; |
| } |
| } |
| } |
| return false; |
| } |
| |
| public void endBucketCreationLocally(int bucketId, InternalDistributedMember newPrimary) { |
| // Don't elect ourselves as primary or tell others to persist our ID |
| // if this member has been destroyed. |
| if (partitionedRegion.getCancelCriterion().isCancelInProgress() |
| || partitionedRegion.isDestroyed()) { |
| return; |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("endBucketCreationLocally: for region {} bucketId={} new primary: {}", |
| partitionedRegion.getFullPath(), bucketId, newPrimary); |
| } |
| |
| BucketAdvisor bucketAdvisor = partitionedRegion.getRegionAdvisor().getBucketAdvisor(bucketId); |
| ProxyBucketRegion proxyBucketRegion = bucketAdvisor.getProxyBucketRegion(); |
| BucketPersistenceAdvisor persistentAdvisor = proxyBucketRegion.getPersistenceAdvisor(); |
| |
| // prevent multiple threads from ending bucket creation at the same time. |
| synchronized (proxyBucketRegion) { |
| if (persistentAdvisor != null) { |
| BucketRegion realBucket = proxyBucketRegion.getCreatedBucketRegion(); |
| if (realBucket != null) { |
| PersistentMemberID persistentId = realBucket.getPersistentID(); |
| persistentAdvisor.endBucketCreation(persistentId); |
| } |
| } |
| |
| // We've received an endBucketCreationMessage, but the primary |
| // may not have. So now we wait for the chosen member to become primary. |
| bucketAdvisor.setPrimaryElector(newPrimary); |
| |
| if (partitionedRegion.getGemFireCache().getMyId().equals(newPrimary)) { |
| // If we're the chosen primary, volunteer for primary now |
| if (bucketAdvisor.isHosting()) { |
| bucketAdvisor.clearPrimaryElector(); |
| bucketAdvisor.volunteerForPrimary(); |
| } |
| } else { |
| // It's possible the chosen primary has already left. In |
| // that case, volunteer for primary now. |
| if (!bucketAdvisor.adviseInitialized().contains(newPrimary)) { |
| bucketAdvisor.clearPrimaryElector(); |
| bucketAdvisor.volunteerForPrimary(); |
| } |
| |
| // If the bucket has already had a primary, the chosen member was primary for a while. |
| // Go ahead and clear the primary elector field. |
| if (bucketAdvisor.getHadPrimary()) { |
| bucketAdvisor.clearPrimaryElector(); |
| bucketAdvisor.volunteerForPrimary(); |
| } |
| } |
| } |
| |
| // send out a profile update to indicate the persistence is initialized, if needed. |
| if (persistentAdvisor != null) { |
| bucketAdvisor.endBucketCreation(); |
| } |
| |
| List<PartitionedRegion> colocatedWithList = ColocationHelper.getColocatedChildRegions( |
| partitionedRegion); |
| for (PartitionedRegion child : colocatedWithList) { |
| if (child.getRegionAdvisor().isBucketLocal(bucketId)) { |
| child.getRedundancyProvider().endBucketCreationLocally(bucketId, newPrimary); |
| } |
| } |
| } |
| |
| /** |
| * Get buddy data stores on the same host or in the same redundancy zone as the accepted member |
| * |
| * @return the set of data stores in the same zone as the accepted member |
| * @since GemFire 5.9 |
| */ |
| private Set<InternalDistributedMember> getBuddyMembersInZone( |
| InternalDistributedMember acceptedMember, Collection<InternalDistributedMember> allStores) { |
| DistributionManager dm = partitionedRegion.getDistributionManager(); |
| Set<InternalDistributedMember> buddies = dm.getMembersInSameZone(acceptedMember); |
| buddies.retainAll(allStores); |
| return buddies; |
| } |
| |
| /** |
| * Early check for resources. This code may be executed for every put operation if there are no |
| * datastores present, so limit excessive logging. |
| * |
| * @since GemFire 5.8 |
| */ |
| private void earlySufficientStoresCheck(String partitionName) { |
| assert Assert.assertHoldsLock(this, false); |
| Set<InternalDistributedMember> currentStores = getAllStores(partitionName); |
| if (currentStores.isEmpty()) { |
| if (shouldLogInsufficientStores()) { |
| insufficientStores(currentStores, emptyList(), true); |
| } |
| insufficientStores(currentStores, emptyList(), false); |
| } |
| } |
| |
| /** |
| * Limit the frequency of logging the {@link #INSUFFICIENT_STORES_MSG} message to once per PR, |
| * and thereafter to at most once per {@link #INSUFFICIENT_LOGGING_THROTTLE_TIME} period |
| * |
| * @return true if it's time to log |
| * @since GemFire 5.8 |
| */ |
| private boolean shouldLogInsufficientStores() { |
| long now = NanoTimer.getTime(); |
| long delta = now - insufficientLogTimeStamp.get(); |
| if (firstInsufficientStoresLogged.compareAndSet(false, true) |
| || delta >= INSUFFICIENT_LOGGING_THROTTLE_TIME) { |
| insufficientLogTimeStamp.set(now); |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * Compute timeout for waiting for a bucket. Prefer |
| * {@link #DATASTORE_DISCOVERY_TIMEOUT_MILLISECONDS} over |
| * {@link PartitionedRegion#getRetryTimeout()} |
| * |
| * @return the milliseconds to wait for a bucket creation operation |
| */ |
| private long computeTimeout() { |
| if (DATASTORE_DISCOVERY_TIMEOUT_MILLISECONDS != null) { |
| long millis = DATASTORE_DISCOVERY_TIMEOUT_MILLISECONDS; |
| // only positive values allowed |
| if (millis > 0) { |
| return millis; |
| } |
| } |
| return partitionedRegion.getRetryTimeout(); |
| } |
| |
| /** |
| * Check to determine that there are enough datastore VMs to start the bucket creation process. |
| * Log a warning or throw an exception when there are not enough datastore VMs. |
| * |
| * @param allStores All known data store instances (including local) |
| * @param loggedInsufficientStores indicates whether the insufficient-stores warning has already |
| * been logged |
| * @return true while the warning has been logged and stores are still insufficient; false once |
| * sufficient stores have been found |
| */ |
| private boolean checkSufficientStores(Set<InternalDistributedMember> allStores, |
| boolean loggedInsufficientStores) { |
| // Report (only once) if insufficient data stores have been detected. |
| if (!loggedInsufficientStores) { |
| if (allStores.isEmpty()) { |
| insufficientStores(allStores, emptyList(), true); |
| return true; |
| } |
| } else { |
| if (!allStores.isEmpty()) { |
| // Excellent, sufficient resources were found! |
| logger.info("{} Region name, {}", SUFFICIENT_STORES_MSG, partitionedRegion.getFullPath()); |
| return false; |
| } |
| // Already logged warning, there are no datastores |
| insufficientStores(allStores, emptyList(), false); |
| } |
| return loggedInsufficientStores; |
| } |
| |
| /** |
| * Clean up locally created bucket and tell other VMs to attempt recovering redundancy |
| * |
| * @param bucketId the bucket identifier |
| */ |
| private void cleanUpBucket(int bucketId) { |
| Set<InternalDistributedMember> dataStores = |
| partitionedRegion.getRegionAdvisor().adviseDataStore(); |
| BucketBackupMessage.send(dataStores, partitionedRegion, bucketId); |
| } |
| |
| public void finishIncompleteBucketCreation(int bucketId) { |
| String partitionName = null; |
| if (partitionedRegion.isFixedPartitionedRegion()) { |
| FixedPartitionAttributesImpl fpa = |
| PartitionedRegionHelper.getFixedPartitionAttributesForBucket(partitionedRegion, bucketId); |
| partitionName = fpa.getPartitionName(); |
| } |
| createBucketAtomically(bucketId, 0, true, partitionName); |
| } |
| |
| /** |
| * Creates the backup bucket with ID bucketId on targetMember. This method will also create the |
| * bucket for all of the child colocated PRs. |
| * |
| * @param isRebalance true if bucket creation is directed by rebalancing |
| * @return true if the bucket was successfully created |
| */ |
| public boolean createBackupBucketOnMember(int bucketId, InternalDistributedMember targetMember, |
| boolean isRebalance, boolean replaceOfflineData, InternalDistributedMember fromMember, |
| boolean forceCreation) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("createBackupBucketOnMember for bucketId={} member: {}", |
| partitionedRegion.bucketStringForLogs(bucketId), targetMember); |
| } |
| |
| if (!targetMember.equals(partitionedRegion.getMyId())) { |
| PartitionProfile profile = |
| partitionedRegion.getRegionAdvisor().getPartitionProfile(targetMember); |
| if (profile == null) { |
| return false; |
| } |
| |
| try { |
| ManageBackupBucketMessage.NodeResponse response = |
| ManageBackupBucketMessage.send(targetMember, partitionedRegion, bucketId, isRebalance, |
| replaceOfflineData, fromMember, forceCreation); |
| |
| if (response.waitForAcceptance()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "createBackupBucketOnMember: Bucket creation succeed for bucketId={} on member = {}", |
| partitionedRegion.bucketStringForLogs(bucketId), targetMember); |
| } |
| |
| return true; |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "createBackupBucketOnMember: Bucket creation failed for bucketId={} on member = {}", |
| partitionedRegion.bucketStringForLogs(bucketId), targetMember); |
| } |
| |
| return false; |
| |
| } catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (Throwable e) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| if (e instanceof ForceReattemptException) { |
| // no log needed |
| } else if (e instanceof CancelException |
| || e.getCause() != null && e.getCause() instanceof CancelException) { |
| // no need to log exceptions caused by cache closure |
| } else { |
| logger.warn("Exception creating partition on {}", targetMember, e); |
| } |
| return false; |
| } |
| } |
| |
| PartitionedRegionDataStore dataStore = partitionedRegion.getDataStore(); |
| boolean bucketManaged = |
| dataStore != null && dataStore.grabBucket(bucketId, fromMember, forceCreation, |
| replaceOfflineData, isRebalance, null, false).equals(CreateBucketResult.CREATED); |
| if (!bucketManaged) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "createBackupBucketOnMember: Local data store refused to accommodate the data for bucketId={} dataStore={}", |
| partitionedRegion.bucketStringForLogs(bucketId), dataStore); |
| } |
| } |
| return bucketManaged; |
| } |
| |
| private boolean getForceLocalPrimaries() { |
| boolean result = false; |
| Boolean forceLocalPrimariesValue = forceLocalPrimaries.get(); |
| if (forceLocalPrimariesValue != null) { |
| result = forceLocalPrimariesValue; |
| } |
| return result; |
| } |
| |
| /** |
| * Creates the bucket with ID bucketId on targetMember. |
| * |
| * @param forceCreation inform the targetMember that it must attempt to host the bucket, |
| * ignoring its maximums as appropriate |
| * @return a response object |
| */ |
| private ManageBucketRsp createBucketOnMember(int bucketId, InternalDistributedMember targetMember, |
| int newBucketSize, boolean forceCreation) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("createBucketOnMember for bucketId={} member: {}{}", |
| partitionedRegion.bucketStringForLogs(bucketId), targetMember, |
| forceCreation ? " forced" : ""); |
| } |
| |
| if (!targetMember.equals(partitionedRegion.getMyId())) { |
| PartitionProfile profile = |
| partitionedRegion.getRegionAdvisor().getPartitionProfile(targetMember); |
| if (profile == null) { |
| return ManageBucketRsp.NO; |
| } |
| |
| try { |
| NodeResponse response = ManageBucketMessage.send(targetMember, partitionedRegion, bucketId, |
| newBucketSize, forceCreation); |
| |
| if (response.waitForAcceptance()) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "createBucketOnMember: Bucket creation succeed for bucketId={} on member = {}", |
| partitionedRegion.bucketStringForLogs(bucketId), targetMember); |
| } |
| |
| return ManageBucketRsp.YES; |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "createBucketOnMember: Bucket creation failed for bucketId={} on member = {}", |
| partitionedRegion.bucketStringForLogs(bucketId), targetMember); |
| } |
| |
| return response.rejectedDueToInitialization() ? ManageBucketRsp.NO_INITIALIZING |
| : ManageBucketRsp.NO; |
| |
| } catch (PartitionOfflineException e) { |
| throw e; |
| } catch (VirtualMachineError err) { |
| SystemFailure.initiateFailure(err); |
| // If this ever returns, rethrow the error. We're poisoned |
| // now, so don't let this thread continue. |
| throw err; |
| } catch (Throwable e) { |
| // Whenever you catch Error or Throwable, you must also |
| // catch VirtualMachineError (see above). However, there is |
| // _still_ a possibility that you are dealing with a cascading |
| // error condition, so you also need to check to see if the JVM |
| // is still usable: |
| SystemFailure.checkFailure(); |
| if (e instanceof CancelException |
| || e.getCause() != null && e.getCause() instanceof CancelException) { |
| // no need to log exceptions caused by cache closure |
| return ManageBucketRsp.CLOSED; |
| } |
| if (e instanceof ForceReattemptException) { |
| // no log needed |
| } else { |
| logger.warn("Exception creating partition on {}", targetMember, e); |
| } |
| return ManageBucketRsp.NO; |
| } |
| } |
| |
| PartitionedRegionDataStore dataStore = partitionedRegion.getDataStore(); |
| boolean bucketManaged = dataStore != null && dataStore.handleManageBucketRequest(bucketId, |
| newBucketSize, partitionedRegion.getMyId(), forceCreation); |
| if (!bucketManaged) { |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "createBucketOnMember: Local data store not able to accommodate the data for bucketId={}", |
| partitionedRegion.bucketStringForLogs(bucketId)); |
| } |
| } |
| return ManageBucketRsp.valueOf(bucketManaged); |
| } |
| |
| /** |
| * Select the member that is hosting the same bucket id for the PR this region is colocated |
| * with. For the primary copy it returns that same node, whereas for a secondary copy it returns |
| * the least loaded datastore that is hosting the bucket id. |
| * |
| * @return InternalDistributedMember colocated data store |
| * @since GemFire 5.8Beta |
| */ |
| private InternalDistributedMember getColocatedDataStore( |
| Collection<InternalDistributedMember> candidates, |
| Collection<InternalDistributedMember> alreadyUsed, int bucketId, String prName) { |
| Assert.assertTrue(prName != null); |
| PartitionedRegion colocatedRegion = ColocationHelper.getColocatedRegion(partitionedRegion); |
| Region<?, ?> prRoot = PartitionedRegionHelper.getPRRoot(partitionedRegion.getCache()); |
| PartitionRegionConfig config = |
| (PartitionRegionConfig) prRoot.get(partitionedRegion.getRegionIdentifier()); |
| |
| if (!config.isColocationComplete()) { |
| throw new IllegalStateException( |
| "Cannot create buckets, as colocated regions are not configured to be at the same nodes."); |
| } |
| |
| RegionAdvisor advisor = colocatedRegion.getRegionAdvisor(); |
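| // Choosing the first (primary) copy: it must land on the member that is primary for the same |
| // bucket in the colocated parent region. |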
| if (alreadyUsed.isEmpty()) { |
| InternalDistributedMember primary = advisor.getPrimaryMemberForBucket(bucketId); |
| if (!candidates.contains(primary)) { |
| return null; |
| } |
| return primary; |
| } |
| |
| Set<InternalDistributedMember> bucketOwnersSet = advisor.getBucketOwners(bucketId); |
| bucketOwnersSet.retainAll(candidates); |
| Collection<InternalDistributedMember> members = new ArrayList<>(bucketOwnersSet); |
| if (members.isEmpty()) { |
| return null; |
| } |
| |
| return getPreferredDataStore(members, alreadyUsed); |
| } |
| |
| /** |
| * Select the members with the fewest buckets and, among those, randomly select one. |
| * |
| * Under concurrent access the data that this method uses may be somewhat volatile; note that |
| * createBucketAtomically synchronizes to enhance the consistency of the data used in this method. |
| * |
| * @param candidates collection of InternalDistributedMember, potential datastores |
| * @param alreadyUsed data stores already in use |
| * @return a member with the fewest buckets or null if no datastores |
| */ |
| private InternalDistributedMember getPreferredDataStore( |
| Collection<InternalDistributedMember> candidates, |
| Collection<InternalDistributedMember> alreadyUsed) { |
| // has a primary already been chosen? |
| boolean forPrimary = alreadyUsed.isEmpty(); |
| |
| if (forPrimary && getForceLocalPrimaries()) { |
| PartitionedRegionDataStore localDataStore = partitionedRegion.getDataStore(); |
| if (localDataStore != null) { |
| return partitionedRegion.getMyId(); |
| } |
| } |
| |
| if (candidates.size() == 1) { |
| return candidates.iterator().next(); |
| } |
| Assert.assertTrue(candidates.size() > 1); |
| |
| // Convert peers to DataStoreBuckets |
| List<DataStoreBuckets> stores = partitionedRegion.getRegionAdvisor() |
| .adviseFilteredDataStores(new HashSet<>(candidates)); |
| |
| DistributionManager dm = partitionedRegion.getDistributionManager(); |
| |
| // Add local member as a candidate, if appropriate |
| InternalDistributedMember localMember = dm.getId(); |
| PartitionedRegionDataStore localDataStore = partitionedRegion.getDataStore(); |
| if (localDataStore != null && candidates.contains(localMember)) { |
| int bucketCount = localDataStore.getBucketsManaged(); |
| int priCount = localDataStore.getNumberOfPrimaryBucketsManaged(); |
| int localMaxMemory = partitionedRegion.getLocalMaxMemory(); |
| stores.add(new DataStoreBuckets(localMember, bucketCount, priCount, localMaxMemory)); |
| } |
| |
| if (stores.isEmpty()) { |
| return null; |
| } |
| |
| // --------------------------------------------- |
| // Calculate all hosts who already have this bucket |
| Collection<InternalDistributedMember> existingHosts = new HashSet<>(); |
| for (InternalDistributedMember mem : alreadyUsed) { |
| existingHosts.addAll(dm.getMembersInSameZone(mem)); |
| } |
| |
| Comparator<DataStoreBuckets> comparator = (d1, d2) -> { |
| boolean host1Used = existingHosts.contains(d1.memberId()); |
| boolean host2Used = existingHosts.contains(d2.memberId()); |
| |
| if (!host1Used && host2Used) { |
| // host1 preferred |
| return -1; |
| } |
| if (host1Used && !host2Used) { |
| // host2 preferred |
| return 1; |
| } |
| |
| // Look for least loaded |
| float load1; |
| float load2; |
| if (forPrimary) { |
| load1 = d1.numPrimaries() / (float) d1.localMaxMemoryMB(); |
| load2 = d2.numPrimaries() / (float) d2.localMaxMemoryMB(); |
| } else { |
| load1 = d1.numBuckets() / (float) d1.localMaxMemoryMB(); |
| load2 = d2.numBuckets() / (float) d2.localMaxMemoryMB(); |
| } |
| |
| int result = Float.compare(load1, load2); |
| if (result == 0) { |
| // if they have the same load, choose the member with the higher localMaxMemory |
| result = d2.localMaxMemoryMB() - d1.localMaxMemoryMB(); |
| } |
| return result; |
| }; |
| |
| // --------------------------------------------- |
| // First step is to sort datastores first by those whose hosts don't |
| // hold this bucket, and then secondarily by loading. |
| stores.sort(comparator); |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug(fancyFormatBucketAllocation("Sorted ", stores, existingHosts)); |
| } |
| |
| // --------------------------------------------- |
| // Always add the first datastore and note just how good it is. |
| DataStoreBuckets bestDataStore = stores.get(0); |
| List<DataStoreBuckets> bestStores = new ArrayList<>(); |
| bestStores.add(bestDataStore); |
| |
| boolean allStoresInUse = alreadyUsed.contains(bestDataStore.memberId()); |
| |
| // --------------------------------------------- |
| // Collect all of the other hosts in this sorted list that are as good as the very first one. |
| for (int i = 1; i < stores.size(); i++) { |
| DataStoreBuckets aDataStore = stores.get(i); |
| if (!allStoresInUse && alreadyUsed.contains(aDataStore.memberId())) { |
| // Only choose between the ones not in use. |
| break; |
| } |
| |
| if (comparator.compare(bestDataStore, aDataStore) != 0) { |
| break; |
| } |
| bestStores.add(aDataStore); |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug(fancyFormatBucketAllocation("Best Stores ", bestStores, existingHosts)); |
| } |
| |
| // --------------------------------------------- |
| int chosen; |
| if (DISABLE_CREATE_BUCKET_RANDOMNESS) { |
| chosen = 0; |
| } else { |
| // Pick one (at random) |
| chosen = PartitionedRegion.RANDOM.nextInt(bestStores.size()); |
| } |
| |
| return bestStores.get(chosen).memberId(); |
| } |
| |
| /** |
| * Adds a membership listener to watch for member departures, and schedules a task to recover |
| * redundancy of existing buckets |
| */ |
| void startRedundancyRecovery() { |
| partitionedRegion.getRegionAdvisor().addMembershipListener(new PRMembershipListener()); |
| scheduleRedundancyRecovery(null); |
| } |
| |
| /** |
| * Log bucket allocation in the log files in this format: |
| * |
| * <pre> |
| * member1: +5/20 |
| * member2: -10/5 |
| * </pre> |
| * |
| * <p> |
| * After the member name, the +/- indicates whether or not this bucket is already hosted on the |
| * given member. This is followed by the number of hosted primaries followed by the number of |
| * hosted non-primary buckets. |
| * |
| * @param prefix first part of message to print |
| * @param dataStores list of stores |
| * @param existingStores to mark those already in use |
| */ |
| private String fancyFormatBucketAllocation(String prefix, Iterable<DataStoreBuckets> dataStores, |
| Collection<InternalDistributedMember> existingStores) { |
| StringBuilder logStr = new StringBuilder(); |
| if (prefix != null) { |
| logStr.append(prefix); |
| } |
| logStr.append("Bucket Allocation for prId=").append(partitionedRegion.getPRId()).append(":"); |
| logStr.append(lineSeparator()); |
| |
| for (DataStoreBuckets buckets : dataStores) { |
| logStr.append(buckets.memberId()).append(": "); |
| if (existingStores.contains(buckets.memberId())) { |
| logStr.append("+"); |
| } else { |
| logStr.append("-"); |
| } |
| logStr.append(buckets.numPrimaries()); |
| logStr.append("/"); |
| logStr.append(buckets.numBuckets() - buckets.numPrimaries()); |
| |
| logStr.append(lineSeparator()); |
| } |
| return logStr.toString(); |
| } |
| |
| /** |
| * Verifies the members and removes those that are either not present in the |
| * DistributedSystem or are no longer part of the PartitionedRegion (close/localDestroy has |
| * been performed). |
| * |
| * @param members collection of members to scan and modify |
| */ |
| private void verifyBucketNodes(Collection<InternalDistributedMember> members, |
| String partitionName) { |
| if (members == null || members.isEmpty()) { |
| return; |
| } |
| |
| // Revisit region advisor, get current bucket stores. |
| Set<InternalDistributedMember> availableMembers = getAllStores(partitionName); |
| |
| for (Iterator<InternalDistributedMember> iterator = members.iterator(); iterator.hasNext();) { |
| InternalDistributedMember node = iterator.next(); |
| if (!availableMembers.contains(node)) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("verifyBucketNodes: removing member {}", node); |
| } |
| iterator.remove(); |
| Assert.assertTrue(!members.contains(node), "return value does not contain " + node); |
| } |
| } |
| } |
| |
| /** |
| * Schedules a task to perform redundancy recovery for a newly joined node or for a departed node. |
| */ |
| private void scheduleRedundancyRecovery(Object failedMemberId) { |
| boolean isStartup = failedMemberId == null; |
| long delay; |
| boolean movePrimaries; |
| |
| if (isStartup) { |
| delay = partitionedRegion.getPartitionAttributes().getStartupRecoveryDelay(); |
| movePrimaries = !Boolean |
| .getBoolean(GEMFIRE_PREFIX + "DISABLE_MOVE_PRIMARIES_ON_STARTUP"); |
| } else { |
| delay = partitionedRegion.getPartitionAttributes().getRecoveryDelay(); |
| movePrimaries = false; |
| } |
| |
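| // A negative delay disables recovery for this trigger, so there is nothing to schedule. |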
| final boolean requiresRedundancyRecovery = delay >= 0; |
| |
| if (!requiresRedundancyRecovery) { |
| return; |
| } |
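| // Accessor members (no local storage) do not host buckets, so there is nothing to recover locally. |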
| if (!partitionedRegion.isDataStore()) { |
| return; |
| } |
| |
| Runnable task = new RecoveryRunnable(this) { |
| @Override |
| public void run2() { |
| try { |
| boolean isFixedPartitionedRegion = partitionedRegion.isFixedPartitionedRegion(); |
| |
| // always replace offline data for fixed partitioned regions - |
| // this guarantees we create the buckets we are supposed to create on this node. |
| boolean replaceOfflineData = isFixedPartitionedRegion || !isStartup; |
| |
| RebalanceDirector director; |
| if (isFixedPartitionedRegion) { |
| director = new FPRDirector(true, movePrimaries); |
| } else { |
| director = new CompositeDirector(true, true, false, movePrimaries); |
| } |
| |
| PartitionedRegionRebalanceOp rebalance = rebalanceOpFactory.create( |
| partitionedRegion, false, director, replaceOfflineData, false); |
| |
| long start = partitionedRegion.getPrStats().startRecovery(); |
| |
| if (isFixedPartitionedRegion) { |
| rebalance.executeFPA(); |
| } else { |
| rebalance.execute(); |
| } |
| |
| partitionedRegion.getPrStats().endRecovery(start); |
| recoveryFuture = null; |
| providerStartupTask.complete(null); |
| } catch (CancelException e) { |
| logger.debug("Cache closed while recovery in progress"); |
| providerStartupTask.completeExceptionally(e); |
| } catch (RegionDestroyedException e) { |
| logger.debug("Region destroyed while recovery in progress"); |
| providerStartupTask.completeExceptionally(e); |
| } catch (Exception e) { |
| logger.error("Unexpected exception during bucket recovery", e); |
| providerStartupTask.completeExceptionally(e); |
| } |
| } |
| }; |
| |
| synchronized (shutdownLock) { |
| if (!shutdown) { |
| try { |
| if (logger.isDebugEnabled()) { |
| if (isStartup) { |
| logger.debug("{} scheduling redundancy recovery in {} ms", partitionedRegion, delay); |
| } else { |
| logger.debug( |
| "{} scheduling redundancy recovery after departure/crash/error in {} in {} ms", |
| partitionedRegion, failedMemberId, delay); |
| } |
| } |
| recoveryFuture = recoveryExecutor.schedule(task, delay, MILLISECONDS); |
| resourceManager.addStartupTask(providerStartupTask); |
| } catch (RejectedExecutionException e) { |
| // ok, the executor is shutting down. |
| } |
| } |
| } |
| } |
| |
| public boolean isRedundancyImpaired() { |
| int numBuckets = partitionedRegion.getPartitionAttributes().getTotalNumBuckets(); |
| int targetRedundancy = partitionedRegion.getPartitionAttributes().getRedundantCopies(); |
| |
| for (int i = 0; i < numBuckets; i++) { |
| int redundancy = partitionedRegion.getRegionAdvisor().getBucketRedundancy(i); |
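| // A redundancy of -1 indicates the bucket has not been created yet; such buckets are not counted as impaired. |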
| if ((redundancy < targetRedundancy && redundancy != -1) || redundancy > targetRedundancy) { |
| return true; |
| } |
| } |
| |
| return false; |
| } |
| |
| boolean recoverPersistentBuckets() { |
| /* |
| * To handle the case where the ParallelGatewaySender is persistent but the user PR is not, |
| * first recover the GatewaySender buckets for the ParallelGatewaySender, irrespective of |
| * whether colocation is complete. |
| */ |
| PartitionedRegion leaderRegion = ColocationHelper.getLeaderRegion(partitionedRegion); |
| |
| |
| // Check if the leader region or some child shadow PR region is persistent |
| // and return the first persistent region found |
| PartitionedRegion persistentLeader = getPersistentLeader(); |
| |
| // If there is no persistent region in the colocation chain, no need to recover. |
| if (persistentLeader == null) { |
| return true; |
| } |
| |
| if (!checkMembersColocation(leaderRegion, leaderRegion.getDistributionManager().getId())) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Skipping persistent recovery of {} because colocation is not complete for {}", |
| partitionedRegion, leaderRegion); |
| } |
| return false; |
| } |
| |
| ProxyBucketRegion[] proxyBucketArray = |
| persistentLeader.getRegionAdvisor().getProxyBucketArray(); |
| if (proxyBucketArray.length == 0) { |
| throw new IllegalStateException("Unexpected empty proxy bucket array"); |
| } |
| for (ProxyBucketRegion proxyBucket : proxyBucketArray) { |
| proxyBucket.initializePersistenceAdvisor(); |
| } |
| |
| Set<InternalDistributedMember> peers = partitionedRegion.getRegionAdvisor().adviseGeneric(); |
| |
| // We need to make sure here that we don't run into this race condition: |
| // 1) We get a membership view from member A |
| // 2) Member B removes itself, and distributes this to us and A. We don't remove B. |
| // 3) We apply the membership view from A, which includes B. |
| // That will add B back into the set. |
| // This state flush will make sure that any membership changes |
| // that are in progress on the peers are finished. |
| MembershipFlushRequest.send(peers, partitionedRegion.getDistributionManager(), |
| partitionedRegion.getFullPath()); |
| |
| List<ProxyBucketRegion> bucketsNotHostedLocally = new ArrayList<>(proxyBucketArray.length); |
| List<ProxyBucketRegion> bucketsHostedLocally = new ArrayList<>(proxyBucketArray.length); |
| |
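| // The recoverer tracks one latch count per proxy bucket and logs recovery progress from a background logging thread. |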
| createPersistentBucketRecoverer(proxyBucketArray.length); |
| |
| /* |
| * Spawn a separate thread for each bucket that we previously hosted, to recover that bucket. |
| * |
| * That thread will get to the point at which it has determined that at least one member |
| * (possibly the local member) has fully initialized the bucket, at which point it will count |
| * down the someMemberRecoveredLatch on the bucket. |
| * |
| * Once at least one copy of each bucket has been created in the distributed system, the |
| * initPRInternals method will exit. Some of the threads spawned here will still be doing GIIs |
| * in the background. This allows the system to become usable as fast as possible. |
| * |
| * If we used a bounded thread pool here, we would end up waiting for some buckets to finish |
| * their GII before returning from initPRInternals. In the future we could perhaps let the |
| * bucket creation return and hand the GII task to a separate thread pool. |
| */ |
| for (ProxyBucketRegion proxyBucket : proxyBucketArray) { |
| if (proxyBucket.getPersistenceAdvisor().wasHosting()) { |
| RecoveryRunnable recoveryRunnable = new RecoveryRunnable(this) { |
| |
| @Override |
| public void run() { |
| // make sure that we always count down this latch, even if the region was destroyed. |
| try { |
| super.run(); |
| } finally { |
| if (getPersistentBucketRecoverer() != null) { |
| getPersistentBucketRecoverer().countDown(); |
| } |
| } |
| } |
| |
| @Override |
| public void run2() { |
| proxyBucket.recoverFromDiskRecursively(); |
| } |
| }; |
| |
| Thread recoveryThread = |
| new LoggingThread("Recovery thread for bucket " + proxyBucket.getName(), |
| false, recoveryRunnable); |
| recoveryThread.start(); |
| bucketsHostedLocally.add(proxyBucket); |
| } else { |
| bucketsNotHostedLocally.add(proxyBucket); |
| } |
| } |
| |
| try { |
| // try to recover the local buckets before the proxy buckets. This will allow us to detect |
| // any ConflictingDataException before the proxy buckets update their membership view. |
| for (ProxyBucketRegion proxyBucket : bucketsHostedLocally) { |
| proxyBucket.waitForPrimaryPersistentRecovery(); |
| } |
| |
| if (!partitionedRegion.isInternalRegion() && !bucketsNotHostedLocally.isEmpty()) { |
| partitionedRegion.notifyRegionCreated(); |
| } |
| |
| for (ProxyBucketRegion proxyBucket : bucketsNotHostedLocally) { |
| proxyBucket.recoverFromDiskRecursively(); |
| } |
| } finally { |
| if (getPersistentBucketRecoverer() != null) { |
| getPersistentBucketRecoverer().countDown(bucketsNotHostedLocally.size()); |
| } |
| } |
| |
| return true; |
| } |
| |
| @VisibleForTesting |
| void createPersistentBucketRecoverer(int proxyBuckets) { |
| persistentBucketRecoverer = persistentBucketRecovererFunction.apply(this, proxyBuckets); |
| persistentBucketRecoverer.startLoggingThread(); |
| } |
| |
| @VisibleForTesting |
| PersistentBucketRecoverer getPersistentBucketRecoverer() { |
| return persistentBucketRecoverer; |
| } |
| |
| /** |
| * Check to see if any colocated region of the current region is persistent. It's not enough to |
| * check just the leader region, because a child region might be a persistent parallel WAN queue, |
| * which is allowed. |
| * |
| * @return the most senior region in the colocation chain (closest to the leader) that is |
| * persistent. |
| */ |
| private PartitionedRegion getPersistentLeader() { |
| PartitionedRegion leader = ColocationHelper.getLeaderRegion(partitionedRegion); |
| return findPersistentRegionRecursively(leader); |
| } |
| |
| private PartitionedRegion findPersistentRegionRecursively(PartitionedRegion partitionedRegion) { |
| if (partitionedRegion.getDataPolicy().withPersistence()) { |
| return partitionedRegion; |
| } |
| for (PartitionedRegion child : ColocationHelper.getColocatedChildRegions(partitionedRegion)) { |
| PartitionedRegion leader = findPersistentRegionRecursively(child); |
| if (leader != null) { |
| return leader; |
| } |
| } |
| return null; |
| } |
| |
| void scheduleCreateMissingBuckets() { |
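| // Only colocated child regions need this: buckets already created for the leader region must also exist here. |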
| if (partitionedRegion.getColocatedWith() != null) { |
| Runnable task = new CreateMissingBucketsTask(this); |
| final InternalResourceManager resourceManager = |
| partitionedRegion.getGemFireCache().getInternalResourceManager(); |
| resourceManager.getExecutor().execute(task); |
| } |
| } |
| |
| public void shutdown() { |
| synchronized (shutdownLock) { |
| shutdown = true; |
| ScheduledFuture<?> recoveryFuture = this.recoveryFuture; |
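| // cancel(false): let a recovery task that is already running finish rather than interrupting it. |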
| if (recoveryFuture != null) { |
| recoveryFuture.cancel(false); |
| recoveryExecutor.purge(); |
| } |
| } |
| } |
| |
| /** |
| * Creates and fills in a PartitionRegionInfo for the partitioned region. |
| * |
| * @param internal true if internal-only details should be included |
| * @param loadProbe the LoadProbe to use |
| * @return PartitionRegionInfo for the partitioned region |
| */ |
| public InternalPRInfo buildPartitionedRegionInfo(boolean internal, LoadProbe loadProbe) { |
| PartitionedRegion pr = partitionedRegion; |
| |
| if (pr == null) { |
| return null; |
| } |
| |
| int configuredBucketCount = pr.getTotalNumberOfBuckets(); |
| int createdBucketCount = pr.getRegionAdvisor().getCreatedBucketsCount(); |
| |
| int lowRedundancyBucketCount = pr.getRedundancyTracker().getLowRedundancyBuckets(); |
| int configuredRedundantCopies = pr.getRedundantCopies(); |
| int actualRedundantCopies = pr.getRedundancyTracker().getActualRedundancy(); |
| |
| PartitionedRegionDataStore dataStore = pr.getDataStore(); |
| |
| Set<InternalDistributedMember> datastores = pr.getRegionAdvisor().adviseDataStore(); |
| |
| Set<InternalPartitionDetails> memberDetails = new TreeSet<>(); |
| |
| OfflineMemberDetails offlineMembers = null; |
| boolean fetchOfflineMembers = false; |
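| // If this member hosts data, gather local details and offline members directly; |
| // otherwise ask a remote datastore to report the offline members in its reply. |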
| if (dataStore != null) { |
| memberDetails.add(buildPartitionMemberDetails(internal, loadProbe)); |
| offlineMembers = fetchOfflineMembers(); |
| } else { |
| fetchOfflineMembers = true; |
| } |
| |
| // Get remote results |
| if (!datastores.isEmpty()) { |
| FetchPartitionDetailsResponse response = FetchPartitionDetailsMessage.send(datastores, pr, |
| internal, fetchOfflineMembers, loadProbe); |
| memberDetails.addAll(response.waitForResponse()); |
| if (fetchOfflineMembers) { |
| offlineMembers = response.getOfflineMembers(); |
| } |
| } |
| |
| String colocatedWithPath = pr.getColocatedWith(); |
| |
| return new PartitionRegionInfoImpl(pr.getFullPath(), configuredBucketCount, |
| createdBucketCount, lowRedundancyBucketCount, configuredRedundantCopies, |
| actualRedundantCopies, memberDetails, colocatedWithPath, offlineMembers); |
| } |
| |
| /** |
| * Retrieve the set of members which are currently offline for all buckets. |
| */ |
| public OfflineMemberDetailsImpl fetchOfflineMembers() { |
| ProxyBucketRegion[] proxyBuckets = partitionedRegion.getRegionAdvisor().getProxyBucketArray(); |
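| // One entry per bucket: the persistent members currently missing (offline) for that bucket. |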
| Set<PersistentMemberID>[] offlineMembers = new Set[proxyBuckets.length]; |
| for (int i = 0; i < proxyBuckets.length; i++) { |
| ProxyBucketRegion proxyBucket = proxyBuckets[i]; |
| if (partitionedRegion.getDataPolicy().withPersistence()) { |
| Set<PersistentMemberID> persistedMembers = |
| proxyBucket.getPersistenceAdvisor().getMissingMembers(); |
| if (persistedMembers == null) { |
| persistedMembers = Collections.emptySet(); |
| } |
| offlineMembers[i] = persistedMembers; |
| } else { |
| offlineMembers[i] = Collections.emptySet(); |
| } |
| } |
| return new OfflineMemberDetailsImpl(offlineMembers); |
| } |
| |
| /** |
| * Creates and fills in a PartitionMemberDetails for the local member. |
| * |
| * @param internal true if internal-only details should be included |
| * @param loadProbe the LoadProbe to use |
| * @return PartitionMemberDetails for the local member |
| */ |
| public InternalPartitionDetails buildPartitionMemberDetails(boolean internal, |
| LoadProbe loadProbe) { |
| final PartitionedRegion pr = partitionedRegion; |
| |
| PartitionedRegionDataStore dataStore = pr.getDataStore(); |
| if (dataStore == null) { |
| return null; |
| } |
| |
| long size = 0; |
| InternalDistributedMember localMember = pr.getMyId(); |
| |
| int configuredBucketCount = pr.getTotalNumberOfBuckets(); |
| long[] bucketSizes = new long[configuredBucketCount]; |
| |
| // key: bid, value: size |
| Map<Integer, Integer> bucketSizeMap = dataStore.getSizeLocally(); |
| |
| for (Map.Entry<Integer, Integer> me : bucketSizeMap.entrySet()) { |
| int bid = me.getKey(); |
| long bucketSize = dataStore.getBucketSize(bid); |
| bucketSizes[bid] = bucketSize; |
| size += bucketSize; |
| } |
| |
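| // localMaxMemory is configured in megabytes; the member details report capacity in bytes. |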
| InternalPartitionDetails localDetails; |
| if (internal) { |
| waitForPersistentBucketRecoveryOrClose(); |
| |
| PRLoad prLoad = loadProbe.getLoad(pr); |
| localDetails = |
| new PartitionMemberInfoImpl(localMember, pr.getLocalMaxMemory() * 1024L * 1024L, size, |
| dataStore.getBucketsManaged(), dataStore.getNumberOfPrimaryBucketsManaged(), prLoad, |
| bucketSizes); |
| } else { |
| localDetails = |
| new PartitionMemberInfoImpl(localMember, pr.getLocalMaxMemory() * 1024L * 1024L, size, |
| dataStore.getBucketsManaged(), dataStore.getNumberOfPrimaryBucketsManaged()); |
| } |
| return localDetails; |
| } |
| |
| /** |
| * Wait for all persistent buckets to be recovered from disk, or for the region to be closed, |
| * whichever happens first. |
| */ |
| private void waitForPersistentBucketRecoveryOrClose() { |
| if (getPersistentBucketRecoverer() != null) { |
| getPersistentBucketRecoverer().await( |
| PartitionedRegionHelper.DEFAULT_WAIT_PER_RETRY_ITERATION, MILLISECONDS); |
| } |
| |
| List<PartitionedRegion> colocatedRegions = |
| ColocationHelper.getColocatedChildRegions(partitionedRegion); |
| for (PartitionedRegion child : colocatedRegions) { |
| child.getRedundancyProvider().waitForPersistentBucketRecoveryOrClose(); |
| } |
| } |
| |
| /** |
| * Wait for all persistent buckets to be recovered from disk, regardless of whether the region is |
| * currently being closed. |
| */ |
| void waitForPersistentBucketRecovery() { |
| if (getPersistentBucketRecoverer() != null) { |
| getPersistentBucketRecoverer().await(); |
| } |
| } |
| |
| public boolean isPersistentRecoveryComplete() { |
| if (!checkMembersColocation(partitionedRegion, partitionedRegion.getMyId())) { |
| return false; |
| } |
| |
| if (getPersistentBucketRecoverer() != null |
| && !getPersistentBucketRecoverer().hasRecoveryCompleted()) { |
| return false; |
| } |
| |
| Map<String, PartitionedRegion> colocatedRegions = |
| ColocationHelper.getAllColocationRegions(partitionedRegion); |
| |
| for (PartitionedRegion region : colocatedRegions.values()) { |
| PRHARedundancyProvider redundancyProvider = region.getRedundancyProvider(); |
| if (redundancyProvider.getPersistentBucketRecoverer() != null && |
| !redundancyProvider.getPersistentBucketRecoverer().hasRecoveryCompleted()) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| private ThreadsMonitoring getThreadMonitorObj() { |
| DistributionManager distributionManager = partitionedRegion.getDistributionManager(); |
| if (distributionManager != null) { |
| return distributionManager.getThreadMonitoring(); |
| } |
| return null; |
| } |
| |
| /** |
| * Monitors distributed membership for a given bucket |
| */ |
| private class BucketMembershipObserver implements MembershipListener { |
| |
| private final Bucket bucketToMonitor; |
| private final AtomicInteger arrivals = new AtomicInteger(0); |
| private final AtomicBoolean departures = new AtomicBoolean(false); |
| |
| private BucketMembershipObserver(Bucket bucketToMonitor) { |
| this.bucketToMonitor = bucketToMonitor; |
| } |
| |
| private BucketMembershipObserver beginMonitoring() { |
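| // Members that already have a profile for this bucket count as initial arrivals. |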
| int profilesPresent = bucketToMonitor.getBucketAdvisor() |
| .addMembershipListenerAndAdviseGeneric(this).size(); |
| arrivals.addAndGet(profilesPresent); |
| return this; |
| } |
| |
| private void stopMonitoring() { |
| bucketToMonitor.getBucketAdvisor().removeMembershipListener(this); |
| } |
| |
| @Override |
| public void memberJoined(DistributionManager distributionManager, |
| InternalDistributedMember id) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Observer for bucket {} member joined {}", bucketToMonitor, id); |
| } |
| synchronized (this) { |
| arrivals.addAndGet(1); |
| notifyAll(); |
| } |
| } |
| |
| @Override |
| public void memberDeparted(DistributionManager distributionManager, |
| InternalDistributedMember id, boolean crashed) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Observer for bucket {} member departed {}", bucketToMonitor, id); |
| } |
| synchronized (this) { |
| departures.getAndSet(true); |
| notifyAll(); |
| } |
| } |
| |
| /** |
| * Wait for the expected number of owners to be recognized. When the expected number have been |
| * seen, fetch the primary and report it. If, while waiting for the owners to be recognized, |
| * there is a departure which compromises redundancy, the wait is abandoned so that the caller |
| * can choose new target members and retry. |
| * |
| * @param expectedCount the number of bucket owners to wait for |
| * @param expectedOwners the list of owners, used when a departure is detected |
| * @return the primary, if no problematic departures are detected |
| */ |
| private BucketMembershipObserverResults waitForOwnersGetPrimary(int expectedCount, |
| Collection<InternalDistributedMember> expectedOwners, String partitionName) |
| throws InterruptedException { |
| boolean problematicDeparture = false; |
| synchronized (this) { |
| while (true) { |
| bucketToMonitor.getCancelCriterion().checkCancelInProgress(null); |
| |
| // If any departures, need to rethink much... |
| boolean oldDepartures = departures.get(); |
| if (oldDepartures) { |
| verifyBucketNodes(expectedOwners, partitionName); |
| if (expectedOwners.isEmpty()) { |
| problematicDeparture = true; // need to pick new victims |
| } |
| |
| // reset the counters to reflect the remaining owners |
| arrivals.set(expectedOwners.size()); |
| departures.set(false); |
| |
| if (problematicDeparture) { |
| if (logger.isDebugEnabled()) { |
| logger.debug("Bucket observer found departed members - retrying"); |
| } |
| } |
| break; |
| } |
| |
| // Look for success... |
| int oldArrivals = arrivals.get(); |
| if (oldArrivals >= expectedCount) { |
| // success! |
| break; |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Waiting for bucket {} to finish being created", |
| partitionedRegion.bucketStringForLogs(bucketToMonitor.getId())); |
| } |
| |
| partitionedRegion.checkReadiness(); |
| |
| int creationWaitMillis = 5 * 1000; |
| wait(creationWaitMillis); |
| |
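| // If neither arrivals nor departures changed during the wait, the wait timed out; warn and keep waiting. |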
| if (oldArrivals == arrivals.get() && oldDepartures == departures.get()) { |
| logger.warn( |
| "Time out waiting {} ms for creation of bucket for partitioned region {}. Members requested to create the bucket are: {}", |
| creationWaitMillis, partitionedRegion.getFullPath(), expectedOwners); |
| } |
| } |
| } |
| |
| if (problematicDeparture) { |
| return new BucketMembershipObserverResults(true, null); |
| } |
| |
| InternalDistributedMember primary = bucketToMonitor.getBucketAdvisor().getPrimary(); |
| if (primary == null) { |
| /* |
| * Handle a race where nobody has the bucket. We can't return a null member here; since the |
| * bucket hasn't been created, we need to let the higher-level code loop and retry. |
| */ |
| return new BucketMembershipObserverResults(true, null); |
| } |
| return new BucketMembershipObserverResults(false, primary); |
| } |
| } |
| |
| /** |
| * Implements MembershipListener to perform cleanup when a node leaves the DistributedSystem. |
| */ |
| private class PRMembershipListener implements MembershipListener { |
| |
| @Override |
| public void memberDeparted(DistributionManager distributionManager, |
| InternalDistributedMember id, boolean crashed) { |
| try { |
| DistributedMember member = partitionedRegion.getSystem().getDistributedMember(); |
| if (logger.isDebugEnabled()) { |
| logger.debug( |
| "MembershipListener invoked on DistributedMember = {} for failed memberId = {}", |
| member, |
| id); |
| } |
| |
| if (!partitionedRegion.isCacheClosing() && !partitionedRegion.isDestroyed() |
| && !member.equals(id)) { |
| Runnable postRecoveryTask = null; |
| |
| // Only schedule redundancy recovery if this not a fixed PR. |
| if (!partitionedRegion.isFixedPartitionedRegion()) { |
| postRecoveryTask = () -> { |
| // After the metadata has been cleaned, recover redundancy. |
| scheduleRedundancyRecovery(id); |
| }; |
| } |
| // Schedule clean up the metadata for the failed member. |
| PartitionedRegionHelper.cleanUpMetaDataForRegion(partitionedRegion.getCache(), |
| partitionedRegion.getRegionIdentifier(), id, postRecoveryTask); |
| } |
| } catch (CancelException ignore) { |
| // ignore |
| } |
| } |
| } |
| |
| private static class ManageBucketRsp { |
| |
| @Immutable |
| private static final ManageBucketRsp NO = new ManageBucketRsp("NO"); |
| @Immutable |
| private static final ManageBucketRsp YES = new ManageBucketRsp("YES"); |
| @Immutable |
| private static final ManageBucketRsp NO_INITIALIZING = new ManageBucketRsp("NO_INITIALIZING"); |
| @Immutable |
| private static final ManageBucketRsp CLOSED = new ManageBucketRsp("CLOSED"); |
| |
| private final String name; |
| |
| private ManageBucketRsp(String name) { |
| this.name = name; |
| } |
| |
| private boolean isAcceptance() { |
| return this == YES; |
| } |
| |
| @Override |
| public String toString() { |
| return "ManageBucketRsp(" + name + ")"; |
| } |
| |
| /** return YES if the argument is true, NO if not */ |
| private static ManageBucketRsp valueOf(boolean managed) { |
| return managed ? YES : NO; |
| } |
| } |
| |
| private static class BucketMembershipObserverResults { |
| private final boolean problematicDeparture; |
| final InternalDistributedMember primary; |
| |
| BucketMembershipObserverResults(boolean re, InternalDistributedMember p) { |
| problematicDeparture = re; |
| primary = p; |
| } |
| |
| @Override |
| public String toString() { |
| return "pDepart:" + problematicDeparture + " primary:" + primary; |
| } |
| } |
| |
| private static class ArrayListWithClearState<T> extends ArrayList<T> { |
| |
| private static final long serialVersionUID = 1L; |
| private volatile boolean wasCleared; |
| |
| private boolean wasCleared() { |
| return wasCleared; |
| } |
| |
| @Override |
| public void clear() { |
| super.clear(); |
| wasCleared = true; |
| } |
| } |
| } |