blob: a207ce0d43083fbadccfe1d4c13815863c60759f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache.persistence;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.logging.log4j.Logger;
import org.apache.geode.annotations.Immutable;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.annotations.internal.MutableForTesting;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.RegionDestroyedException;
import org.apache.geode.cache.persistence.ConflictingPersistentDataException;
import org.apache.geode.cache.persistence.RevokedPersistentDataException;
import org.apache.geode.distributed.DistributedLockService;
import org.apache.geode.distributed.internal.DistributionAdvisor.Profile;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.ProfileListener;
import org.apache.geode.distributed.internal.ReplyException;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.CopyOnWriteHashSet;
import org.apache.geode.internal.cache.CacheDistributionAdvisor;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.CacheProfile;
import org.apache.geode.internal.cache.CacheDistributionAdvisor.InitialImageAdvice;
import org.apache.geode.internal.cache.DiskRegionStats;
import org.apache.geode.internal.cache.persistence.PersistentMemberManager.MemberRevocationListener;
import org.apache.geode.internal.cache.persistence.PersistentStateQueryMessage.PersistentStateQueryReplyProcessor;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.internal.process.StartupStatus;
import org.apache.geode.internal.util.TransformUtils;
import org.apache.geode.logging.internal.log4j.api.LogService;
public class PersistenceAdvisorImpl implements InternalPersistenceAdvisor {
private static final Logger logger = LogService.getLogger();
@Immutable
private static final PersistenceAdvisorObserver DEFAULT_PERSISTENCE_ADVISOR_OBSERVER = s -> {
};
@MutableForTesting
private static PersistenceAdvisorObserver persistenceAdvisorObserver =
DEFAULT_PERSISTENCE_ADVISOR_OBSERVER;
protected final Object lock;
protected final CacheDistributionAdvisor cacheDistributionAdvisor;
protected final String regionPath;
protected final PersistentMemberView persistentMemberView;
private final DiskRegionStats diskRegionStats;
private final PersistentMemberManager persistentMemberManager;
private final ProfileChangeListener profileChangeListener;
private final Set<PersistentMemberID> recoveredMembers;
private final Set<PersistentMemberID> removedMembers = new HashSet<>();
private final Set<PersistentMemberID> equalMembers;
private final DistributedLockService distributedLockService;
private volatile boolean holdingTieLock;
protected volatile boolean online;
private volatile Set<PersistentStateListener> persistentStateListeners = Collections.emptySet();
private volatile boolean initialized;
private volatile boolean shouldUpdatePersistentView;
protected volatile boolean isClosed;
protected volatile Set<PersistentMemberID> allMembersWaitingFor;
protected volatile Set<PersistentMemberID> offlineMembersWaitingFor;
private final PersistentStateQueryMessageSenderFactory persistentStateQueryMessageSenderFactory;
public PersistenceAdvisorImpl(CacheDistributionAdvisor cacheDistributionAdvisor,
DistributedLockService distributedLockService, PersistentMemberView persistentMemberView,
String regionPath, DiskRegionStats diskRegionStats,
PersistentMemberManager persistentMemberManager) {
this(cacheDistributionAdvisor, distributedLockService, persistentMemberView, regionPath,
diskRegionStats, persistentMemberManager, new PersistentStateQueryMessageSenderFactory());
}
@VisibleForTesting
PersistenceAdvisorImpl(CacheDistributionAdvisor cacheDistributionAdvisor,
DistributedLockService distributedLockService, PersistentMemberView persistentMemberView,
String regionPath, DiskRegionStats diskRegionStats,
PersistentMemberManager persistentMemberManager,
PersistentStateQueryMessageSenderFactory persistentStateQueryMessageSenderFactory) {
this.cacheDistributionAdvisor = cacheDistributionAdvisor;
this.distributedLockService = distributedLockService;
this.regionPath = regionPath;
this.persistentMemberView = persistentMemberView;
this.diskRegionStats = diskRegionStats;
profileChangeListener = new ProfileChangeListener();
this.persistentMemberManager = persistentMemberManager;
this.persistentStateQueryMessageSenderFactory = persistentStateQueryMessageSenderFactory;
// Prevent membership changes while we are persisting the membership view online. If we
// synchronize on something else, we need to be careful about lock ordering because the
// membership notifications are called with the advisor lock held.
lock = cacheDistributionAdvisor;
// Remember which members we know about because of what we have persisted. We will later use
// this to handle updates from peers.
recoveredMembers = getPersistedMembers();
// To prevent races if we crash during initialization, mark equal members as online before we
// initialize. We will still report these members as equal, but if we crash and recover they
// will no longer be considered equal.
equalMembers = new CopyOnWriteHashSet<>(persistentMemberView.getOfflineAndEqualMembers());
for (PersistentMemberID id : equalMembers) {
persistentMemberView.memberOnline(id);
}
}
@Override
public void initialize() {
if (initialized) {
return;
}
if (wasAboutToDestroy()) {
logger.info("Region {} crashed during a region destroy. Finishing the destroy.",
regionPath);
finishPendingDestroy();
}
cacheDistributionAdvisor.addProfileChangeListener(profileChangeListener);
Set<PersistentMemberPattern> revokedMembers = persistentMemberManager
.addRevocationListener(profileChangeListener, persistentMemberView.getRevokedMembers());
for (PersistentMemberPattern pattern : revokedMembers) {
memberRevoked(pattern);
}
// Start logging changes to the persistent view
startMemberLogging();
initialized = true;
}
/**
* Adds a PersistentStateListener whose job is to log changes in the persistent view.
*/
private void startMemberLogging() {
addListener(new PersistentStateListener.PersistentStateAdapter() {
/**
* A persistent member has gone offline. Log the offline member and log which persistent
* members are still online (the current persistent view).
*/
@Override
public void memberOffline(InternalDistributedMember member, PersistentMemberID persistentID) {
if (logger.isDebugEnabled()) {
Set<PersistentMemberID> members =
new HashSet<>(cacheDistributionAdvisor.adviseInitializedPersistentMembers().values());
members.remove(persistentID);
Set<String> onlineMembers = new HashSet<>();
TransformUtils.transform(members, onlineMembers,
TransformUtils.persistentMemberIdToLogEntryTransformer);
logger.info(
"The following persistent member has gone offline for region {}: {} Remaining participating members for the region include: {}",
new Object[] {regionPath,
TransformUtils.persistentMemberIdToLogEntryTransformer
.transform(persistentID),
onlineMembers});
}
}
});
}
@Override
public PersistentStateQueryResults getMyStateOnMembers(Set<InternalDistributedMember> members)
throws ReplyException {
return fetchPersistentStateQueryResults(members,
cacheDistributionAdvisor.getDistributionManager(), persistentMemberView.getMyPersistentID(),
persistentMemberView.getMyInitializingID());
}
private PersistentStateQueryResults fetchPersistentStateQueryResults(
Set<InternalDistributedMember> members, DistributionManager dm,
PersistentMemberID persistentMemberID, PersistentMemberID initializingMemberId) {
PersistentStateQueryReplyProcessor replyProcessor = persistentStateQueryMessageSenderFactory
.createPersistentStateQueryReplyProcessor(dm, members);
PersistentStateQueryMessage message =
persistentStateQueryMessageSenderFactory.createPersistentStateQueryMessage(regionPath,
persistentMemberID, initializingMemberId, replyProcessor.getProcessorId());
return message.send(members, dm, replyProcessor);
}
/**
* Return what state we have persisted for a given peer's id.
*/
@Override
public PersistentMemberState getPersistedStateOfMember(PersistentMemberID id) {
if (isRevoked(id)) {
return PersistentMemberState.REVOKED;
}
// If the peer is marked as equal, indicate they are equal
if (equalMembers != null && equalMembers.contains(id)) {
return PersistentMemberState.EQUAL;
}
// If we have a member that is marked as online that is an older version of the peers id, tell
// them they are online
for (PersistentMemberID onlineMember : persistentMemberView.getOnlineMembers()) {
if (onlineMember.isOlderOrEqualVersionOf(id)) {
return PersistentMemberState.ONLINE;
}
}
// If we have a member that is marked as offline that is a newer version of the peers id, tell
// them they are online
for (PersistentMemberID offline : persistentMemberView.getOfflineMembers()) {
if (id.isOlderOrEqualVersionOf(offline)) {
return PersistentMemberState.OFFLINE;
}
}
return null;
}
@Override
public void updateMembershipView(InternalDistributedMember peer, boolean targetReinitializing) {
beginUpdatingPersistentView();
DistributionManager dm = cacheDistributionAdvisor.getDistributionManager();
PersistentMembershipView peersPersistentMembershipView =
MembershipViewRequest.send(peer, dm, regionPath, targetReinitializing);
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "{}-{}: Updating persistent view from {}",
shortDiskStoreId(), regionPath, peer);
}
synchronized (lock) {
PersistentMemberID myId = getPersistentID();
Map<InternalDistributedMember, PersistentMemberID> peersOnlineMembers =
peersPersistentMembershipView.getOnlineMembers();
Set<PersistentMemberID> peersOfflineMembers =
peersPersistentMembershipView.getOfflineMembers();
for (PersistentMemberID id : peersOnlineMembers.values()) {
if (!isRevoked(id) && !removedMembers.contains(id)) {
if (!id.equals(myId) && !recoveredMembers.remove(id)
&& !id.getDiskStoreId().equals(getDiskStoreID())) {
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
"{}-{}: Processing membership view from peer. Marking {} as online because {} says its online",
shortDiskStoreId(), regionPath, id, peer);
}
persistentMemberView.memberOnline(id);
}
}
}
for (PersistentMemberID id : peersOfflineMembers) {
if (!isRevoked(id) && !removedMembers.contains(id)) {
// This method is called before the current member is online. if the peer knows about a
// member that the current member doesn't know about, that means that member must have
// been added to the DS after the current member went offline. Therefore, that member is
// *newer* than the current member. So mark that member as online (meaning, online later
// than the current member).
if (!id.equals(myId) && !recoveredMembers.remove(id)
&& !id.getDiskStoreId().equals(getDiskStoreID())) {
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
"{}-{}: Processing membership view from peer. Marking {} as online because {} says its offline, but we have never seen it",
shortDiskStoreId(), regionPath, id, peer);
}
persistentMemberView.memberOnline(id);
}
}
}
for (PersistentMemberID id : recoveredMembers) {
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
"{}-{}: Processing membership view from peer. Removing {} because {} doesn't have it",
shortDiskStoreId(), regionPath, id, peer);
}
persistentMemberView.memberRemoved(id);
}
}
// Update the set of revoked members from the peer. This should be called without holding the
// lock to avoid deadlocks
Set<PersistentMemberPattern> revokedMembers = peersPersistentMembershipView.getRevokedMembers();
for (PersistentMemberPattern revoked : revokedMembers) {
persistentMemberManager.revokeMember(revoked);
}
}
private boolean isRevoked(PersistentMemberID id) {
return persistentMemberManager.isRevoked(regionPath, id);
}
@Override
public void setOnline(boolean didGII, boolean atomicCreation, PersistentMemberID newId)
throws ReplyException {
if (online) {
return;
}
if (!didGII) {
setInitializing(newId);
}
synchronized (lock) {
// Transition any members that are marked as online, but not actually currently running, to
// offline.
Set<PersistentMemberID> membersToMarkOffline =
new HashSet<>(persistentMemberView.getOnlineMembers());
Map<InternalDistributedMember, PersistentMemberID> onlineMembers;
if (!atomicCreation) {
onlineMembers = cacheDistributionAdvisor.adviseInitializedPersistentMembers();
} else {
// Fix for 41100 - If this is an atomic bucket creation, don't mark our peers, which are
// concurrently initializing, as offline they have the exact same data as we do (none), so
// we are not technically "newer," and this avoids a race where both members can think the
// other is offline ("older").
onlineMembers = cacheDistributionAdvisor.advisePersistentMembers();
}
membersToMarkOffline.removeAll(onlineMembers.values());
// Another fix for 41100 - Don't mark equal members as offline if that are currently running.
// We don't have newer data than these members so this is safe, and it it avoids a race where
// we mark them offline at this point, and then later they mark us as offline.
if (equalMembers != null && !equalMembers.isEmpty()) {
// This is slightly hacky. We're looking for a running member that has the same disk store
// as our equal members, because all have is a persistent id of the equal members. The
// persistent id of the running member may be different than what we have marked as equal,
// because the id in the profile is the new id for the member.
Collection<PersistentMemberID> allMembers =
cacheDistributionAdvisor.advisePersistentMembers().values();
Set<DiskStoreID> runningDiskStores = new HashSet<>();
for (PersistentMemberID mem : allMembers) {
runningDiskStores.add(mem.getDiskStoreId());
}
// Remove any equal members which are not actually running right now.
for (PersistentMemberID id : equalMembers) {
if (!runningDiskStores.contains(id.getDiskStoreId())) {
equalMembers.remove(id);
}
}
membersToMarkOffline.removeAll(equalMembers);
}
for (PersistentMemberID id : membersToMarkOffline) {
persistentMemberView.memberOffline(id);
}
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
"{}-{}: Persisting the new membership view and ID as online. Online members {}. Offline members {}. Equal memebers {}.",
shortDiskStoreId(), regionPath, persistentMemberView.getOnlineMembers(),
persistentMemberView.getOfflineMembers(), equalMembers);
}
persistentMemberView.setInitialized();
online = true;
removedMembers.clear();
}
if (diskRegionStats != null) {
diskRegionStats.incInitializations(!didGII);
}
}
/**
* Start listening for persistent view updates and apply any updates that have already happened.
*
* This method should be called after we have decided that there is no conflicting persistent
* exception.
*
* Fix for bug 44045.
*/
protected void beginUpdatingPersistentView() {
synchronized (lock) {
// Only update the view if it is has not already happened.
if (!shouldUpdatePersistentView) {
shouldUpdatePersistentView = true;
Map<InternalDistributedMember, PersistentMemberID> onlineMembers =
cacheDistributionAdvisor.adviseInitializedPersistentMembers();
for (Map.Entry<InternalDistributedMember, PersistentMemberID> entry : onlineMembers
.entrySet()) {
memberOnline(entry.getKey(), entry.getValue());
}
}
}
}
@Override
public void setInitializing(PersistentMemberID newId) {
beginUpdatingPersistentView();
DistributionManager dm = cacheDistributionAdvisor.getDistributionManager();
PersistentMemberID oldId = getPersistentID();
PersistentMemberID initializingId = getInitializingID();
Set<InternalDistributedMember> profileUpdateRecipients =
cacheDistributionAdvisor.adviseProfileUpdate();
if (newId == null || !newId.equals(oldId) && !newId.equals(initializingId)) {
// If we have not yet prepared the old id, prepare it now.
// This will only be the case if we crashed while initializing previously. In the case, we are
// essentially finishing what we started by preparing that ID first. This will remove that ID
// from the peers.
if (initializingId != null) {
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
"{}-{}: We still have an initializing id: {}. Telling peers to remove the old id {} and transitioning this initializing id to old id. recipients {}",
shortDiskStoreId(), regionPath, initializingId, oldId, profileUpdateRecipients);
}
long viewVersion = cacheDistributionAdvisor.startOperation();
try {
PrepareNewPersistentMemberMessage.send(profileUpdateRecipients, dm, regionPath, oldId,
initializingId);
} finally {
if (viewVersion != -1) {
cacheDistributionAdvisor.endOperation(viewVersion);
}
}
oldId = initializingId;
}
if (logger.isDebugEnabled()) {
logger.debug("Persisting my new persistent ID {}", newId);
}
persistentMemberView.setInitializing(newId);
}
profileUpdateRecipients = cacheDistributionAdvisor.adviseProfileUpdate();
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
"{}-{}: Sending the new ID to peers. They should remove the old id {}. Recipients: {}",
shortDiskStoreId(), regionPath, oldId, profileUpdateRecipients);
}
if (newId != null) {
PrepareNewPersistentMemberMessage.send(profileUpdateRecipients, dm, regionPath, oldId, newId);
}
}
@Override
public PersistentMemberID generatePersistentID() {
return persistentMemberView.generatePersistentID();
}
@Override
public PersistentMembershipView getMembershipView() {
if (!initialized) {
return null;
}
Set<PersistentMemberID> offlineMembers = getPersistedMembers();
Map<InternalDistributedMember, PersistentMemberID> onlineMembers =
cacheDistributionAdvisor.adviseInitializedPersistentMembers();
offlineMembers.removeAll(onlineMembers.values());
PersistentMemberID myId = getPersistentID();
if (myId != null) {
onlineMembers
.put(cacheDistributionAdvisor.getDistributionManager().getDistributionManagerId(), myId);
}
return new PersistentMembershipView(offlineMembers, onlineMembers,
persistentMemberManager.getRevokedMembers());
}
@Override
public Set<PersistentMemberID> getPersistedMembers() {
Set<PersistentMemberID> persistentMembers = new HashSet<>();
persistentMembers.addAll(persistentMemberView.getOfflineMembers());
persistentMembers.addAll(persistentMemberView.getOfflineAndEqualMembers());
persistentMembers.addAll(persistentMemberView.getOnlineMembers());
return persistentMembers;
}
@Override
public boolean checkMyStateOnMembers(Set<InternalDistributedMember> replicates)
throws ReplyException {
PersistentStateQueryResults remoteStates = getMyStateOnMembers(replicates);
persistenceAdvisorObserver.observe(regionPath);
boolean equal = false;
for (Map.Entry<InternalDistributedMember, PersistentMemberState> entry : remoteStates
.getStateOnPeers()
.entrySet()) {
InternalDistributedMember member = entry.getKey();
PersistentMemberID remoteId = remoteStates.getPersistentIds().get(member);
final PersistentMemberID myId = getPersistentID();
PersistentMemberState stateOnPeer = entry.getValue();
if (PersistentMemberState.REVOKED.equals(stateOnPeer)) {
throw new RevokedPersistentDataException(
String.format(
"The persistent member id %s has been revoked in this distributed system. You cannot recover from disk files which have been revoked.",
myId));
}
if (myId != null && stateOnPeer == null) {
String message = String.format(
"Region %s remote member %s with persistent data %s was not part of the same distributed system as the local data from %s",
regionPath, member, remoteId, myId);
throw new ConflictingPersistentDataException(message);
}
if (myId != null && stateOnPeer == PersistentMemberState.EQUAL) {
equal = true;
}
// The other member changes its ID when it comes back online.
if (remoteId != null) {
PersistentMemberState remoteState = getPersistedStateOfMember(remoteId);
if (remoteState == PersistentMemberState.OFFLINE) {
String message =
String.format(
"Region %s refusing to initialize from member %s with persistent data %s which was offline when the local data from %s was last online",
regionPath, member, remoteId, myId);
throw new ConflictingPersistentDataException(message);
}
}
}
return equal;
}
public static void setPersistenceAdvisorObserver(PersistenceAdvisorObserver o) {
persistenceAdvisorObserver = o == null ? DEFAULT_PERSISTENCE_ADVISOR_OBSERVER : o;
}
@Override
public PersistentMemberID getPersistentIDIfOnline() {
if (online) {
return persistentMemberView.getMyPersistentID();
} else {
return null;
}
}
private void memberOffline(InternalDistributedMember distributedMember,
PersistentMemberID persistentID) {
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
"{}-{}: Member offine. id={}, persistentID={}", shortDiskStoreId(), regionPath,
distributedMember, persistentID);
}
synchronized (lock) {
boolean foundMember = recoveredMembers.remove(persistentID);
foundMember |= equalMembers.remove(persistentID);
foundMember |= getPersistedMembers().contains(persistentID);
// Don't persist members as offline until we are online. Otherwise, we may think we have later
// data than them during recovery.
if (shouldUpdatePersistentView && online) {
try {
// Don't persistent members as offline if we have already persisted them as equal.
if (persistentMemberView.getOfflineAndEqualMembers().contains(persistentID)) {
return;
}
// Don't mark the member as offline if we have never seen it. If we haven't seen it that
// means it's not done initializing yet.
if (foundMember) {
if (PersistenceObserverHolder.getInstance().memberOffline(regionPath, persistentID)) {
persistentMemberView.memberOffline(persistentID);
}
PersistenceObserverHolder.getInstance().afterPersistedOffline(regionPath, persistentID);
}
} catch (DiskAccessException e) {
logger.warn("Unable to persist membership change", e);
}
}
notifyListenersMemberOffline(distributedMember, persistentID);
}
}
private void memberOnline(InternalDistributedMember distributedMember,
PersistentMemberID persistentID) {
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
"{}-{}: Sending the new ID to peers. Member online. id={}, persistentID={}",
shortDiskStoreId(), regionPath, distributedMember, persistentID);
}
synchronized (lock) {
if (shouldUpdatePersistentView) {
recoveredMembers.remove(persistentID);
try {
if (PersistenceObserverHolder.getInstance().memberOnline(regionPath, persistentID)) {
persistentMemberView.memberOnline(persistentID);
}
PersistenceObserverHolder.getInstance().afterPersistedOnline(regionPath, persistentID);
} catch (DiskAccessException e) {
logger.warn("Unable to persist membership change", e);
}
} else {
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
"{}-{}: Not marking member online in persistent view because we're still in initialization",
shortDiskStoreId(), regionPath);
}
}
notifyListenersMemberOnline(distributedMember, persistentID);
}
}
private void memberRevoked(PersistentMemberPattern pattern) {
// Persist the revoked member, so if we recover later we will remember that they were revoked.
persistentMemberView.memberRevoked(pattern);
// Remove the revoked member from our view.
for (PersistentMemberID id : persistentMemberView.getOfflineMembers()) {
if (pattern.matches(id)) {
memberRemoved(id, true);
}
}
for (PersistentMemberID id : persistentMemberView.getOnlineMembers()) {
if (pattern.matches(id)) {
memberRemoved(id, true);
}
}
for (PersistentMemberID id : persistentMemberView.getOfflineAndEqualMembers()) {
if (pattern.matches(id)) {
memberRemoved(id, true);
}
}
}
void memberRemoved(PersistentMemberID id, boolean revoked) {
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "{}-{}: Member removed. persistentID={}",
shortDiskStoreId(), regionPath, id);
}
synchronized (lock) {
recoveredMembers.remove(id);
equalMembers.remove(id);
if (!online) {
removedMembers.add(id);
}
try {
if (PersistenceObserverHolder.getInstance().memberRemoved(regionPath, id)) {
persistentMemberView.memberRemoved(id);
}
// Purge any IDs that are old versions of the the id that we just removed
for (PersistentMemberID persistedId : getPersistedMembers()) {
if (persistedId.isOlderOrEqualVersionOf(id)) {
persistentMemberView.memberRemoved(persistedId);
}
}
PersistenceObserverHolder.getInstance().afterRemovePersisted(regionPath, id);
} catch (DiskAccessException e) {
logger.warn("Unable to persist membership change", e);
}
notifyListenersMemberRemoved(id, revoked);
}
}
@Override
public PersistentMemberID getPersistentID() {
return persistentMemberView.getMyPersistentID();
}
@Override
public PersistentMemberID getInitializingID() {
return persistentMemberView.getMyInitializingID();
}
@Override
public void addListener(PersistentStateListener listener) {
synchronized (this) {
Set<PersistentStateListener> tmpListeners = new HashSet<>(persistentStateListeners);
tmpListeners.add(listener);
persistentStateListeners = Collections.unmodifiableSet(tmpListeners);
}
}
@Override
public void removeListener(PersistentStateListener listener) {
synchronized (this) {
Set<PersistentStateListener> tmpListeners = new HashSet<>(persistentStateListeners);
tmpListeners.remove(listener);
persistentStateListeners = Collections.unmodifiableSet(tmpListeners);
}
}
private void notifyListenersMemberOnline(InternalDistributedMember member,
PersistentMemberID persistentID) {
for (PersistentStateListener listener : persistentStateListeners) {
listener.memberOnline(member, persistentID);
}
}
private void notifyListenersMemberOffline(InternalDistributedMember member,
PersistentMemberID persistentID) {
for (PersistentStateListener listener : persistentStateListeners) {
listener.memberOffline(member, persistentID);
}
}
private void notifyListenersMemberRemoved(PersistentMemberID persistentID, boolean revoked) {
for (PersistentStateListener listener : persistentStateListeners) {
listener.memberRemoved(persistentID, revoked);
}
}
@Override
public HashSet<PersistentMemberID> getPersistedOnlineOrEqualMembers() {
HashSet<PersistentMemberID> members = new HashSet<>(persistentMemberView.getOnlineMembers());
members.addAll(equalMembers);
return members;
}
@Override
public void prepareNewMember(InternalDistributedMember sender, PersistentMemberID oldId,
PersistentMemberID newId) {
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
"{}-{}: Preparing new persistent id {}. Old id is {}", shortDiskStoreId(), regionPath,
newId, oldId);
}
synchronized (lock) {
// Don't prepare the ID if the advisor doesn't have a profile. This prevents a race with the
// advisor remove
if (!cacheDistributionAdvisor.containsId(sender)) {
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
"{}-{}: Refusing to prepare id because {} is not in our advisor", shortDiskStoreId(),
regionPath, sender);
}
return;
}
// Persist new members even if we are not online yet. Two members can become online at once.
// This way, they will know about each other.
persistentMemberView.memberOnline(newId);
// The oldId and newId could be the same if the member is retrying a GII. See bug #42051
if (oldId != null && !oldId.equals(newId)) {
memberRemoved(oldId, false);
}
}
}
protected String shortDiskStoreId() {
DiskStoreID diskStoreID = getDiskStoreID();
return diskStoreID == null ? "mem" : diskStoreID.abbrev();
}
@Override
public void removeMember(PersistentMemberID id) {
memberRemoved(id, false);
}
@Override
public void markMemberOffline(InternalDistributedMember member, PersistentMemberID id) {
memberOffline(member, id);
}
@Override
public CacheDistributionAdvisor getCacheDistributionAdvisor() {
return cacheDistributionAdvisor;
}
@Override
public void setWaitingOnMembers(Set<PersistentMemberID> allMembersToWaitFor,
Set<PersistentMemberID> offlineMembersToWaitFor) {
allMembersWaitingFor = allMembersToWaitFor;
offlineMembersWaitingFor = offlineMembersToWaitFor;
}
@Override
public boolean isClosed() {
return isClosed;
}
public void finishPendingDestroy() {
// send a message to peers indicating that they should remove this profile
long viewVersion = cacheDistributionAdvisor.startOperation();
try {
RemovePersistentMemberMessage.send(cacheDistributionAdvisor.adviseProfileUpdate(),
cacheDistributionAdvisor.getDistributionManager(), regionPath, getPersistentID(),
getInitializingID());
persistentMemberView.finishPendingDestroy();
} finally {
if (viewVersion != -1) {
cacheDistributionAdvisor.endOperation(viewVersion);
}
}
synchronized (lock) {
recoveredMembers.clear();
recoveredMembers.addAll(getPersistedMembers());
}
}
/**
* Returns the member id of the member who has the latest copy of the persistent region. This may
* be the local member ID if this member has the latest known copy.
*
* This method will block until the latest member is online.
*
* @throws ConflictingPersistentDataException if there are active members which are not based on
* the state that is persisted in this member.
*/
@Override
public InitialImageAdvice getInitialImageAdvice(InitialImageAdvice previousAdvice,
boolean hasDiskImageToRecoverFrom) {
PersistenceInitialImageAdvisor piia = new PersistenceInitialImageAdvisor(this,
shortDiskStoreId(), regionPath, cacheDistributionAdvisor, hasDiskImageToRecoverFrom);
return piia.getAdvice(previousAdvice);
}
/**
* @param previouslyOnlineMembers the members we have persisted online in our persistence files
* @param offlineMembers This method will populate this set with any members that we are waiting
* for which are actually not running right now. This is different from the set of members
* we need to wait for - this member may end up waiting on a member that is actually
* running.
* @return the list of members that this member needs to wait for before it can initialize.
*/
@Override
public Set<PersistentMemberID> getMembersToWaitFor(
Set<PersistentMemberID> previouslyOnlineMembers, Set<PersistentMemberID> offlineMembers)
throws ReplyException {
PersistentMemberID myPersistentID = getPersistentID();
PersistentMemberID myInitializingId = getInitializingID();
// This is the set of members that are currently waiting for this member
// to come online.
Set<PersistentMemberID> membersToWaitFor = new HashSet<>(previouslyOnlineMembers);
offlineMembers.addAll(previouslyOnlineMembers);
// If our persistent ID is null, we need to wait for all of the previously online members.
if (myPersistentID != null || myInitializingId != null) {
Set<InternalDistributedMember> members = cacheDistributionAdvisor.adviseProfileUpdate();
Set<InternalDistributedMember> membersHostingThisRegion =
cacheDistributionAdvisor.adviseGeneric();
// Fetch the persistent view from all of our peers.
PersistentStateQueryResults results = fetchPersistentStateQueryResults(members,
cacheDistributionAdvisor.getDistributionManager(), myPersistentID, myInitializingId);
// iterate through all of the peers. For each peer: if the member was previously online
// according
// to us, grab its online members and add them to the members to wait for set. We may need to
// do this several times until we discover all of the members that may have newer data than
// us.
boolean addedMembers = true;
while (addedMembers) {
addedMembers = false;
for (Entry<InternalDistributedMember, Set<PersistentMemberID>> entry : results
.getOnlineMemberMap()
.entrySet()) {
InternalDistributedMember memberId = entry.getKey();
Set<PersistentMemberID> peersOnlineMembers = entry.getValue();
PersistentMemberID persistentID = results.getPersistentIds().get(memberId);
PersistentMemberID initializingID = results.getInitializingIds().get(memberId);
if (membersToWaitFor.contains(persistentID)
|| membersToWaitFor.contains(initializingID)) {
for (PersistentMemberID peerOnlineMember : peersOnlineMembers) {
if (!isRevoked(peerOnlineMember)
&& !peerOnlineMember.getDiskStoreId().equals(getDiskStoreID())
&& !persistentMemberView.getOfflineMembers().contains(peerOnlineMember)) {
if (membersToWaitFor.add(peerOnlineMember)) {
addedMembers = true;
// Make sure we also persist that this member is online.
persistentMemberView.memberOnline(peerOnlineMember);
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
"{}-{}: Adding {} to the list of members we're wait for, because {} has newer or equal data than is and is waiting for that member",
shortDiskStoreId(), regionPath, peerOnlineMember, memberId);
}
}
}
}
}
}
}
removeOlderMembers(membersToWaitFor);
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
"{}-{}: Initial state of membersToWaitFor, before pruning {}", shortDiskStoreId(),
regionPath, membersToWaitFor);
}
// For each of our peers, see what our state is according to their view.
for (Map.Entry<InternalDistributedMember, PersistentMemberState> entry : results
.getStateOnPeers().entrySet()) {
InternalDistributedMember memberId = entry.getKey();
PersistentMemberID persistentID = results.getPersistentIds().get(memberId);
PersistentMemberID initializingID = results.getInitializingIds().get(memberId);
DiskStoreID diskStoreID = results.getDiskStoreIds().get(memberId);
PersistentMemberState state = entry.getValue();
if (PersistentMemberState.REVOKED.equals(state)) {
throw new RevokedPersistentDataException(
String.format(
"The persistent member id %s has been revoked in this distributed system. You cannot recover from disk files which have been revoked.",
myPersistentID));
}
// If the peer thinks we are newer or equal to them, we don't need to wait for this peer.
if (membersHostingThisRegion.contains(memberId) && persistentID != null && state != null
&& myInitializingId == null && (state.equals(PersistentMemberState.ONLINE)
|| state.equals(PersistentMemberState.EQUAL))) {
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
"{}-{}: Not waiting for {} because it thinks our state was {}", shortDiskStoreId(),
regionPath, persistentID, state);
}
removeNewerPersistentID(membersToWaitFor, persistentID);
}
// If the peer has an initialized ID, they are no longer offline.
if (persistentID != null) {
removeNewerPersistentID(offlineMembers, persistentID);
}
// If the peer thinks we are newer or equal to them, we don't need to wait for this peer.
if (membersHostingThisRegion.contains(memberId) && initializingID != null && state != null
&& (state.equals(PersistentMemberState.ONLINE)
|| state.equals(PersistentMemberState.EQUAL))) {
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
"{}-{}: Not waiting for {} because it thinks our state was {}", shortDiskStoreId(),
regionPath, initializingID, state);
}
removeByDiskStoreID(membersToWaitFor, diskStoreID, false);
}
// If the peer has an initializing id, they are also not online.
if (initializingID != null) {
removeNewerPersistentID(offlineMembers, initializingID);
}
handlePartiallyDestroyedRegion(offlineMembers, membersToWaitFor, persistentID,
initializingID,
diskStoreID);
}
}
return membersToWaitFor;
}
/**
* In the event that the region was partially destroyed via DestroyDataStorage on the peer,
* we do not need to wait on that peer. Currently this state can be reached when a bucket region
* GII fails, which results in DestroyDataStorage on the region (as opposed to a DestroyRegion).
* See DiskRegion.destroyPartiallyInitializedRegion() which handles the failed GII on the image
* receiving side for more details.
*/
private void handlePartiallyDestroyedRegion(final Set<PersistentMemberID> offlineMembers,
final Set<PersistentMemberID> membersToWaitFor,
final PersistentMemberID persistentID,
final PersistentMemberID initializingID,
final DiskStoreID diskStoreID) {
/*
* When DestroyDataStorage is invoked on a peer for this region, we expect that its
* initializing and persistent IDs will be null, but the disk store ID will be non-null
* because the region was not fully destroyed.
*/
if (initializingID == null && persistentID == null & diskStoreID != null) {
removeByDiskStoreID(membersToWaitFor, diskStoreID, true);
removeByDiskStoreID(offlineMembers, diskStoreID, true);
}
}
/**
* Given a set of persistent members, if the same member occurs more than once in the set but
* with different timestamps, remove the older ones leaving only the most recent.
*
* @param persistentMemberSet The set of persistent members, possibly modified by this method.
*/
protected void removeOlderMembers(Set<PersistentMemberID> persistentMemberSet) {
Map<DiskStoreID, PersistentMemberID> mostRecentMap = new HashMap<>();
List<PersistentMemberID> idsToRemove = new ArrayList<>();
for (PersistentMemberID persistentMember : persistentMemberSet) {
DiskStoreID diskStoreId = persistentMember.getDiskStoreId();
PersistentMemberID mostRecent = mostRecentMap.get(diskStoreId);
if (mostRecent == null) {
mostRecentMap.put(diskStoreId, persistentMember);
} else {
PersistentMemberID older = persistentMember;
boolean persistentMemberIsNewer =
!persistentMember.isOlderOrEqualVersionOf(mostRecent);
if (persistentMemberIsNewer) {
older = mostRecent;
mostRecentMap.put(diskStoreId, persistentMember);
}
idsToRemove.add(older);
}
}
persistentMemberSet.removeAll(idsToRemove);
}
/**
* Remove all members with a given disk store id from the set of members to wait for, who is newer
* than the real one. The reason is: A is waiting for B2, but B sends B1<=A to A. That means A
* knows more than B in both B1 and B2. B itself knows nothing about B2. So we don't need to wait
* for B2 (since we don't need to wait for B1).
*/
private void removeNewerPersistentID(Set<PersistentMemberID> membersToWaitFor,
PersistentMemberID persistentID) {
for (Iterator<PersistentMemberID> itr = membersToWaitFor.iterator(); itr.hasNext();) {
PersistentMemberID id = itr.next();
if (persistentID.isOlderOrEqualVersionOf(id)) {
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
"{}-{}: Not waiting for {} because local member knows more about it",
shortDiskStoreId(), regionPath, id);
}
itr.remove();
}
}
}
/**
* Remove all members with a given disk store id from the set of members to wait for.
*/
private void removeByDiskStoreID(Set<PersistentMemberID> membersToWaitFor,
DiskStoreID diskStoreID, boolean updateAdvisor) {
for (Iterator<PersistentMemberID> itr = membersToWaitFor.iterator(); itr.hasNext();) {
PersistentMemberID id = itr.next();
if (id.getDiskStoreId().equals(diskStoreID)) {
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE,
"{}-{}: Not waiting for {} because it no longer has this region in its disk store",
shortDiskStoreId(), regionPath, id);
}
itr.remove();
if (updateAdvisor) {
memberRemoved(id, false);
}
}
}
}
@Override
public boolean wasHosting() {
return getPersistentID() != null || getInitializingID() != null;
}
protected String getRegionPathForOfflineMembers() {
return regionPath;
}
/**
* Returns the set of missing members that we report back to the any admin DS looking for missing
* members.
*/
protected Set<PersistentMemberID> getMissingMembers() {
return offlineMembersWaitingFor;
}
/**
* Returns the set of missing members that we report back to the any admin DS looking for missing
* members.
*/
public Set<PersistentMemberID> getAllMembersToWaitFor() {
return allMembersWaitingFor;
}
@Override
public void logWaitingForMembers() {
Set<String> membersToWaitForLogEntries = new HashSet<>();
if (offlineMembersWaitingFor != null && !offlineMembersWaitingFor.isEmpty()) {
TransformUtils.transform(offlineMembersWaitingFor, membersToWaitForLogEntries,
TransformUtils.persistentMemberIdToLogEntryTransformer);
StartupStatus.startup(
String.format(
"Region %s has potentially stale data. It is waiting for another member to recover the latest data.My persistent id:%sMembers with potentially new data:%sUse the gfsh show missing-disk-stores command to see all disk stores that are being waited on by other members.",
regionPath,
TransformUtils.persistentMemberIdToLogEntryTransformer.transform(getPersistentID()),
membersToWaitForLogEntries));
} else {
TransformUtils.transform(allMembersWaitingFor, membersToWaitForLogEntries,
TransformUtils.persistentMemberIdToLogEntryTransformer);
StartupStatus.startup(
String.format(
"Region %s has potentially stale data. It is waiting for another online member to recover the latest data.My persistent id:%sMembers with potentially new data:%sUse the gfsh show missing-disk-stores command to see all disk stores that are being waited on by other members.",
regionPath,
TransformUtils.persistentMemberIdToLogEntryTransformer.transform(getPersistentID()),
membersToWaitForLogEntries));
}
}
@Override
public void clearEqualMembers() {
synchronized (lock) {
equalMembers.clear();
}
}
@Override
public void checkInterruptedByShutdownAll() {}
@Override
public void close() {
isClosed = true;
persistentMemberManager.removeRevocationListener(profileChangeListener);
cacheDistributionAdvisor.removeProfileChangeListener(profileChangeListener);
releaseTieLock();
}
/**
* Try to acquire the distributed lock which members must grab for in the case of a tie. Whoever
* gets the lock initializes first.
*/
@Override
public boolean acquireTieLock() {
// We're tied for the latest copy of the data. try to get the distributed lock.
holdingTieLock = distributedLockService.lock("PERSISTENCE_" + regionPath, 0, -1);
if (!holdingTieLock) {
if (logger.isDebugEnabled(LogMarker.PERSIST_ADVISOR_VERBOSE)) {
logger.debug(LogMarker.PERSIST_ADVISOR_VERBOSE, "{}-{}: Failed to acquire the lock.",
shortDiskStoreId(), regionPath);
}
}
return holdingTieLock;
}
@Override
public void releaseTieLock() {
if (holdingTieLock) {
distributedLockService.unlock("PERSISTENCE_" + regionPath);
holdingTieLock = false;
}
}
private boolean wasAboutToDestroy() {
return persistentMemberView.wasAboutToDestroy()
|| persistentMemberView.wasAboutToDestroyDataStorage();
}
protected synchronized void resetState() {
online = false;
removedMembers.clear();
}
public void flushMembershipChanges() {
try {
cacheDistributionAdvisor.waitForCurrentOperations();
} catch (RegionDestroyedException ignored) {
}
}
@Override
public void persistMembersOfflineAndEqual(
Map<InternalDistributedMember, PersistentMemberID> map) {
for (PersistentMemberID persistentID : map.values()) {
persistentMemberView.memberOfflineAndEqual(persistentID);
}
}
@Override
public DiskStoreID getDiskStoreID() {
return persistentMemberView.getDiskStoreID();
}
@Override
public boolean isOnline() {
return online;
}
public interface PersistenceAdvisorObserver {
void observe(String regionPath);
}
private class ProfileChangeListener implements ProfileListener, MemberRevocationListener {
@Override
public void profileCreated(Profile profile) {
profileUpdated(profile);
}
@Override
public void profileRemoved(Profile profile, boolean destroyed) {
CacheProfile cp = (CacheProfile) profile;
if (cp.persistentID != null) {
if (destroyed) {
memberRemoved(cp.persistentID, false);
} else {
memberOffline(profile.getDistributedMember(), cp.persistentID);
}
}
}
@Override
public void profileUpdated(Profile profile) {
CacheProfile cp = (CacheProfile) profile;
if (cp.persistentID != null && cp.persistenceInitialized) {
memberOnline(profile.getDistributedMember(), cp.persistentID);
}
}
@Override
public void revoked(PersistentMemberPattern pattern) {
memberRevoked(pattern);
}
@Override
public Set<PersistentMemberID> getMissingMemberIds() {
return getMissingMembers();
}
@Override
public String getRegionPath() {
return getRegionPathForOfflineMembers();
}
@Override
public boolean matches(PersistentMemberPattern pattern) {
return pattern.matches(getPersistentID()) || pattern.matches(getInitializingID());
}
}
}