| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more contributor license |
| * agreements. See the NOTICE file distributed with this work for additional information regarding |
| * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance with the License. You may obtain a |
| * copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software distributed under the License |
| * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express |
| * or implied. See the License for the specific language governing permissions and limitations under |
| * the License. |
| */ |
| package org.apache.geode.internal.cache.persistence; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Set; |
| |
| import org.apache.logging.log4j.Logger; |
| |
| import org.apache.geode.annotations.Immutable; |
| import org.apache.geode.annotations.VisibleForTesting; |
| import org.apache.geode.annotations.internal.MutableForTesting; |
| import org.apache.geode.cache.DiskAccessException; |
| import org.apache.geode.cache.RegionDestroyedException; |
| import org.apache.geode.cache.persistence.ConflictingPersistentDataException; |
| import org.apache.geode.cache.persistence.RevokedPersistentDataException; |
| import org.apache.geode.distributed.DistributedLockService; |
| import org.apache.geode.distributed.internal.DistributionAdvisor.Profile; |
| import org.apache.geode.distributed.internal.DistributionManager; |
| import org.apache.geode.distributed.internal.ProfileListener; |
| import org.apache.geode.distributed.internal.ReplyException; |
| import org.apache.geode.distributed.internal.membership.InternalDistributedMember; |
| import org.apache.geode.internal.CopyOnWriteHashSet; |
| import org.apache.geode.internal.cache.CacheDistributionAdvisor; |
| import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile; |
| import org.apache.geode.internal.cache.CacheDistributionAdvisor.InitialImageAdvice; |
| import org.apache.geode.internal.cache.DiskRegionStats; |
| import org.apache.geode.internal.cache.persistence.PersistentMemberManager.MemberRevocationListener; |
| import org.apache.geode.internal.cache.persistence.PersistentStateQueryMessage.PersistentStateQueryReplyProcessor; |
| import org.apache.geode.internal.logging.log4j.LogMarker; |
| import org.apache.geode.internal.process.StartupStatus; |
| import org.apache.geode.internal.util.TransformUtils; |
| import org.apache.geode.logging.internal.log4j.api.LogService; |
| |
| public class PersistenceAdvisorImpl implements InternalPersistenceAdvisor { |
| |
| private static final Logger logger = LogService.getLogger(); |
| @Immutable |
| private static final PersistenceAdvisorObserver DEFAULT_PERSISTENCE_ADVISOR_OBSERVER = s -> { |
| }; |
| @MutableForTesting |
| private static PersistenceAdvisorObserver persistenceAdvisorObserver = |
| DEFAULT_PERSISTENCE_ADVISOR_OBSERVER; |
| |
| protected final Object lock; |
| |
| protected final CacheDistributionAdvisor cacheDistributionAdvisor; |
| protected final String regionPath; |
| protected final PersistentMemberView persistentMemberView; |
| private final DiskRegionStats diskRegionStats; |
| private final PersistentMemberManager persistentMemberManager; |
| private final ProfileChangeListener profileChangeListener; |
| |
| private final Set<PersistentMemberID> recoveredMembers; |
| private final Set<PersistentMemberID> removedMembers = new HashSet<>(); |
| private final Set<PersistentMemberID> equalMembers; |
| private final DistributedLockService distributedLockService; |
| |
| private volatile boolean holdingTieLock; |
| |
| protected volatile boolean online; |
| private volatile Set<PersistentStateListener> persistentStateListeners = Collections.emptySet(); |
| private volatile boolean initialized; |
| private volatile boolean shouldUpdatePersistentView; |
| protected volatile boolean isClosed; |
| |
| protected volatile Set<PersistentMemberID> allMembersWaitingFor; |
| protected volatile Set<PersistentMemberID> offlineMembersWaitingFor; |
| |
| private final PersistentStateQueryMessageSenderFactory persistentStateQueryMessageSenderFactory; |
| |
| public PersistenceAdvisorImpl(CacheDistributionAdvisor cacheDistributionAdvisor, |
| DistributedLockService distributedLockService, PersistentMemberView persistentMemberView, |
| String regionPath, DiskRegionStats diskRegionStats, |
| PersistentMemberManager persistentMemberManager) { |
| this(cacheDistributionAdvisor, distributedLockService, persistentMemberView, regionPath, |
| diskRegionStats, persistentMemberManager, new PersistentStateQueryMessageSenderFactory()); |
| } |
| |
| @VisibleForTesting |
| PersistenceAdvisorImpl(CacheDistributionAdvisor cacheDistributionAdvisor, |
| DistributedLockService distributedLockService, PersistentMemberView persistentMemberView, |
| String regionPath, DiskRegionStats diskRegionStats, |
| PersistentMemberManager persistentMemberManager, |
| PersistentStateQueryMessageSenderFactory persistentStateQueryMessageSenderFactory) { |
| this.cacheDistributionAdvisor = cacheDistributionAdvisor; |
| this.distributedLockService = distributedLockService; |
| this.regionPath = regionPath; |
| this.persistentMemberView = persistentMemberView; |
| this.diskRegionStats = diskRegionStats; |
| profileChangeListener = new ProfileChangeListener(); |
| this.persistentMemberManager = persistentMemberManager; |
| this.persistentStateQueryMessageSenderFactory = persistentStateQueryMessageSenderFactory; |
| |
| // Prevent membership changes while we are persisting the membership view online. If we |
| // synchronize on something else, we need to be careful about lock ordering because the |
| // membership notifications are called with the advisor lock held. |
| lock = cacheDistributionAdvisor; |
| |
| // Remember which members we know about because of what we have persisted. We will later use |
| // this to handle updates from peers. |
| recoveredMembers = getPersistedMembers(); |
| |
| // To prevent races if we crash during initialization, mark equal members as online before we |
| // initialize. We will still report these members as equal, but if we crash and recover they |
| // will no longer be considered equal. |
| equalMembers = new CopyOnWriteHashSet<>(persistentMemberView.getOfflineAndEqualMembers()); |
| for (PersistentMemberID id : equalMembers) { |
| persistentMemberView.memberOnline(id); |
| } |
| } |
| |
| @Override |
| public void initialize() { |
| if (initialized) { |
| return; |
| } |
| |
| if (wasAboutToDestroy()) { |
| logger.info("Region {} crashed during a region destroy. Finishing the destroy.", |
| regionPath); |
| finishPendingDestroy(); |
| } |
| |
| cacheDistributionAdvisor.addProfileChangeListener(profileChangeListener); |
| |
| Set<PersistentMemberPattern> revokedMembers = persistentMemberManager |
| .addRevocationListener(profileChangeListener, persistentMemberView.getRevokedMembers()); |
| |
| for (PersistentMemberPattern pattern : revokedMembers) { |
| memberRevoked(pattern); |
| } |
| |
| // Start logging changes to the persistent view |
| startMemberLogging(); |
| |
| initialized = true; |
| } |
| |
| /** |
| * Adds a PersistentStateListener whose job is to log changes in the persistent view. |
| */ |
| private void startMemberLogging() { |
| addListener(new PersistentStateListener.PersistentStateAdapter() { |
| /** |
| * A persistent member has gone offline. Log the offline member and log which persistent |
| * members are still online (the current persistent view). |
| */ |
| @Override |
| public void memberOffline(InternalDistributedMember member, PersistentMemberID persistentID) { |
| if (logger.isDebugEnabled()) { |
| |
| Set<PersistentMemberID> members = |
| new HashSet<>(cacheDistributionAdvisor.adviseInitializedPersistentMembers().values()); |
| members.remove(persistentID); |
| |
| Set<String> onlineMembers = new HashSet<>(); |
| TransformUtils.transform(members, onlineMembers, |
| TransformUtils.persistentMemberIdToLogEntryTransformer); |
| |
| logger.info( |
| "The following persistent member has gone offline for region {}: {} Remaining participating members for the region include: {}", |
| new Object[] {regionPath, |
| TransformUtils.persistentMemberIdToLogEntryTransformer |
| .transform(persistentID), |
| onlineMembers}); |
| } |
| } |
| }); |
| } |
| |
| @Override |
| public PersistentStateQueryResults getMyStateOnMembers(Set<InternalDistributedMember> members) |
| throws ReplyException { |
| return fetchPersistentStateQueryResults(members, |
| cacheDistributionAdvisor.getDistributionManager(), persistentMemberView.getMyPersistentID(), |
| persistentMemberView.getMyInitializingID()); |
| } |
| |
| private PersistentStateQueryResults fetchPersistentStateQueryResults( |
| Set<InternalDistributedMember> members, DistributionManager dm, |
| PersistentMemberID persistentMemberID, PersistentMemberID initializingMemberId) { |
| PersistentStateQueryReplyProcessor replyProcessor = persistentStateQueryMessageSenderFactory |
| .createPersistentStateQueryReplyProcessor(dm, members); |
| PersistentStateQueryMessage message = |
| persistentStateQueryMessageSenderFactory.createPersistentStateQueryMessage(regionPath, |
| persistentMemberID, initializingMemberId, replyProcessor.getProcessorId()); |
| return message.send(members, dm, replyProcessor); |
| } |
| |
| /** |
| * Return what state we have persisted for a given peer's id. |
| */ |
| @Override |
| public PersistentMemberState getPersistedStateOfMember(PersistentMemberID id) { |
| if (isRevoked(id)) { |
| return PersistentMemberState.REVOKED; |
| } |
| |
| // If the peer is marked as equal, indicate they are equal |
| if (equalMembers != null && equalMembers.contains(id)) { |
| return PersistentMemberState.EQUAL; |
| } |
| |
| // If we have a member that is marked as online that is an older version of the peers id, tell |
| // them they are online |
| for (PersistentMemberID onlineMember : persistentMemberView.getOnlineMembers()) { |
| if (onlineMember.isOlderOrEqualVersionOf(id)) { |
| return PersistentMemberState.ONLINE; |
| } |
| } |
| |
| // If we have a member that is marked as offline that is a newer version of the peers id, tell |
| // them they are online |
| for (PersistentMemberID offline : persistentMemberView.getOfflineMembers()) { |
| if (id.isOlderOrEqualVersionOf(offline)) { |
| return PersistentMemberState.OFFLINE; |
| } |
| } |
| return null; |
| } |
| |
| @Override |
| public void updateMembershipView(InternalDistributedMember peer, boolean targetReinitializing) { |
| beginUpdatingPersistentView(); |
| DistributionManager dm = cacheDistributionAdvisor.getDistributionManager(); |
| PersistentMembershipView peersPersistentMembershipView = |
| MembershipViewRequest.send(peer, dm, regionPath, targetReinitializing); |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "{}-{}: Updating persistent view from {}", |
| shortDiskStoreId(), regionPath, peer); |
| } |
| |
| synchronized (lock) { |
| PersistentMemberID myId = getPersistentID(); |
| Map<InternalDistributedMember, PersistentMemberID> peersOnlineMembers = |
| peersPersistentMembershipView.getOnlineMembers(); |
| Set<PersistentMemberID> peersOfflineMembers = |
| peersPersistentMembershipView.getOfflineMembers(); |
| |
| for (PersistentMemberID id : peersOnlineMembers.values()) { |
| if (!isRevoked(id) && !removedMembers.contains(id)) { |
| if (!id.equals(myId) && !recoveredMembers.remove(id) |
| && !id.getDiskStoreId().equals(getDiskStoreID())) { |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, |
| "{}-{}: Processing membership view from peer. Marking {} as online because {} says its online", |
| shortDiskStoreId(), regionPath, id, peer); |
| } |
| persistentMemberView.memberOnline(id); |
| } |
| } |
| } |
| |
| for (PersistentMemberID id : peersOfflineMembers) { |
| if (!isRevoked(id) && !removedMembers.contains(id)) { |
| // This method is called before the current member is online. if the peer knows about a |
| // member that the current member doesn't know about, that means that member must have |
| // been added to the DS after the current member went offline. Therefore, that member is |
| // *newer* than the current member. So mark that member as online (meaning, online later |
| // than the current member). |
| if (!id.equals(myId) && !recoveredMembers.remove(id) |
| && !id.getDiskStoreId().equals(getDiskStoreID())) { |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, |
| "{}-{}: Processing membership view from peer. Marking {} as online because {} says its offline, but we have never seen it", |
| shortDiskStoreId(), regionPath, id, peer); |
| } |
| persistentMemberView.memberOnline(id); |
| } |
| } |
| } |
| |
| |
| for (PersistentMemberID id : recoveredMembers) { |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, |
| "{}-{}: Processing membership view from peer. Removing {} because {} doesn't have it", |
| shortDiskStoreId(), regionPath, id, peer); |
| } |
| persistentMemberView.memberRemoved(id); |
| } |
| } |
| |
| // Update the set of revoked members from the peer. This should be called without holding the |
| // lock to avoid deadlocks |
| Set<PersistentMemberPattern> revokedMembers = peersPersistentMembershipView.getRevokedMembers(); |
| for (PersistentMemberPattern revoked : revokedMembers) { |
| persistentMemberManager.revokeMember(revoked); |
| } |
| } |
| |
| private boolean isRevoked(PersistentMemberID id) { |
| return persistentMemberManager.isRevoked(regionPath, id); |
| } |
| |
| @Override |
| public void setOnline(boolean didGII, boolean atomicCreation, PersistentMemberID newId) |
| throws ReplyException { |
| if (online) { |
| return; |
| } |
| |
| if (!didGII) { |
| setInitializing(newId); |
| } |
| |
| synchronized (lock) { |
| |
| // Transition any members that are marked as online, but not actually currently running, to |
| // offline. |
| Set<PersistentMemberID> membersToMarkOffline = |
| new HashSet<>(persistentMemberView.getOnlineMembers()); |
| Map<InternalDistributedMember, PersistentMemberID> onlineMembers; |
| if (!atomicCreation) { |
| onlineMembers = cacheDistributionAdvisor.adviseInitializedPersistentMembers(); |
| } else { |
| // Fix for 41100 - If this is an atomic bucket creation, don't mark our peers, which are |
| // concurrently initializing, as offline they have the exact same data as we do (none), so |
| // we are not technically "newer," and this avoids a race where both members can think the |
| // other is offline ("older"). |
| onlineMembers = cacheDistributionAdvisor.advisePersistentMembers(); |
| } |
| membersToMarkOffline.removeAll(onlineMembers.values()); |
| |
| // Another fix for 41100 - Don't mark equal members as offline if that are currently running. |
| // We don't have newer data than these members so this is safe, and it it avoids a race where |
| // we mark them offline at this point, and then later they mark us as offline. |
| if (equalMembers != null && !equalMembers.isEmpty()) { |
| |
| // This is slightly hacky. We're looking for a running member that has the same disk store |
| // as our equal members, because all have is a persistent id of the equal members. The |
| // persistent id of the running member may be different than what we have marked as equal, |
| // because the id in the profile is the new id for the member. |
| Collection<PersistentMemberID> allMembers = |
| cacheDistributionAdvisor.advisePersistentMembers().values(); |
| Set<DiskStoreID> runningDiskStores = new HashSet<>(); |
| for (PersistentMemberID mem : allMembers) { |
| runningDiskStores.add(mem.getDiskStoreId()); |
| } |
| // Remove any equal members which are not actually running right now. |
| for (PersistentMemberID id : equalMembers) { |
| if (!runningDiskStores.contains(id.getDiskStoreId())) { |
| equalMembers.remove(id); |
| } |
| } |
| membersToMarkOffline.removeAll(equalMembers); |
| } |
| for (PersistentMemberID id : membersToMarkOffline) { |
| persistentMemberView.memberOffline(id); |
| } |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, |
| "{}-{}: Persisting the new membership view and ID as online. Online members {}. Offline members {}. Equal memebers {}.", |
| shortDiskStoreId(), regionPath, persistentMemberView.getOnlineMembers(), |
| persistentMemberView.getOfflineMembers(), equalMembers); |
| } |
| |
| persistentMemberView.setInitialized(); |
| online = true; |
| removedMembers.clear(); |
| } |
| if (diskRegionStats != null) { |
| diskRegionStats.incInitializations(!didGII); |
| } |
| } |
| |
| /** |
| * Start listening for persistent view updates and apply any updates that have already happened. |
| * |
| * This method should be called after we have decided that there is no conflicting persistent |
| * exception. |
| * |
| * Fix for bug 44045. |
| */ |
| protected void beginUpdatingPersistentView() { |
| synchronized (lock) { |
| // Only update the view if it is has not already happened. |
| if (!shouldUpdatePersistentView) { |
| shouldUpdatePersistentView = true; |
| Map<InternalDistributedMember, PersistentMemberID> onlineMembers = |
| cacheDistributionAdvisor.adviseInitializedPersistentMembers(); |
| for (Map.Entry<InternalDistributedMember, PersistentMemberID> entry : onlineMembers |
| .entrySet()) { |
| memberOnline(entry.getKey(), entry.getValue()); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void setInitializing(PersistentMemberID newId) { |
| beginUpdatingPersistentView(); |
| |
| DistributionManager dm = cacheDistributionAdvisor.getDistributionManager(); |
| |
| PersistentMemberID oldId = getPersistentID(); |
| PersistentMemberID initializingId = getInitializingID(); |
| |
| Set<InternalDistributedMember> profileUpdateRecipients = |
| cacheDistributionAdvisor.adviseProfileUpdate(); |
| if (newId == null || !newId.equals(oldId) && !newId.equals(initializingId)) { |
| // If we have not yet prepared the old id, prepare it now. |
| |
| |
| // This will only be the case if we crashed while initializing previously. In the case, we are |
| // essentially finishing what we started by preparing that ID first. This will remove that ID |
| // from the peers. |
| if (initializingId != null) { |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, |
| "{}-{}: We still have an initializing id: {}. Telling peers to remove the old id {} and transitioning this initializing id to old id. recipients {}", |
| shortDiskStoreId(), regionPath, initializingId, oldId, profileUpdateRecipients); |
| } |
| long viewVersion = cacheDistributionAdvisor.startOperation(); |
| try { |
| PrepareNewPersistentMemberMessage.send(profileUpdateRecipients, dm, regionPath, oldId, |
| initializingId); |
| } finally { |
| if (viewVersion != -1) { |
| cacheDistributionAdvisor.endOperation(viewVersion); |
| } |
| } |
| oldId = initializingId; |
| } |
| |
| if (logger.isDebugEnabled()) { |
| logger.debug("Persisting my new persistent ID {}", newId); |
| } |
| persistentMemberView.setInitializing(newId); |
| } |
| |
| profileUpdateRecipients = cacheDistributionAdvisor.adviseProfileUpdate(); |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, |
| "{}-{}: Sending the new ID to peers. They should remove the old id {}. Recipients: {}", |
| shortDiskStoreId(), regionPath, oldId, profileUpdateRecipients); |
| } |
| if (newId != null) { |
| PrepareNewPersistentMemberMessage.send(profileUpdateRecipients, dm, regionPath, oldId, newId); |
| } |
| } |
| |
| @Override |
| public PersistentMemberID generatePersistentID() { |
| return persistentMemberView.generatePersistentID(); |
| } |
| |
| @Override |
| public PersistentMembershipView getMembershipView() { |
| if (!initialized) { |
| return null; |
| } |
| Set<PersistentMemberID> offlineMembers = getPersistedMembers(); |
| Map<InternalDistributedMember, PersistentMemberID> onlineMembers = |
| cacheDistributionAdvisor.adviseInitializedPersistentMembers(); |
| offlineMembers.removeAll(onlineMembers.values()); |
| |
| PersistentMemberID myId = getPersistentID(); |
| if (myId != null) { |
| onlineMembers |
| .put(cacheDistributionAdvisor.getDistributionManager().getDistributionManagerId(), myId); |
| } |
| |
| return new PersistentMembershipView(offlineMembers, onlineMembers, |
| persistentMemberManager.getRevokedMembers()); |
| } |
| |
| @Override |
| public Set<PersistentMemberID> getPersistedMembers() { |
| Set<PersistentMemberID> persistentMembers = new HashSet<>(); |
| persistentMembers.addAll(persistentMemberView.getOfflineMembers()); |
| persistentMembers.addAll(persistentMemberView.getOfflineAndEqualMembers()); |
| persistentMembers.addAll(persistentMemberView.getOnlineMembers()); |
| return persistentMembers; |
| } |
| |
| @Override |
| public boolean checkMyStateOnMembers(Set<InternalDistributedMember> replicates) |
| throws ReplyException { |
| PersistentStateQueryResults remoteStates = getMyStateOnMembers(replicates); |
| |
| persistenceAdvisorObserver.observe(regionPath); |
| |
| boolean equal = false; |
| for (Map.Entry<InternalDistributedMember, PersistentMemberState> entry : remoteStates |
| .getStateOnPeers() |
| .entrySet()) { |
| InternalDistributedMember member = entry.getKey(); |
| PersistentMemberID remoteId = remoteStates.getPersistentIds().get(member); |
| |
| final PersistentMemberID myId = getPersistentID(); |
| PersistentMemberState stateOnPeer = entry.getValue(); |
| |
| if (PersistentMemberState.REVOKED.equals(stateOnPeer)) { |
| throw new RevokedPersistentDataException( |
| String.format( |
| "The persistent member id %s has been revoked in this distributed system. You cannot recover from disk files which have been revoked.", |
| myId)); |
| } |
| |
| if (myId != null && stateOnPeer == null) { |
| String message = String.format( |
| "Region %s remote member %s with persistent data %s was not part of the same distributed system as the local data from %s", |
| regionPath, member, remoteId, myId); |
| throw new ConflictingPersistentDataException(message); |
| } |
| |
| if (myId != null && stateOnPeer == PersistentMemberState.EQUAL) { |
| equal = true; |
| } |
| |
| // The other member changes its ID when it comes back online. |
| if (remoteId != null) { |
| PersistentMemberState remoteState = getPersistedStateOfMember(remoteId); |
| if (remoteState == PersistentMemberState.OFFLINE) { |
| String message = |
| String.format( |
| "Region %s refusing to initialize from member %s with persistent data %s which was offline when the local data from %s was last online", |
| regionPath, member, remoteId, myId); |
| throw new ConflictingPersistentDataException(message); |
| } |
| } |
| } |
| return equal; |
| } |
| |
| public static void setPersistenceAdvisorObserver(PersistenceAdvisorObserver o) { |
| persistenceAdvisorObserver = o == null ? DEFAULT_PERSISTENCE_ADVISOR_OBSERVER : o; |
| } |
| |
| @Override |
| public PersistentMemberID getPersistentIDIfOnline() { |
| if (online) { |
| return persistentMemberView.getMyPersistentID(); |
| } else { |
| return null; |
| } |
| } |
| |
| private void memberOffline(InternalDistributedMember distributedMember, |
| PersistentMemberID persistentID) { |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, |
| "{}-{}: Member offine. id={}, persistentID={}", shortDiskStoreId(), regionPath, |
| distributedMember, persistentID); |
| } |
| synchronized (lock) { |
| boolean foundMember = recoveredMembers.remove(persistentID); |
| foundMember |= equalMembers.remove(persistentID); |
| foundMember |= getPersistedMembers().contains(persistentID); |
| // Don't persist members as offline until we are online. Otherwise, we may think we have later |
| // data than them during recovery. |
| if (shouldUpdatePersistentView && online) { |
| try { |
| // Don't persistent members as offline if we have already persisted them as equal. |
| if (persistentMemberView.getOfflineAndEqualMembers().contains(persistentID)) { |
| return; |
| } |
| // Don't mark the member as offline if we have never seen it. If we haven't seen it that |
| // means it's not done initializing yet. |
| if (foundMember) { |
| if (PersistenceObserverHolder.getInstance().memberOffline(regionPath, persistentID)) { |
| persistentMemberView.memberOffline(persistentID); |
| } |
| PersistenceObserverHolder.getInstance().afterPersistedOffline(regionPath, persistentID); |
| } |
| } catch (DiskAccessException e) { |
| logger.warn("Unable to persist membership change", e); |
| } |
| } |
| notifyListenersMemberOffline(distributedMember, persistentID); |
| } |
| |
| } |
| |
| private void memberOnline(InternalDistributedMember distributedMember, |
| PersistentMemberID persistentID) { |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, |
| "{}-{}: Sending the new ID to peers. Member online. id={}, persistentID={}", |
| shortDiskStoreId(), regionPath, distributedMember, persistentID); |
| } |
| synchronized (lock) { |
| if (shouldUpdatePersistentView) { |
| recoveredMembers.remove(persistentID); |
| try { |
| if (PersistenceObserverHolder.getInstance().memberOnline(regionPath, persistentID)) { |
| persistentMemberView.memberOnline(persistentID); |
| } |
| PersistenceObserverHolder.getInstance().afterPersistedOnline(regionPath, persistentID); |
| } catch (DiskAccessException e) { |
| logger.warn("Unable to persist membership change", e); |
| } |
| } else { |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, |
| "{}-{}: Not marking member online in persistent view because we're still in initialization", |
| shortDiskStoreId(), regionPath); |
| } |
| } |
| |
| notifyListenersMemberOnline(distributedMember, persistentID); |
| } |
| } |
| |
| private void memberRevoked(PersistentMemberPattern pattern) { |
| // Persist the revoked member, so if we recover later we will remember that they were revoked. |
| persistentMemberView.memberRevoked(pattern); |
| |
| // Remove the revoked member from our view. |
| for (PersistentMemberID id : persistentMemberView.getOfflineMembers()) { |
| if (pattern.matches(id)) { |
| memberRemoved(id, true); |
| } |
| } |
| for (PersistentMemberID id : persistentMemberView.getOnlineMembers()) { |
| if (pattern.matches(id)) { |
| memberRemoved(id, true); |
| } |
| } |
| for (PersistentMemberID id : persistentMemberView.getOfflineAndEqualMembers()) { |
| if (pattern.matches(id)) { |
| memberRemoved(id, true); |
| } |
| } |
| } |
| |
| private void memberRemoved(PersistentMemberID id, boolean revoked) { |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "{}-{}: Member removed. persistentID={}", |
| shortDiskStoreId(), regionPath, id); |
| } |
| |
| synchronized (lock) { |
| recoveredMembers.remove(id); |
| equalMembers.remove(id); |
| if (!online) { |
| removedMembers.add(id); |
| } |
| try { |
| if (PersistenceObserverHolder.getInstance().memberRemoved(regionPath, id)) { |
| persistentMemberView.memberRemoved(id); |
| } |
| |
| // Purge any IDs that are old versions of the the id that we just removed |
| for (PersistentMemberID persistedId : getPersistedMembers()) { |
| if (persistedId.isOlderOrEqualVersionOf(id)) { |
| persistentMemberView.memberRemoved(persistedId); |
| } |
| } |
| PersistenceObserverHolder.getInstance().afterRemovePersisted(regionPath, id); |
| } catch (DiskAccessException e) { |
| logger.warn("Unable to persist membership change", e); |
| } |
| notifyListenersMemberRemoved(id, revoked); |
| } |
| } |
| |
| @Override |
| public PersistentMemberID getPersistentID() { |
| return persistentMemberView.getMyPersistentID(); |
| } |
| |
| @Override |
| public PersistentMemberID getInitializingID() { |
| return persistentMemberView.getMyInitializingID(); |
| } |
| |
| @Override |
| public void addListener(PersistentStateListener listener) { |
| synchronized (this) { |
| Set<PersistentStateListener> tmpListeners = new HashSet<>(persistentStateListeners); |
| tmpListeners.add(listener); |
| persistentStateListeners = Collections.unmodifiableSet(tmpListeners); |
| } |
| |
| } |
| |
| @Override |
| public void removeListener(PersistentStateListener listener) { |
| synchronized (this) { |
| Set<PersistentStateListener> tmpListeners = new HashSet<>(persistentStateListeners); |
| tmpListeners.remove(listener); |
| persistentStateListeners = Collections.unmodifiableSet(tmpListeners); |
| } |
| } |
| |
| private void notifyListenersMemberOnline(InternalDistributedMember member, |
| PersistentMemberID persistentID) { |
| for (PersistentStateListener listener : persistentStateListeners) { |
| listener.memberOnline(member, persistentID); |
| } |
| } |
| |
| private void notifyListenersMemberOffline(InternalDistributedMember member, |
| PersistentMemberID persistentID) { |
| for (PersistentStateListener listener : persistentStateListeners) { |
| listener.memberOffline(member, persistentID); |
| } |
| } |
| |
| private void notifyListenersMemberRemoved(PersistentMemberID persistentID, boolean revoked) { |
| for (PersistentStateListener listener : persistentStateListeners) { |
| listener.memberRemoved(persistentID, revoked); |
| } |
| } |
| |
| @Override |
| public HashSet<PersistentMemberID> getPersistedOnlineOrEqualMembers() { |
| HashSet<PersistentMemberID> members = new HashSet<>(persistentMemberView.getOnlineMembers()); |
| members.addAll(equalMembers); |
| return members; |
| } |
| |
| @Override |
| public void prepareNewMember(InternalDistributedMember sender, PersistentMemberID oldId, |
| PersistentMemberID newId) { |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, |
| "{}-{}: Preparing new persistent id {}. Old id is {}", shortDiskStoreId(), regionPath, |
| newId, oldId); |
| } |
| synchronized (lock) { |
| // Don't prepare the ID if the advisor doesn't have a profile. This prevents a race with the |
| // advisor remove |
| if (!cacheDistributionAdvisor.containsId(sender)) { |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, |
| "{}-{}: Refusing to prepare id because {} is not in our advisor", shortDiskStoreId(), |
| regionPath, sender); |
| } |
| return; |
| } |
| // Persist new members even if we are not online yet. Two members can become online at once. |
| // This way, they will know about each other. |
| persistentMemberView.memberOnline(newId); |
| |
| // The oldId and newId could be the same if the member is retrying a GII. See bug #42051 |
| if (oldId != null && !oldId.equals(newId)) { |
| if (initialized) { |
| memberRemoved(oldId, false); |
| } |
| } |
| } |
| } |
| |
| protected String shortDiskStoreId() { |
| DiskStoreID diskStoreID = getDiskStoreID(); |
| return diskStoreID == null ? "mem" : diskStoreID.abbrev(); |
| } |
| |
| @Override |
| public void removeMember(PersistentMemberID id) { |
| memberRemoved(id, false); |
| } |
| |
| @Override |
| public void markMemberOffline(InternalDistributedMember member, PersistentMemberID id) { |
| memberOffline(member, id); |
| } |
| |
| @Override |
| public CacheDistributionAdvisor getCacheDistributionAdvisor() { |
| return cacheDistributionAdvisor; |
| } |
| |
| @Override |
| public void setWaitingOnMembers(Set<PersistentMemberID> allMembersToWaitFor, |
| Set<PersistentMemberID> offlineMembersToWaitFor) { |
| allMembersWaitingFor = allMembersToWaitFor; |
| offlineMembersWaitingFor = offlineMembersToWaitFor; |
| } |
| |
| @Override |
| public boolean isClosed() { |
| return isClosed; |
| } |
| |
| |
| public void finishPendingDestroy() { |
| // send a message to peers indicating that they should remove this profile |
| long viewVersion = cacheDistributionAdvisor.startOperation(); |
| try { |
| RemovePersistentMemberMessage.send(cacheDistributionAdvisor.adviseProfileUpdate(), |
| cacheDistributionAdvisor.getDistributionManager(), regionPath, getPersistentID(), |
| getInitializingID()); |
| |
| persistentMemberView.finishPendingDestroy(); |
| } finally { |
| if (viewVersion != -1) { |
| cacheDistributionAdvisor.endOperation(viewVersion); |
| } |
| } |
| synchronized (lock) { |
| recoveredMembers.clear(); |
| recoveredMembers.addAll(getPersistedMembers()); |
| } |
| } |
| |
| /** |
| * Returns the member id of the member who has the latest copy of the persistent region. This may |
| * be the local member ID if this member has the latest known copy. |
| * |
| * This method will block until the latest member is online. |
| * |
| * @throws ConflictingPersistentDataException if there are active members which are not based on |
| * the state that is persisted in this member. |
| */ |
| @Override |
| public InitialImageAdvice getInitialImageAdvice(InitialImageAdvice previousAdvice, |
| boolean hasDiskImageToRecoverFrom) { |
| PersistenceInitialImageAdvisor piia = new PersistenceInitialImageAdvisor(this, |
| shortDiskStoreId(), regionPath, cacheDistributionAdvisor, hasDiskImageToRecoverFrom); |
| return piia.getAdvice(previousAdvice); |
| } |
| |
| /** |
| * @param previouslyOnlineMembers the members we have persisted online in our persistence files |
| * @param offlineMembers This method will populate this set with any members that we are waiting |
| * for which are actually not running right now. This is different from the set of members |
| * we need to wait for - this member may end up waiting on a member that is actually |
| * running. |
| * @return the list of members that this member needs to wait for before it can initialize. |
| */ |
| @Override |
| public Set<PersistentMemberID> getMembersToWaitFor( |
| Set<PersistentMemberID> previouslyOnlineMembers, Set<PersistentMemberID> offlineMembers) |
| throws ReplyException { |
| PersistentMemberID myPersistentID = getPersistentID(); |
| PersistentMemberID myInitializingId = getInitializingID(); |
| |
| // This is the set of members that are currently waiting for this member |
| // to come online. |
| Set<PersistentMemberID> membersToWaitFor = new HashSet<>(previouslyOnlineMembers); |
| offlineMembers.addAll(previouslyOnlineMembers); |
| |
| // If our persistent ID is null, we need to wait for all of the previously online members. |
| if (myPersistentID != null || myInitializingId != null) { |
| Set<InternalDistributedMember> members = cacheDistributionAdvisor.adviseProfileUpdate(); |
| Set<InternalDistributedMember> membersHostingThisRegion = |
| cacheDistributionAdvisor.adviseGeneric(); |
| |
| // Fetch the persistent view from all of our peers. |
| PersistentStateQueryResults results = fetchPersistentStateQueryResults(members, |
| cacheDistributionAdvisor.getDistributionManager(), myPersistentID, myInitializingId); |
| |
| // iterate through all of the peers. For each peer: if the member was previously online |
| // according |
| // to us, grab its online members and add them to the members to wait for set. We may need to |
| // do this several times until we discover all of the members that may have newer data than |
| // us. |
| boolean addedMembers = true; |
| while (addedMembers) { |
| addedMembers = false; |
| for (Entry<InternalDistributedMember, Set<PersistentMemberID>> entry : results |
| .getOnlineMemberMap() |
| .entrySet()) { |
| InternalDistributedMember memberId = entry.getKey(); |
| Set<PersistentMemberID> peersOnlineMembers = entry.getValue(); |
| PersistentMemberID persistentID = results.getPersistentIds().get(memberId); |
| PersistentMemberID initializingID = results.getInitializingIds().get(memberId); |
| if (membersToWaitFor.contains(persistentID) |
| || membersToWaitFor.contains(initializingID)) { |
| for (PersistentMemberID peerOnlineMember : peersOnlineMembers) { |
| if (!isRevoked(peerOnlineMember) |
| && !peerOnlineMember.getDiskStoreId().equals(getDiskStoreID()) |
| && !persistentMemberView.getOfflineMembers().contains(peerOnlineMember)) { |
| if (membersToWaitFor.add(peerOnlineMember)) { |
| addedMembers = true; |
| // Make sure we also persist that this member is online. |
| persistentMemberView.memberOnline(peerOnlineMember); |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, |
| "{}-{}: Adding {} to the list of members we're wait for, because {} has newer or equal data than is and is waiting for that member", |
| shortDiskStoreId(), regionPath, peerOnlineMember, memberId); |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| removeOlderMembers(membersToWaitFor); |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, |
| "{}-{}: Initial state of membersToWaitFor, before pruning {}", shortDiskStoreId(), |
| regionPath, membersToWaitFor); |
| } |
| |
| // For each of our peers, see what our state is according to their view. |
| for (Map.Entry<InternalDistributedMember, PersistentMemberState> entry : results |
| .getStateOnPeers().entrySet()) { |
| InternalDistributedMember memberId = entry.getKey(); |
| PersistentMemberID persistentID = results.getPersistentIds().get(memberId); |
| PersistentMemberID initializingID = results.getInitializingIds().get(memberId); |
| DiskStoreID diskStoreID = results.getDiskStoreIds().get(memberId); |
| PersistentMemberState state = entry.getValue(); |
| |
| if (PersistentMemberState.REVOKED.equals(state)) { |
| throw new RevokedPersistentDataException( |
| String.format( |
| "The persistent member id %s has been revoked in this distributed system. You cannot recover from disk files which have been revoked.", |
| myPersistentID)); |
| } |
| |
| // If the peer thinks we are newer or equal to them, we don't need to wait for this peer. |
| if (membersHostingThisRegion.contains(memberId) && persistentID != null && state != null |
| && myInitializingId == null && (state.equals(PersistentMemberState.ONLINE) |
| || state.equals(PersistentMemberState.EQUAL))) { |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, |
| "{}-{}: Not waiting for {} because it thinks our state was {}", shortDiskStoreId(), |
| regionPath, persistentID, state); |
| } |
| removeNewerPersistentID(membersToWaitFor, persistentID); |
| } |
| |
| // If the peer has an initialized ID, they are no longer offline. |
| if (persistentID != null) { |
| removeNewerPersistentID(offlineMembers, persistentID); |
| } |
| |
| // If the peer thinks we are newer or equal to them, we don't need to wait for this peer. |
| if (membersHostingThisRegion.contains(memberId) && initializingID != null && state != null |
| && (state.equals(PersistentMemberState.ONLINE) |
| || state.equals(PersistentMemberState.EQUAL))) { |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, |
| "{}-{}: Not waiting for {} because it thinks our state was {}", shortDiskStoreId(), |
| regionPath, initializingID, state); |
| } |
| removeByDiskStoreID(membersToWaitFor, diskStoreID, false); |
| } |
| |
| // If the peer has an initializing id, they are also not online. |
| if (initializingID != null) { |
| removeNewerPersistentID(offlineMembers, initializingID); |
| } |
| |
| handlePartiallyDestroyedRegion(offlineMembers, membersToWaitFor, persistentID, |
| initializingID, |
| diskStoreID); |
| } |
| } |
| return membersToWaitFor; |
| } |
| |
| /** |
| * In the event that the region was partially destroyed via DestroyDataStorage on the peer, |
| * we do not need to wait on that peer. Currently this state can be reached when a bucket region |
| * GII fails, which results in DestroyDataStorage on the region (as opposed to a DestroyRegion). |
| * See DiskRegion.destroyPartiallyInitializedRegion() which handles the failed GII on the image |
| * receiving side for more details. |
| */ |
| private void handlePartiallyDestroyedRegion(final Set<PersistentMemberID> offlineMembers, |
| final Set<PersistentMemberID> membersToWaitFor, |
| final PersistentMemberID persistentID, |
| final PersistentMemberID initializingID, |
| final DiskStoreID diskStoreID) { |
| /* |
| * When DestroyDataStorage is invoked on a peer for this region, we expect that its |
| * initializing and persistent IDs will be null, but the disk store ID will be non-null |
| * because the region was not fully destroyed. |
| */ |
| if (initializingID == null && persistentID == null & diskStoreID != null) { |
| removeByDiskStoreID(membersToWaitFor, diskStoreID, true); |
| removeByDiskStoreID(offlineMembers, diskStoreID, true); |
| } |
| } |
| |
| /** |
| * Given a set of persistent members, if the same member occurs more than once in the set but |
| * with different timestamps, remove the older ones leaving only the most recent. |
| * |
| * @param persistentMemberSet The set of persistent members, possibly modified by this method. |
| */ |
| protected void removeOlderMembers(Set<PersistentMemberID> persistentMemberSet) { |
| Map<DiskStoreID, PersistentMemberID> mostRecentMap = new HashMap<>(); |
| List<PersistentMemberID> idsToRemove = new ArrayList<>(); |
| for (PersistentMemberID persistentMember : persistentMemberSet) { |
| DiskStoreID diskStoreId = persistentMember.getDiskStoreId(); |
| PersistentMemberID mostRecent = mostRecentMap.get(diskStoreId); |
| if (mostRecent == null) { |
| mostRecentMap.put(diskStoreId, persistentMember); |
| } else { |
| PersistentMemberID older = persistentMember; |
| boolean persistentMemberIsNewer = |
| !persistentMember.isOlderOrEqualVersionOf(mostRecent); |
| if (persistentMemberIsNewer) { |
| older = mostRecent; |
| mostRecentMap.put(diskStoreId, persistentMember); |
| } |
| idsToRemove.add(older); |
| } |
| } |
| persistentMemberSet.removeAll(idsToRemove); |
| } |
| |
| /** |
| * Remove all members with a given disk store id from the set of members to wait for, who is newer |
| * than the real one. The reason is: A is waiting for B2, but B sends B1<=A to A. That means A |
| * knows more than B in both B1 and B2. B itself knows nothing about B2. So we don't need to wait |
| * for B2 (since we don't need to wait for B1). |
| */ |
| private void removeNewerPersistentID(Set<PersistentMemberID> membersToWaitFor, |
| PersistentMemberID persistentID) { |
| for (Iterator<PersistentMemberID> itr = membersToWaitFor.iterator(); itr.hasNext();) { |
| PersistentMemberID id = itr.next(); |
| if (persistentID.isOlderOrEqualVersionOf(id)) { |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, |
| "{}-{}: Not waiting for {} because local member knows more about it", |
| shortDiskStoreId(), regionPath, id); |
| } |
| itr.remove(); |
| } |
| } |
| } |
| |
| /** |
| * Remove all members with a given disk store id from the set of members to wait for. |
| */ |
| private void removeByDiskStoreID(Set<PersistentMemberID> membersToWaitFor, |
| DiskStoreID diskStoreID, boolean updateAdvisor) { |
| for (Iterator<PersistentMemberID> itr = membersToWaitFor.iterator(); itr.hasNext();) { |
| PersistentMemberID id = itr.next(); |
| if (id.getDiskStoreId().equals(diskStoreID)) { |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, |
| "{}-{}: Not waiting for {} because it no longer has this region in its disk store", |
| shortDiskStoreId(), regionPath, id); |
| } |
| itr.remove(); |
| if (updateAdvisor) { |
| memberRemoved(id, false); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public boolean wasHosting() { |
| return getPersistentID() != null || getInitializingID() != null; |
| } |
| |
| protected String getRegionPathForOfflineMembers() { |
| return regionPath; |
| } |
| |
| /** |
| * Returns the set of missing members that we report back to the any admin DS looking for missing |
| * members. |
| */ |
| protected Set<PersistentMemberID> getMissingMembers() { |
| return offlineMembersWaitingFor; |
| } |
| |
| /** |
| * Returns the set of missing members that we report back to the any admin DS looking for missing |
| * members. |
| */ |
| public Set<PersistentMemberID> getAllMembersToWaitFor() { |
| return allMembersWaitingFor; |
| } |
| |
| @Override |
| public void logWaitingForMembers() { |
| Set<String> membersToWaitForLogEntries = new HashSet<>(); |
| |
| if (offlineMembersWaitingFor != null && !offlineMembersWaitingFor.isEmpty()) { |
| TransformUtils.transform(offlineMembersWaitingFor, membersToWaitForLogEntries, |
| TransformUtils.persistentMemberIdToLogEntryTransformer); |
| |
| StartupStatus.startup( |
| String.format( |
| "Region %s has potentially stale data. It is waiting for another member to recover the latest data.My persistent id:%sMembers with potentially new data:%sUse the gfsh show missing-disk-stores command to see all disk stores that are being waited on by other members.", |
| regionPath, |
| TransformUtils.persistentMemberIdToLogEntryTransformer.transform(getPersistentID()), |
| membersToWaitForLogEntries)); |
| } else { |
| TransformUtils.transform(allMembersWaitingFor, membersToWaitForLogEntries, |
| TransformUtils.persistentMemberIdToLogEntryTransformer); |
| |
| StartupStatus.startup( |
| String.format( |
| "Region %s has potentially stale data. It is waiting for another online member to recover the latest data.My persistent id:%sMembers with potentially new data:%sUse the gfsh show missing-disk-stores command to see all disk stores that are being waited on by other members.", |
| regionPath, |
| TransformUtils.persistentMemberIdToLogEntryTransformer.transform(getPersistentID()), |
| membersToWaitForLogEntries)); |
| } |
| } |
| |
| @Override |
| public void clearEqualMembers() { |
| synchronized (lock) { |
| equalMembers.clear(); |
| } |
| } |
| |
| @Override |
| public void checkInterruptedByShutdownAll() {} |
| |
| @Override |
| public void close() { |
| isClosed = true; |
| persistentMemberManager.removeRevocationListener(profileChangeListener); |
| cacheDistributionAdvisor.removeProfileChangeListener(profileChangeListener); |
| releaseTieLock(); |
| } |
| |
| |
| /** |
| * Try to acquire the distributed lock which members must grab for in the case of a tie. Whoever |
| * gets the lock initializes first. |
| */ |
| @Override |
| public boolean acquireTieLock() { |
| // We're tied for the latest copy of the data. try to get the distributed lock. |
| holdingTieLock = distributedLockService.lock("PERSISTENCE_" + regionPath, 0, -1); |
| if (!holdingTieLock) { |
| if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) { |
| logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "{}-{}: Failed to acquire the lock.", |
| shortDiskStoreId(), regionPath); |
| } |
| } |
| return holdingTieLock; |
| } |
| |
| @Override |
| public void releaseTieLock() { |
| if (holdingTieLock) { |
| distributedLockService.unlock("PERSISTENCE_" + regionPath); |
| holdingTieLock = false; |
| } |
| } |
| |
| private boolean wasAboutToDestroy() { |
| return persistentMemberView.wasAboutToDestroy() |
| || persistentMemberView.wasAboutToDestroyDataStorage(); |
| } |
| |
| protected synchronized void resetState() { |
| online = false; |
| removedMembers.clear(); |
| } |
| |
| public void flushMembershipChanges() { |
| try { |
| cacheDistributionAdvisor.waitForCurrentOperations(); |
| } catch (RegionDestroyedException ignored) { |
| } |
| } |
| |
| @Override |
| public void persistMembersOfflineAndEqual( |
| Map<InternalDistributedMember, PersistentMemberID> map) { |
| for (PersistentMemberID persistentID : map.values()) { |
| persistentMemberView.memberOfflineAndEqual(persistentID); |
| } |
| } |
| |
| @Override |
| public DiskStoreID getDiskStoreID() { |
| return persistentMemberView.getDiskStoreID(); |
| } |
| |
| @Override |
| public boolean isOnline() { |
| return online; |
| } |
| |
| public interface PersistenceAdvisorObserver { |
| void observe(String regionPath); |
| } |
| |
| private class ProfileChangeListener implements ProfileListener, MemberRevocationListener { |
| |
| @Override |
| public void profileCreated(Profile profile) { |
| profileUpdated(profile); |
| } |
| |
| @Override |
| public void profileRemoved(Profile profile, boolean destroyed) { |
| CacheProfile cp = (CacheProfile) profile; |
| if (cp.persistentID != null) { |
| if (destroyed) { |
| memberRemoved(cp.persistentID, false); |
| } else { |
| memberOffline(profile.getDistributedMember(), cp.persistentID); |
| } |
| } |
| } |
| |
| @Override |
| public void profileUpdated(Profile profile) { |
| CacheProfile cp = (CacheProfile) profile; |
| if (cp.persistentID != null && cp.persistenceInitialized) { |
| memberOnline(profile.getDistributedMember(), cp.persistentID); |
| } |
| } |
| |
| @Override |
| public void revoked(PersistentMemberPattern pattern) { |
| memberRevoked(pattern); |
| } |
| |
| @Override |
| public Set<PersistentMemberID> getMissingMemberIds() { |
| return getMissingMembers(); |
| } |
| |
| @Override |
| public String getRegionPath() { |
| return getRegionPathForOfflineMembers(); |
| } |
| |
| @Override |
| public boolean matches(PersistentMemberPattern pattern) { |
| return pattern.matches(getPersistentID()) || pattern.matches(getInitializingID()); |
| } |
| } |
| } |